LCOV - code coverage report
Current view: top level - storage_controller/src - scheduler.rs (source / functions)
Test: 07bee600374ccd486c69370d0972d9035964fe68.info
Test Date: 2025-02-20 13:11:02

Coverage            Total    Hit
Lines:     89.3 %    1024    914
Functions: 79.8 %      84     67

            Line data    Source code
       1              : use crate::{metrics::NodeLabelGroup, node::Node, tenant_shard::TenantShard};
       2              : use http_utils::error::ApiError;
       3              : use itertools::Itertools;
       4              : use pageserver_api::{controller_api::AvailabilityZone, models::PageserverUtilization};
       5              : use serde::Serialize;
       6              : use std::{collections::HashMap, fmt::Debug};
       7              : use utils::id::NodeId;
       8              : 
       9              : /// Scenarios in which we cannot find a suitable location for a tenant shard
      10              : #[derive(thiserror::Error, Debug)]
      11              : pub enum ScheduleError {
      12              :     #[error("No pageservers found")]
      13              :     NoPageservers,
      14              :     #[error("No pageserver found matching constraint")]
      15              :     ImpossibleConstraint,
      16              : }
      17              : 
      18              : impl From<ScheduleError> for ApiError {
      19            0 :     fn from(value: ScheduleError) -> Self {
      20            0 :         ApiError::Conflict(format!("Scheduling error: {}", value))
      21            0 :     }
      22              : }
      23              : 
      24              : #[derive(Serialize)]
      25              : pub enum MaySchedule {
      26              :     Yes(PageserverUtilization),
      27              :     No,
      28              : }
      29              : 
      30              : #[derive(Serialize)]
      31              : pub(crate) struct SchedulerNode {
      32              :     /// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
      33              :     shard_count: usize,
      34              :     /// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`].
      35              :     attached_shard_count: usize,
      36              :     /// How many shards have a location on this node (via [`crate::tenant_shard::IntentState`]) _and_ this node
      37              :     /// is in their preferred AZ (i.e. this is their 'home' location)
      38              :     home_shard_count: usize,
      39              :     /// Availability zone id in which the node resides
      40              :     az: AvailabilityZone,
      41              : 
       42              :     /// Whether this node is currently eligible to have new shards scheduled (this is derived
      43              :     /// from a node's availability state and scheduling policy).
      44              :     may_schedule: MaySchedule,
      45              : }
      46              : 
      47              : pub(crate) trait NodeSchedulingScore: Debug + Ord + Copy + Sized {
      48              :     fn generate(
      49              :         node_id: &NodeId,
      50              :         node: &mut SchedulerNode,
      51              :         preferred_az: &Option<AvailabilityZone>,
      52              :         context: &ScheduleContext,
      53              :     ) -> Option<Self>;
      54              : 
      55              :     /// Return a score that drops any components based on node utilization: this is useful
      56              :     /// for finding scores for scheduling optimisation, when we want to avoid rescheduling
      57              :     /// shards due to e.g. disk usage, to avoid flapping.
      58              :     fn for_optimization(&self) -> Self;
      59              : 
      60              :     fn is_overloaded(&self) -> bool;
      61              :     fn node_id(&self) -> NodeId;
      62              : }
      63              : 
      64              : pub(crate) trait ShardTag {
      65              :     type Score: NodeSchedulingScore;
      66              : }
      67              : 
      68              : pub(crate) struct AttachedShardTag {}
      69              : impl ShardTag for AttachedShardTag {
      70              :     type Score = NodeAttachmentSchedulingScore;
      71              : }
      72              : 
      73              : pub(crate) struct SecondaryShardTag {}
      74              : impl ShardTag for SecondaryShardTag {
      75              :     type Score = NodeSecondarySchedulingScore;
      76              : }
      77              : 
      78              : #[derive(PartialEq, Eq, Debug, Clone, Copy)]
      79              : enum AzMatch {
      80              :     Yes,
      81              :     No,
      82              :     Unknown,
      83              : }
      84              : 
      85              : impl AzMatch {
      86        91473 :     fn new(node_az: &AvailabilityZone, shard_preferred_az: Option<&AvailabilityZone>) -> Self {
      87        91301 :         match shard_preferred_az {
      88        91301 :             Some(preferred_az) if preferred_az == node_az => Self::Yes,
      89        52680 :             Some(_preferred_az) => Self::No,
      90          172 :             None => Self::Unknown,
      91              :         }
      92        91473 :     }
      93              : }
      94              : 
      95              : #[derive(PartialEq, Eq, Debug, Clone, Copy)]
      96              : struct AttachmentAzMatch(AzMatch);
      97              : 
      98              : impl Ord for AttachmentAzMatch {
      99        65425 :     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
     100        65425 :         // Lower scores indicate a more suitable node.
     101        65425 :         // Note that we prefer a node for which we don't have
     102        65425 :         // info to a node which we are certain doesn't match the
     103        65425 :         // preferred AZ of the shard.
     104       130850 :         let az_match_score = |az_match: &AzMatch| match az_match {
     105        64133 :             AzMatch::Yes => 0,
     106          168 :             AzMatch::Unknown => 1,
     107        66549 :             AzMatch::No => 2,
     108       130850 :         };
     109              : 
     110        65425 :         az_match_score(&self.0).cmp(&az_match_score(&other.0))
     111        65425 :     }
     112              : }
     113              : 
     114              : impl PartialOrd for AttachmentAzMatch {
     115        65425 :     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
     116        65425 :         Some(self.cmp(other))
     117        65425 :     }
     118              : }
     119              : 
     120              : #[derive(PartialEq, Eq, Debug, Clone, Copy)]
     121              : struct SecondaryAzMatch(AzMatch);
     122              : 
     123              : impl Ord for SecondaryAzMatch {
     124        36210 :     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
     125        36210 :         // Lower scores indicate a more suitable node.
     126        36210 :         // For secondary locations we wish to avoid the preferred AZ
     127        36210 :         // of the shard.
     128        72420 :         let az_match_score = |az_match: &AzMatch| match az_match {
     129        50434 :             AzMatch::No => 0,
     130           24 :             AzMatch::Unknown => 1,
     131        21962 :             AzMatch::Yes => 2,
     132        72420 :         };
     133              : 
     134        36210 :         az_match_score(&self.0).cmp(&az_match_score(&other.0))
     135        36210 :     }
     136              : }
     137              : 
     138              : impl PartialOrd for SecondaryAzMatch {
     139        36207 :     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
     140        36207 :         Some(self.cmp(other))
     141        36207 :     }
     142              : }
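
             The two newtypes above impose opposite rankings over the same AzMatch. A minimal sketch of the
             resulting order, assuming it lives inside this module (the types are module-private), for
             instance alongside the unit tests at the bottom of this file:

                 #[test]
                 fn az_match_ordering_sketch() {
                     // Attached locations rank a matching AZ best ...
                     assert!(AttachmentAzMatch(AzMatch::Yes) < AttachmentAzMatch(AzMatch::No));
                     // ... while secondaries rank it worst, to keep the secondary out of the home AZ.
                     assert!(SecondaryAzMatch(AzMatch::No) < SecondaryAzMatch(AzMatch::Yes));
                     // An unknown match (the shard has no preferred AZ) sits in between for both.
                     assert!(AttachmentAzMatch(AzMatch::Unknown) < AttachmentAzMatch(AzMatch::No));
                     assert!(SecondaryAzMatch(AzMatch::Unknown) < SecondaryAzMatch(AzMatch::Yes));
                 }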
     143              : 
     144              : /// Scheduling score of a given node for shard attachments.
     145              : /// Lower scores indicate more suitable nodes.
     146              : /// Ordering is given by member declaration order (top to bottom).
     147              : #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
     148              : pub(crate) struct NodeAttachmentSchedulingScore {
     149              :     /// Flag indicating whether this node matches the preferred AZ
      150              :     /// of the shard. This field is declared first, so an AZ match is
      151              :     /// considered before the affinity score.
     152              :     az_match: AttachmentAzMatch,
     153              :     /// The number of shards belonging to the tenant currently being
     154              :     /// scheduled that are attached to this node.
     155              :     affinity_score: AffinityScore,
     156              :     /// Utilisation score that combines shard count and disk utilisation
     157              :     utilization_score: u64,
     158              :     /// Total number of shards attached to this node. When nodes have identical utilisation, this
     159              :     /// acts as an anti-affinity between attached shards.
     160              :     total_attached_shard_count: usize,
     161              :     /// Convenience to make selection deterministic in tests and empty systems
     162              :     node_id: NodeId,
     163              : }
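
             The derived Ord above compares fields lexicographically in declaration order, so az_match is
             consulted first, then affinity_score, and so on down the struct. A self-contained illustration
             of that Rust behaviour, using a hypothetical Example struct unrelated to the scheduler types:

                 #[derive(PartialEq, Eq, PartialOrd, Ord, Debug)]
                 struct Example {
                     primary: u8,   // compared first
                     secondary: u8, // only consulted when `primary` ties
                 }

                 fn main() {
                     // A lower `primary` wins regardless of `secondary` ...
                     assert!(Example { primary: 0, secondary: 9 } < Example { primary: 1, secondary: 0 });
                     // ... and `secondary` only breaks ties.
                     assert!(Example { primary: 1, secondary: 0 } < Example { primary: 1, secondary: 2 });
                 }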
     164              : 
     165              : impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
     166        52280 :     fn generate(
     167        52280 :         node_id: &NodeId,
     168        52280 :         node: &mut SchedulerNode,
     169        52280 :         preferred_az: &Option<AvailabilityZone>,
     170        52280 :         context: &ScheduleContext,
     171        52280 :     ) -> Option<Self> {
     172        52280 :         let utilization = match &mut node.may_schedule {
     173        52278 :             MaySchedule::Yes(u) => u,
     174              :             MaySchedule::No => {
     175            2 :                 return None;
     176              :             }
     177              :         };
     178              : 
     179        52278 :         Some(Self {
     180        52278 :             affinity_score: context
     181        52278 :                 .nodes
     182        52278 :                 .get(node_id)
     183        52278 :                 .copied()
     184        52278 :                 .unwrap_or(AffinityScore::FREE),
     185        52278 :             az_match: AttachmentAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())),
     186        52278 :             utilization_score: utilization.cached_score(),
     187        52278 :             total_attached_shard_count: node.attached_shard_count,
     188        52278 :             node_id: *node_id,
     189        52278 :         })
     190        52280 :     }
     191              : 
     192              :     /// For use in scheduling optimisation, where we only want to consider the aspects
     193              :     /// of the score that can only be resolved by moving things (such as inter-shard affinity
     194              :     /// and AZ affinity), and ignore aspects that reflect the total utilization of a node (which
     195              :     /// can fluctuate for other reasons)
     196          110 :     fn for_optimization(&self) -> Self {
     197          110 :         Self {
     198          110 :             utilization_score: 0,
     199          110 :             total_attached_shard_count: 0,
     200          110 :             node_id: NodeId(0),
     201          110 :             ..*self
     202          110 :         }
     203          110 :     }
     204              : 
     205        52168 :     fn is_overloaded(&self) -> bool {
     206        52168 :         PageserverUtilization::is_overloaded(self.utilization_score)
     207        52168 :     }
     208              : 
     209        12879 :     fn node_id(&self) -> NodeId {
     210        12879 :         self.node_id
     211        12879 :     }
     212              : }
     213              : 
     214              : /// Scheduling score of a given node for shard secondaries.
     215              : /// Lower scores indicate more suitable nodes.
     216              : /// Ordering is given by member declaration order (top to bottom).
     217              : #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
     218              : pub(crate) struct NodeSecondarySchedulingScore {
     219              :     /// Flag indicating whether this node matches the preferred AZ
     220              :     /// of the shard. For secondary locations we wish to avoid nodes in.
     221              :     /// the preferred AZ of the shard, since that's where the attached location
     222              :     /// should be scheduled and having the secondary in the same AZ is bad for HA.
     223              :     az_match: SecondaryAzMatch,
     224              :     /// The number of shards belonging to the tenant currently being
     225              :     /// scheduled that are attached to this node.
     226              :     affinity_score: AffinityScore,
     227              :     /// Utilisation score that combines shard count and disk utilisation
     228              :     utilization_score: u64,
     229              :     /// Anti-affinity with other non-home locations: this gives the behavior that secondaries
     230              :     /// will spread out across the nodes in an AZ.
     231              :     total_non_home_shard_count: usize,
     232              :     /// Convenience to make selection deterministic in tests and empty systems
     233              :     node_id: NodeId,
     234              : }
     235              : 
     236              : impl NodeSchedulingScore for NodeSecondarySchedulingScore {
     237        39196 :     fn generate(
     238        39196 :         node_id: &NodeId,
     239        39196 :         node: &mut SchedulerNode,
     240        39196 :         preferred_az: &Option<AvailabilityZone>,
     241        39196 :         context: &ScheduleContext,
     242        39196 :     ) -> Option<Self> {
     243        39196 :         let utilization = match &mut node.may_schedule {
     244        39195 :             MaySchedule::Yes(u) => u,
     245              :             MaySchedule::No => {
     246            1 :                 return None;
     247              :             }
     248              :         };
     249              : 
     250        39195 :         Some(Self {
     251        39195 :             az_match: SecondaryAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())),
     252        39195 :             affinity_score: context
     253        39195 :                 .nodes
     254        39195 :                 .get(node_id)
     255        39195 :                 .copied()
     256        39195 :                 .unwrap_or(AffinityScore::FREE),
     257        39195 :             utilization_score: utilization.cached_score(),
     258        39195 :             total_non_home_shard_count: (node.shard_count - node.home_shard_count),
     259        39195 :             node_id: *node_id,
     260        39195 :         })
     261        39196 :     }
     262              : 
     263           14 :     fn for_optimization(&self) -> Self {
     264           14 :         Self {
     265           14 :             utilization_score: 0,
     266           14 :             total_non_home_shard_count: 0,
     267           14 :             node_id: NodeId(0),
     268           14 :             ..*self
     269           14 :         }
     270           14 :     }
     271              : 
     272        39172 :     fn is_overloaded(&self) -> bool {
     273        39172 :         PageserverUtilization::is_overloaded(self.utilization_score)
     274        39172 :     }
     275              : 
     276        12846 :     fn node_id(&self) -> NodeId {
     277        12846 :         self.node_id
     278        12846 :     }
     279              : }
     280              : 
     281              : impl PartialEq for SchedulerNode {
     282            3 :     fn eq(&self, other: &Self) -> bool {
     283            3 :         let may_schedule_matches = matches!(
     284            3 :             (&self.may_schedule, &other.may_schedule),
     285              :             (MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No)
     286              :         );
     287              : 
     288            3 :         may_schedule_matches
     289            3 :             && self.shard_count == other.shard_count
     290            3 :             && self.attached_shard_count == other.attached_shard_count
     291            3 :             && self.az == other.az
     292            3 :     }
     293              : }
     294              : 
     295              : impl Eq for SchedulerNode {}
     296              : 
     297              : /// This type is responsible for selecting which node is used when a tenant shard needs to choose a pageserver
     298              : /// on which to run.
     299              : ///
     300              : /// The type has no persistent state of its own: this is all populated at startup.  The Serialize
     301              : /// impl is only for debug dumps.
     302              : #[derive(Serialize)]
     303              : pub(crate) struct Scheduler {
     304              :     nodes: HashMap<NodeId, SchedulerNode>,
     305              : }
     306              : 
     307              : /// Score for soft constraint scheduling: lower scores are preferred to higher scores.
     308              : ///
     309              : /// For example, we may set an affinity score based on the number of shards from the same
     310              : /// tenant already on a node, to implicitly prefer to balance out shards.
     311              : #[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
     312              : pub(crate) struct AffinityScore(pub(crate) usize);
     313              : 
     314              : impl AffinityScore {
     315              :     /// If we have no anti-affinity at all toward a node, this is its score.  It means
     316              :     /// the scheduler has a free choice amongst nodes with this score, and may pick a node
     317              :     /// based on other information such as total utilization.
     318              :     pub(crate) const FREE: Self = Self(0);
     319              : 
     320        25773 :     pub(crate) fn inc(&mut self) {
     321        25773 :         self.0 += 1;
     322        25773 :     }
     323              : 
     324          118 :     pub(crate) fn dec(&mut self) {
     325          118 :         self.0 -= 1;
     326          118 :     }
     327              : }
     328              : 
     329              : impl std::ops::Add for AffinityScore {
     330              :     type Output = Self;
     331              : 
     332            0 :     fn add(self, rhs: Self) -> Self::Output {
     333            0 :         Self(self.0 + rhs.0)
     334            0 :     }
     335              : }
     336              : 
     337              : /// Hint for whether this is a sincere attempt to schedule, or a speculative
     338              : /// check for where we _would_ schedule (done during optimization)
     339              : #[derive(Debug, Clone)]
     340              : pub(crate) enum ScheduleMode {
     341              :     Normal,
     342              :     Speculative,
     343              : }
     344              : 
     345              : impl Default for ScheduleMode {
     346         5330 :     fn default() -> Self {
     347         5330 :         Self::Normal
     348         5330 :     }
     349              : }
     350              : 
     351              : // For carrying state between multiple calls to [`TenantShard::schedule`], e.g. when calling
     352              : // it for many shards in the same tenant.
     353              : #[derive(Debug, Default, Clone)]
     354              : pub(crate) struct ScheduleContext {
     355              :     /// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`]
     356              :     pub(crate) nodes: HashMap<NodeId, AffinityScore>,
     357              : 
     358              :     pub(crate) mode: ScheduleMode,
     359              : }
     360              : 
     361              : impl ScheduleContext {
     362            3 :     pub(crate) fn new(mode: ScheduleMode) -> Self {
     363            3 :         Self {
     364            3 :             nodes: HashMap::new(),
     365            3 :             mode,
     366            3 :         }
     367            3 :     }
     368              : 
     369              :     /// Input is a list of nodes we would like to avoid using again within this context.  The more
     370              :     /// times a node is passed into this call, the less inclined we are to use it.
     371        12896 :     pub(crate) fn avoid(&mut self, nodes: &[NodeId]) {
     372        38669 :         for node_id in nodes {
     373        25773 :             let entry = self.nodes.entry(*node_id).or_insert(AffinityScore::FREE);
     374        25773 :             entry.inc()
     375              :         }
     376        12896 :     }
     377              : 
     378              :     /// Remove `shard`'s contributions to this context.  This is useful when considering scheduling
     379              :     /// this shard afresh, where we don't want it to e.g. experience anti-affinity to its current location.
     380           60 :     pub(crate) fn project_detach(&self, shard: &TenantShard) -> Self {
     381           60 :         let mut new_context = self.clone();
     382              : 
     383           60 :         if let Some(attached) = shard.intent.get_attached() {
     384           57 :             if let Some(score) = new_context.nodes.get_mut(attached) {
     385           57 :                 score.dec();
     386           57 :             }
     387            3 :         }
     388              : 
     389           63 :         for secondary in shard.intent.get_secondary() {
     390           63 :             if let Some(score) = new_context.nodes.get_mut(secondary) {
     391           61 :                 score.dec();
     392           61 :             }
     393              :         }
     394              : 
     395           60 :         new_context
     396           60 :     }
     397              : 
     398              :     /// For test, track the sum of AffinityScore values, which is effectively how many
     399              :     /// attached or secondary locations have been registered with this context.
     400              :     #[cfg(test)]
     401            3 :     pub(crate) fn location_count(&self) -> usize {
     402            7 :         self.nodes.values().map(|i| i.0).sum()
     403            3 :     }
     404              : }
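
             As a sketch of how the affinity mechanism is used, the following hypothetical crate-internal
             helper places three attached locations for one tenant, feeding each choice back into the context
             via avoid() so later shards prefer other nodes (the real code path also records the choice in
             IntentState, which is omitted here):

                 fn place_three_shards(scheduler: &mut Scheduler) -> Result<Vec<NodeId>, ScheduleError> {
                     let mut context = ScheduleContext::new(ScheduleMode::Normal);
                     let mut placed = Vec::new();
                     for _ in 0..3 {
                         // No hard exclusions and no preferred AZ in this sketch.
                         let node_id = scheduler.schedule_shard::<AttachedShardTag>(&[], &None, &context)?;
                         // Bump the chosen node's affinity score so the next shard of this
                         // tenant is soft-steered towards a different node.
                         context.avoid(&[node_id]);
                         placed.push(node_id);
                     }
                     Ok(placed)
                 }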
     405              : 
     406              : pub(crate) enum RefCountUpdate {
     407              :     PromoteSecondary,
     408              :     Attach,
     409              :     Detach,
     410              :     DemoteAttached,
     411              :     AddSecondary,
     412              :     RemoveSecondary,
     413              : }
     414              : 
     415              : impl Scheduler {
     416           68 :     pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
     417           68 :         let mut scheduler_nodes = HashMap::new();
     418          129 :         for node in nodes {
     419           61 :             scheduler_nodes.insert(
     420           61 :                 node.get_id(),
     421           61 :                 SchedulerNode {
     422           61 :                     shard_count: 0,
     423           61 :                     attached_shard_count: 0,
     424           61 :                     home_shard_count: 0,
     425           61 :                     may_schedule: node.may_schedule(),
     426           61 :                     az: node.get_availability_zone_id().clone(),
     427           61 :                 },
     428           61 :             );
     429           61 :         }
     430              : 
     431           68 :         Self {
     432           68 :             nodes: scheduler_nodes,
     433           68 :         }
     434           68 :     }
     435              : 
     436              :     /// For debug/support: check that our internal statistics are in sync with the state of
     437              :     /// the nodes & tenant shards.
     438              :     ///
     439              :     /// If anything is inconsistent, log details and return an error.
     440            1 :     pub(crate) fn consistency_check<'a>(
     441            1 :         &self,
     442            1 :         nodes: impl Iterator<Item = &'a Node>,
     443            1 :         shards: impl Iterator<Item = &'a TenantShard>,
     444            1 :     ) -> anyhow::Result<()> {
     445            1 :         let mut expect_nodes: HashMap<NodeId, SchedulerNode> = HashMap::new();
     446            4 :         for node in nodes {
     447            3 :             expect_nodes.insert(
     448            3 :                 node.get_id(),
     449            3 :                 SchedulerNode {
     450            3 :                     shard_count: 0,
     451            3 :                     attached_shard_count: 0,
     452            3 :                     home_shard_count: 0,
     453            3 :                     may_schedule: node.may_schedule(),
     454            3 :                     az: node.get_availability_zone_id().clone(),
     455            3 :                 },
     456            3 :             );
     457            3 :         }
     458              : 
     459            2 :         for shard in shards {
     460            1 :             if let Some(node_id) = shard.intent.get_attached() {
     461            1 :                 match expect_nodes.get_mut(node_id) {
     462            1 :                     Some(node) => {
     463            1 :                         node.shard_count += 1;
     464            1 :                         node.attached_shard_count += 1;
     465            1 :                         if Some(&node.az) == shard.preferred_az() {
     466            0 :                             node.home_shard_count += 1;
     467            1 :                         }
     468              :                     }
     469            0 :                     None => anyhow::bail!(
     470            0 :                         "Tenant {} references nonexistent node {}",
     471            0 :                         shard.tenant_shard_id,
     472            0 :                         node_id
     473            0 :                     ),
     474              :                 }
     475            0 :             }
     476              : 
     477            1 :             for node_id in shard.intent.get_secondary() {
     478            1 :                 match expect_nodes.get_mut(node_id) {
     479            1 :                     Some(node) => {
     480            1 :                         node.shard_count += 1;
     481            1 :                         if Some(&node.az) == shard.preferred_az() {
     482            0 :                             node.home_shard_count += 1;
     483            1 :                         }
     484              :                     }
     485            0 :                     None => anyhow::bail!(
     486            0 :                         "Tenant {} references nonexistent node {}",
     487            0 :                         shard.tenant_shard_id,
     488            0 :                         node_id
     489            0 :                     ),
     490              :                 }
     491              :             }
     492              :         }
     493              : 
     494            4 :         for (node_id, expect_node) in &expect_nodes {
     495            3 :             let Some(self_node) = self.nodes.get(node_id) else {
     496            0 :                 anyhow::bail!("Node {node_id} not found in Self")
     497              :             };
     498              : 
     499            3 :             if self_node != expect_node {
     500            0 :                 tracing::error!("Inconsistency detected in scheduling state for node {node_id}");
     501            0 :                 tracing::error!("Expected state: {}", serde_json::to_string(expect_node)?);
     502            0 :                 tracing::error!("Self state: {}", serde_json::to_string(self_node)?);
     503              : 
     504            0 :                 anyhow::bail!("Inconsistent state on {node_id}");
     505            3 :             }
     506              :         }
     507              : 
     508            1 :         if expect_nodes.len() != self.nodes.len() {
     509              :             // We just checked that all the expected nodes are present.  If the lengths don't match,
     510              :             // it means that we have nodes in Self that are unexpected.
     511            0 :             for node_id in self.nodes.keys() {
     512            0 :                 if !expect_nodes.contains_key(node_id) {
     513            0 :                     anyhow::bail!("Node {node_id} found in Self but not in expected nodes");
     514            0 :                 }
     515              :             }
     516            1 :         }
     517              : 
     518            1 :         Ok(())
     519            1 :     }
     520              : 
     521              :     /// Update the reference counts of a node. These reference counts are used to guide scheduling
      522              :     /// decisions, not for memory management: they represent the number of tenant shards whose IntentState
      523              :     /// targets this node and the number of tenant shards whose IntentState is attached to this
     524              :     /// node.
     525              :     ///
     526              :     /// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
     527              :     /// [`Self::new`] or [`Self::node_upsert`])
     528        51416 :     pub(crate) fn update_node_ref_counts(
     529        51416 :         &mut self,
     530        51416 :         node_id: NodeId,
     531        51416 :         preferred_az: Option<&AvailabilityZone>,
     532        51416 :         update: RefCountUpdate,
     533        51416 :     ) {
     534        51416 :         let Some(node) = self.nodes.get_mut(&node_id) else {
     535            0 :             debug_assert!(false);
     536            0 :             tracing::error!("Scheduler missing node {node_id}");
     537            0 :             return;
     538              :         };
     539              : 
     540        51416 :         let is_home_az = Some(&node.az) == preferred_az;
     541        51416 : 
     542        51416 :         match update {
     543            6 :             RefCountUpdate::PromoteSecondary => {
     544            6 :                 node.attached_shard_count += 1;
     545            6 :             }
     546              :             RefCountUpdate::Attach => {
     547        12857 :                 node.shard_count += 1;
     548        12857 :                 node.attached_shard_count += 1;
     549        12857 :                 if is_home_az {
     550        12817 :                     node.home_shard_count += 1;
     551        12817 :                 }
     552              :             }
     553              :             RefCountUpdate::Detach => {
     554        12855 :                 node.shard_count -= 1;
     555        12855 :                 node.attached_shard_count -= 1;
     556        12855 :                 if is_home_az {
     557        12818 :                     node.home_shard_count -= 1;
     558        12818 :                 }
     559              :             }
     560            7 :             RefCountUpdate::DemoteAttached => {
     561            7 :                 node.attached_shard_count -= 1;
     562            7 :             }
     563              :             RefCountUpdate::AddSecondary => {
     564        12845 :                 node.shard_count += 1;
     565        12845 :                 if is_home_az {
     566            5 :                     node.home_shard_count += 1;
     567        12840 :                 }
     568              :             }
     569              :             RefCountUpdate::RemoveSecondary => {
     570        12846 :                 node.shard_count -= 1;
     571        12846 :                 if is_home_az {
     572            4 :                     node.home_shard_count -= 1;
     573        12842 :                 }
     574              :             }
     575              :         }
     576              : 
     577              :         // Maybe update PageserverUtilization
     578        51416 :         match update {
     579              :             RefCountUpdate::AddSecondary | RefCountUpdate::Attach => {
      580              :                 // Referencing the node: if this takes our shard_count above the utilization structure's
      581              :                 // shard count, then artificially bump it: this ensures that the scheduler immediately
     582              :                 // recognizes that this node has more work on it, without waiting for the next heartbeat
     583              :                 // to update the utilization.
     584        25702 :                 if let MaySchedule::Yes(utilization) = &mut node.may_schedule {
     585        25702 :                     utilization.adjust_shard_count_max(node.shard_count as u32);
     586        25702 :                 }
     587              :             }
     588              :             RefCountUpdate::PromoteSecondary
     589              :             | RefCountUpdate::Detach
     590              :             | RefCountUpdate::RemoveSecondary
     591        25714 :             | RefCountUpdate::DemoteAttached => {
     592        25714 :                 // De-referencing the node: leave the utilization's shard_count at a stale higher
     593        25714 :                 // value until some future heartbeat after we have physically removed this shard
     594        25714 :                 // from the node: this prevents the scheduler over-optimistically trying to schedule
     595        25714 :                 // more work onto the node before earlier detaches are done.
     596        25714 :             }
     597              :         }
     598        51416 :     }
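
             A hypothetical crate-internal walkthrough of how these counters move during a cut-over, assuming
             node_a sits in the shard's preferred AZ (az_home) and node_b does not; in production these calls
             are made by IntentState rather than directly:

                 fn ref_count_walkthrough(
                     scheduler: &mut Scheduler,
                     node_a: NodeId,
                     node_b: NodeId,
                     az_home: AvailabilityZone,
                 ) {
                     let preferred = Some(&az_home);
                     // Attach on A: A's shard_count, attached_shard_count and home_shard_count all +1.
                     scheduler.update_node_ref_counts(node_a, preferred, RefCountUpdate::Attach);
                     // Add a secondary on B (outside the home AZ): only B's shard_count +1.
                     scheduler.update_node_ref_counts(node_b, preferred, RefCountUpdate::AddSecondary);
                     // Cut over: demote A's attachment, promote B's secondary.
                     scheduler.update_node_ref_counts(node_a, preferred, RefCountUpdate::DemoteAttached);
                     scheduler.update_node_ref_counts(node_b, preferred, RefCountUpdate::PromoteSecondary);
                 }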
     599              : 
     600              :     // Check if the number of shards attached to a given node is lagging below
     601              :     // the cluster average. If that's the case, the node should be filled.
     602            0 :     pub(crate) fn compute_fill_requirement(&self, node_id: NodeId) -> usize {
     603            0 :         let Some(node) = self.nodes.get(&node_id) else {
     604            0 :             debug_assert!(false);
     605            0 :             tracing::error!("Scheduler missing node {node_id}");
     606            0 :             return 0;
     607              :         };
     608            0 :         assert!(!self.nodes.is_empty());
     609            0 :         let expected_attached_shards_per_node = self.expected_attached_shard_count();
     610              : 
     611            0 :         for (node_id, node) in self.nodes.iter() {
     612            0 :             tracing::trace!(%node_id, "attached_shard_count={} shard_count={} expected={}", node.attached_shard_count, node.shard_count, expected_attached_shards_per_node);
     613              :         }
     614              : 
     615            0 :         if node.attached_shard_count < expected_attached_shards_per_node {
     616            0 :             expected_attached_shards_per_node - node.attached_shard_count
     617              :         } else {
     618            0 :             0
     619              :         }
     620            0 :     }
     621              : 
     622            0 :     pub(crate) fn expected_attached_shard_count(&self) -> usize {
     623            0 :         let total_attached_shards: usize =
     624            0 :             self.nodes.values().map(|n| n.attached_shard_count).sum();
     625            0 : 
     626            0 :         assert!(!self.nodes.is_empty());
     627            0 :         total_attached_shards / self.nodes.len()
     628            0 :     }
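
             A standalone illustration of the arithmetic in the two functions above (plain integers rather
             than the scheduler's node map): the expected count is the integer-division average of attached
             shards per node, and the fill requirement is how far a node lags behind it, never negative.

                 fn fill_requirement(attached_per_node: &[usize], this_node: usize) -> usize {
                     let expected = attached_per_node.iter().sum::<usize>() / attached_per_node.len();
                     expected.saturating_sub(attached_per_node[this_node])
                 }

                 fn main() {
                     let counts = [4, 2, 0];
                     assert_eq!(fill_requirement(&counts, 2), 2); // lagging node should receive two shards
                     assert_eq!(fill_requirement(&counts, 0), 0); // node above the average needs nothing
                 }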
     629              : 
     630            0 :     pub(crate) fn nodes_by_attached_shard_count(&self) -> Vec<(NodeId, usize)> {
     631            0 :         self.nodes
     632            0 :             .iter()
     633            0 :             .map(|(node_id, stats)| (*node_id, stats.attached_shard_count))
     634            0 :             .sorted_by(|lhs, rhs| Ord::cmp(&lhs.1, &rhs.1).reverse())
     635            0 :             .collect()
     636            0 :     }
     637              : 
     638          215 :     pub(crate) fn node_upsert(&mut self, node: &Node) {
     639              :         use std::collections::hash_map::Entry::*;
     640          215 :         match self.nodes.entry(node.get_id()) {
     641            4 :             Occupied(mut entry) => {
     642            4 :                 // Updates to MaySchedule are how we receive updated PageserverUtilization: adjust these values
     643            4 :                 // to account for any shards scheduled on the controller but not yet visible to the pageserver.
     644            4 :                 let mut may_schedule = node.may_schedule();
     645            4 :                 match &mut may_schedule {
     646            2 :                     MaySchedule::Yes(utilization) => {
     647            2 :                         utilization.adjust_shard_count_max(entry.get().shard_count as u32);
     648            2 :                     }
     649            2 :                     MaySchedule::No => { // Nothing to tweak
     650            2 :                     }
     651              :                 }
     652              : 
     653            4 :                 entry.get_mut().may_schedule = may_schedule;
     654              :             }
     655          211 :             Vacant(entry) => {
     656          211 :                 entry.insert(SchedulerNode {
     657          211 :                     shard_count: 0,
     658          211 :                     attached_shard_count: 0,
     659          211 :                     home_shard_count: 0,
     660          211 :                     may_schedule: node.may_schedule(),
     661          211 :                     az: node.get_availability_zone_id().clone(),
     662          211 :                 });
     663          211 :             }
     664              :         }
     665          215 :     }
     666              : 
     667            0 :     pub(crate) fn node_remove(&mut self, node_id: NodeId) {
     668            0 :         if self.nodes.remove(&node_id).is_none() {
     669            0 :             tracing::warn!(node_id=%node_id, "Removed non-existent node from scheduler");
     670            0 :         }
     671            0 :     }
     672              : 
     673              :     /// Calculate a single node's score, used in optimizer logic to compare specific
     674              :     /// nodes' scores.
     675          135 :     pub(crate) fn compute_node_score<Score>(
     676          135 :         &mut self,
     677          135 :         node_id: NodeId,
     678          135 :         preferred_az: &Option<AvailabilityZone>,
     679          135 :         context: &ScheduleContext,
     680          135 :     ) -> Option<Score>
     681          135 :     where
     682          135 :         Score: NodeSchedulingScore,
     683          135 :     {
     684          135 :         self.nodes
     685          135 :             .get_mut(&node_id)
     686          135 :             .and_then(|node| Score::generate(&node_id, node, preferred_az, context))
     687          135 :     }
     688              : 
      689              :     /// Compute a scheduling score for each node that the scheduler knows of,
      690              :     /// minus a set of hard-excluded nodes.
     691        25725 :     fn compute_node_scores<Score>(
     692        25725 :         &mut self,
     693        25725 :         hard_exclude: &[NodeId],
     694        25725 :         preferred_az: &Option<AvailabilityZone>,
     695        25725 :         context: &ScheduleContext,
     696        25725 :     ) -> Vec<Score>
     697        25725 :     where
     698        25725 :         Score: NodeSchedulingScore,
     699        25725 :     {
     700        25725 :         self.nodes
     701        25725 :             .iter_mut()
     702       104187 :             .filter_map(|(k, v)| {
     703       104187 :                 if hard_exclude.contains(k) {
     704        12846 :                     None
     705              :                 } else {
     706        91341 :                     Score::generate(k, v, preferred_az, context)
     707              :                 }
     708       104187 :             })
     709        25725 :             .collect()
     710        25725 :     }
     711              : 
      712              :     /// hard_exclude: it is forbidden to use nodes in this list, typically because they
     713              :     /// are already in use by this shard -- we use this to avoid picking the same node
     714              :     /// as both attached and secondary location.  This is a hard constraint: if we cannot
     715              :     /// find any nodes that aren't in this list, then we will return a [`ScheduleError::ImpossibleConstraint`].
     716              :     ///
     717              :     /// context: we prefer to avoid using nodes identified in the context, according
      718              :     /// to their anti-affinity score.  We use this to prefer to avoid placing shards of
     719              :     /// the same tenant on the same node.  This is a soft constraint: the context will never
     720              :     /// cause us to fail to schedule a shard.
     721        25725 :     pub(crate) fn schedule_shard<Tag: ShardTag>(
     722        25725 :         &mut self,
     723        25725 :         hard_exclude: &[NodeId],
     724        25725 :         preferred_az: &Option<AvailabilityZone>,
     725        25725 :         context: &ScheduleContext,
     726        25725 :     ) -> Result<NodeId, ScheduleError> {
     727        25725 :         if self.nodes.is_empty() {
     728            0 :             return Err(ScheduleError::NoPageservers);
     729        25725 :         }
     730        25725 : 
     731        25725 :         let mut scores =
     732        25725 :             self.compute_node_scores::<Tag::Score>(hard_exclude, preferred_az, context);
     733        25725 : 
     734        25725 :         // Exclude nodes whose utilization is critically high, if there are alternatives available.  This will
     735        25725 :         // cause us to violate affinity rules if it is necessary to avoid critically overloading nodes: for example
     736        25725 :         // we may place shards in the same tenant together on the same pageserver if all other pageservers are
     737        25725 :         // overloaded.
     738        25725 :         let non_overloaded_scores = scores
     739        25725 :             .iter()
     740        91340 :             .filter(|i| !i.is_overloaded())
     741        25725 :             .copied()
     742        25725 :             .collect::<Vec<_>>();
     743        25725 :         if !non_overloaded_scores.is_empty() {
     744        25725 :             scores = non_overloaded_scores;
     745        25725 :         }
     746              : 
     747              :         // Sort the nodes by score. The one with the lowest scores will be the preferred node.
     748              :         // Refer to [`NodeAttachmentSchedulingScore`] for attached locations and
     749              :         // [`NodeSecondarySchedulingScore`] for secondary locations to understand how the nodes
     750              :         // are ranked.
     751        25725 :         scores.sort();
     752        25725 : 
     753        25725 :         if scores.is_empty() {
     754              :             // After applying constraints, no pageservers were left.
     755            0 :             if !matches!(context.mode, ScheduleMode::Speculative) {
     756              :                 // If this was not a speculative attempt, log details to understand why we couldn't
     757              :                 // schedule: this may help an engineer understand if some nodes are marked offline
     758              :                 // in a way that's preventing progress.
     759            0 :                 tracing::info!(
     760            0 :                     "Scheduling failure, while excluding {hard_exclude:?}, node states:"
     761              :                 );
     762            0 :                 for (node_id, node) in &self.nodes {
     763            0 :                     tracing::info!(
     764            0 :                         "Node {node_id}: may_schedule={} shards={}",
     765            0 :                         !matches!(node.may_schedule, MaySchedule::No),
     766              :                         node.shard_count
     767              :                     );
     768              :                 }
     769            0 :             }
     770            0 :             return Err(ScheduleError::ImpossibleConstraint);
     771        25725 :         }
     772        25725 : 
     773        25725 :         // Lowest score wins
     774        25725 :         let node_id = scores.first().unwrap().node_id();
     775              : 
     776        25725 :         if !matches!(context.mode, ScheduleMode::Speculative) {
     777        25725 :             tracing::info!(
      778            0 :             "scheduler selected node {node_id} (eligible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?}, preferred_az: {:?})",
     779            0 :             scores.iter().map(|i| i.node_id().0).collect::<Vec<_>>(),
     780              :             preferred_az,
     781              :        );
     782            0 :         }
     783              : 
     784              :         // Note that we do not update shard count here to reflect the scheduling: that
     785              :         // is IntentState's job when the scheduled location is used.
     786              : 
     787        25725 :         Ok(node_id)
     788        25725 :     }
     789              : 
     790              :     /// Selects any available node. This is suitable for performing background work (e.g. S3
     791              :     /// deletions).
     792            0 :     pub(crate) fn any_available_node(&mut self) -> Result<NodeId, ScheduleError> {
     793            0 :         self.schedule_shard::<AttachedShardTag>(&[], &None, &ScheduleContext::default())
     794            0 :     }
     795              : 
     796              :     /// For choosing which AZ to schedule a new shard into, use this.  It will return the
      797              :     /// AZ with the lowest number of shards currently scheduled in this AZ as their home
     798              :     /// location.
     799              :     ///
     800              :     /// We use an AZ-wide measure rather than simply selecting the AZ of the least-loaded
     801              :     /// node, because while tenants start out single sharded, when they grow and undergo
     802              :     /// shard-split, they will occupy space on many nodes within an AZ.  It is important
     803              :     /// that we pick the AZ in a way that balances this _future_ load.
     804              :     ///
     805              :     /// Once we've picked an AZ, subsequent scheduling within that AZ will be driven by
     806              :     /// nodes' utilization scores.
     807          303 :     pub(crate) fn get_az_for_new_tenant(&self) -> Option<AvailabilityZone> {
     808          303 :         if self.nodes.is_empty() {
     809            0 :             return None;
     810          303 :         }
     811              : 
     812              :         #[derive(Default)]
     813              :         struct AzScore {
     814              :             home_shard_count: usize,
     815              :             scheduleable: bool,
     816              :         }
     817              : 
     818          303 :         let mut azs: HashMap<&AvailabilityZone, AzScore> = HashMap::new();
     819         1818 :         for node in self.nodes.values() {
     820         1818 :             let az = azs.entry(&node.az).or_default();
     821         1818 :             az.home_shard_count += node.home_shard_count;
     822         1818 :             az.scheduleable |= matches!(node.may_schedule, MaySchedule::Yes(_));
     823              :         }
     824              : 
     825              :         // If any AZs are schedulable, then filter out the non-schedulable ones (i.e. AZs where
     826              :         // all nodes are overloaded or otherwise unschedulable).
     827          303 :         if azs.values().any(|i| i.scheduleable) {
     828          906 :             azs.retain(|_, i| i.scheduleable);
     829          303 :         }
     830              : 
     831              :         // Find the AZ with the lowest number of shards currently allocated
     832          303 :         Some(
     833          303 :             azs.into_iter()
     834          906 :                 .min_by_key(|i| (i.1.home_shard_count, i.0))
     835          303 :                 .unwrap()
     836          303 :                 .0
     837          303 :                 .clone(),
     838          303 :         )
     839          303 :     }
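
             The tuple key in the min_by_key call above means that ties on home shard count are broken by the
             AZ identifier, which keeps the choice deterministic. A standalone illustration of the same
             pattern on plain std types (the AZ names are made up):

                 use std::collections::HashMap;

                 fn main() {
                     let home_shards: HashMap<&str, usize> =
                         HashMap::from([("az-a", 3), ("az-b", 1), ("az-c", 1)]);
                     // Tuple keys compare element by element: shard count first, then AZ name as a tie-break.
                     let picked = home_shards
                         .into_iter()
                         .min_by_key(|(az, count)| (*count, *az))
                         .map(|(az, _)| az)
                         .unwrap();
                     assert_eq!(picked, "az-b"); // "az-b" and "az-c" tie on count; "az-b" sorts first
                 }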
     840              : 
     841            9 :     pub(crate) fn get_node_az(&self, node_id: &NodeId) -> Option<AvailabilityZone> {
     842            9 :         self.nodes.get(node_id).map(|n| n.az.clone())
     843            9 :     }
     844              : 
     845              :     /// For use when choosing a preferred secondary location: filter out nodes that are not
     846              :     /// available, and gather their AZs.
     847        12831 :     pub(crate) fn filter_usable_nodes(
     848        12831 :         &self,
     849        12831 :         nodes: &[NodeId],
     850        12831 :     ) -> Vec<(NodeId, Option<AvailabilityZone>)> {
     851        12831 :         nodes
     852        12831 :             .iter()
     853        12831 :             .filter_map(|node_id| {
     854            3 :                 let node = self
     855            3 :                     .nodes
     856            3 :                     .get(node_id)
     857            3 :                     .expect("Referenced nodes always exist");
     858            3 :                 if matches!(node.may_schedule, MaySchedule::Yes(_)) {
     859            2 :                     Some((*node_id, Some(node.az.clone())))
     860              :                 } else {
     861            1 :                     None
     862              :                 }
     863        12831 :             })
     864        12831 :             .collect()
     865        12831 :     }
     866              : 
     867              :     /// Unit test access to internal state
     868              :     #[cfg(test)]
     869           19 :     pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
     870           19 :         self.nodes.get(&node_id).unwrap().shard_count
     871           19 :     }
     872              : 
     873              :     #[cfg(test)]
     874           19 :     pub(crate) fn get_node_attached_shard_count(&self, node_id: NodeId) -> usize {
     875           19 :         self.nodes.get(&node_id).unwrap().attached_shard_count
     876           19 :     }
     877              : 
     878              :     /// Some metrics that we only calculate periodically: this is simpler than
     879              :     /// rigorously updating them on every change.
     880            0 :     pub(crate) fn update_metrics(&self) {
     881            0 :         for (node_id, node) in &self.nodes {
     882            0 :             let node_id_str = format!("{}", node_id);
     883            0 :             let label_group = NodeLabelGroup {
     884            0 :                 az: &node.az.0,
     885            0 :                 node_id: &node_id_str,
     886            0 :             };
     887            0 : 
     888            0 :             crate::metrics::METRICS_REGISTRY
     889            0 :                 .metrics_group
     890            0 :                 .storage_controller_node_shards
     891            0 :                 .set(label_group.clone(), node.shard_count as i64);
     892            0 : 
     893            0 :             crate::metrics::METRICS_REGISTRY
     894            0 :                 .metrics_group
     895            0 :                 .storage_controller_node_attached_shards
     896            0 :                 .set(label_group.clone(), node.attached_shard_count as i64);
     897            0 : 
     898            0 :             crate::metrics::METRICS_REGISTRY
     899            0 :                 .metrics_group
     900            0 :                 .storage_controller_node_home_shards
     901            0 :                 .set(label_group.clone(), node.home_shard_count as i64);
     902            0 :         }
     903            0 :     }
     904              : }
     905              : 
     906              : #[cfg(test)]
     907              : pub(crate) mod test_utils {
     908              : 
     909              :     use crate::node::Node;
     910              :     use pageserver_api::{
     911              :         controller_api::{AvailabilityZone, NodeAvailability},
     912              :         models::utilization::test_utilization,
     913              :     };
     914              :     use std::collections::HashMap;
     915              :     use utils::id::NodeId;
     916              : 
     917              :     /// Test helper: synthesize the requested number of nodes, all in active state.
     918              :     ///
     919              :     /// Node IDs start at one.
     920              :     ///
     921              :     /// The `azs` argument specifies the list of availability zones which will be assigned
      922              :     /// to nodes in round-robin fashion. If empty, a default AZ is assigned.
     923           68 :     pub(crate) fn make_test_nodes(n: u64, azs: &[AvailabilityZone]) -> HashMap<NodeId, Node> {
     924           68 :         let mut az_iter = azs.iter().cycle();
     925           68 : 
     926           68 :         (1..n + 1)
     927          272 :             .map(|i| {
     928          272 :                 (NodeId(i), {
     929          272 :                     let mut node = Node::new(
     930          272 :                         NodeId(i),
     931          272 :                         format!("httphost-{i}"),
     932          272 :                         80 + i as u16,
     933          272 :                         format!("pghost-{i}"),
     934          272 :                         5432 + i as u16,
     935          272 :                         az_iter
     936          272 :                             .next()
     937          272 :                             .cloned()
     938          272 :                             .unwrap_or(AvailabilityZone("test-az".to_string())),
     939          272 :                     );
     940          272 :                     node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0)));
     941          272 :                     assert!(node.is_available());
     942          272 :                     node
     943          272 :                 })
     944          272 :             })
     945           68 :             .collect()
     946           68 :     }
     947              : }
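
// A minimal usage sketch for `make_test_nodes` (not part of the measured source;
// the module and test names here are illustrative): node IDs start at one, and
// with an empty AZ slice every node falls back to the default "test-az" zone.
#[cfg(test)]
mod test_utils_example {
    use super::test_utils::make_test_nodes;
    use utils::id::NodeId;

    #[test]
    fn nodes_are_numbered_from_one() {
        let nodes = make_test_nodes(3, &[]);
        assert_eq!(nodes.len(), 3);
        assert!(nodes.contains_key(&NodeId(1)));
        assert!(nodes.contains_key(&NodeId(3)));
        assert!(!nodes.contains_key(&NodeId(0)));
        // Every synthesized node starts out active and schedulable.
        assert!(nodes.values().all(|n| n.is_available()));
    }
}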
     948              : 
     949              : #[cfg(test)]
     950              : mod tests {
     951              :     use pageserver_api::{
     952              :         controller_api::NodeAvailability, models::utilization::test_utilization,
     953              :         shard::ShardIdentity,
     954              :     };
     955              :     use utils::{
     956              :         id::TenantId,
     957              :         shard::{ShardCount, ShardNumber, TenantShardId},
     958              :     };
     959              : 
     960              :     use super::*;
     961              : 
     962              :     use crate::tenant_shard::IntentState;
     963              :     #[test]
     964            1 :     fn scheduler_basic() -> anyhow::Result<()> {
     965            1 :         let nodes = test_utils::make_test_nodes(2, &[]);
     966            1 : 
     967            1 :         let mut scheduler = Scheduler::new(nodes.values());
     968            1 :         let mut t1_intent = IntentState::new(None);
     969            1 :         let mut t2_intent = IntentState::new(None);
     970            1 : 
     971            1 :         let context = ScheduleContext::default();
     972              : 
     973            1 :         let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &None, &context)?;
     974            1 :         t1_intent.set_attached(&mut scheduler, Some(scheduled));
     975            1 :         let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &None, &context)?;
     976            1 :         t2_intent.set_attached(&mut scheduler, Some(scheduled));
     977            1 : 
     978            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
     979            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
     980              : 
     981            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
     982            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
     983              : 
     984            1 :         let scheduled = scheduler.schedule_shard::<AttachedShardTag>(
     985            1 :             &t1_intent.all_pageservers(),
     986            1 :             &None,
     987            1 :             &context,
     988            1 :         )?;
     989            1 :         t1_intent.push_secondary(&mut scheduler, scheduled);
     990            1 : 
     991            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
     992            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
     993              : 
     994            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
     995            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
     996              : 
     997            1 :         t1_intent.clear(&mut scheduler);
     998            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
     999            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
    1000              : 
    1001            1 :         let total_attached = scheduler.get_node_attached_shard_count(NodeId(1))
    1002            1 :             + scheduler.get_node_attached_shard_count(NodeId(2));
    1003            1 :         assert_eq!(total_attached, 1);
    1004              : 
    1005            1 :         if cfg!(debug_assertions) {
    1006              :             // Dropping an IntentState without clearing it causes a panic in debug mode,
    1007              :             // because we have failed to properly update scheduler shard counts.
    1008            1 :             let result = std::panic::catch_unwind(move || {
    1009            1 :                 drop(t2_intent);
    1010            1 :             });
    1011            1 :             assert!(result.is_err());
    1012              :         } else {
    1013            0 :             t2_intent.clear(&mut scheduler);
    1014            0 : 
    1015            0 :             assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
    1016            0 :             assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 0);
    1017              : 
    1018            0 :             assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 0);
    1019            0 :             assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 0);
    1020              :         }
    1021              : 
    1022            1 :         Ok(())
    1023            1 :     }
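
    // The debug-mode panic exercised above enforces an RAII-style invariant: an
    // IntentState must be cleared through the scheduler before it is dropped, so
    // the scheduler's shard counts stay consistent. A minimal teardown sketch,
    // assuming the same APIs the test uses; the function itself is illustrative
    // and not part of the measured file.
    fn intent_teardown_sketch(scheduler: &mut Scheduler) -> Result<(), ScheduleError> {
        let context = ScheduleContext::default();
        let mut intent = IntentState::new(None);
        let node_id = scheduler.schedule_shard::<AttachedShardTag>(&[], &None, &context)?;
        intent.set_attached(scheduler, Some(node_id));
        // ... use the attached location ...
        // Always clear before drop so the scheduler decrements its shard counts.
        intent.clear(scheduler);
        Ok(())
    }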
    1024              : 
    1025              :     #[test]
    1026              :     /// Test the PageserverUtilization's contribution to scheduling algorithm
    1027            1 :     fn scheduler_utilization() {
    1028            1 :         let mut nodes = test_utils::make_test_nodes(3, &[]);
    1029            1 :         let mut scheduler = Scheduler::new(nodes.values());
    1030            1 : 
    1031            1 :         // Need to keep these alive because they contribute to shard counts via RAII
    1032            1 :         let mut scheduled_intents = Vec::new();
    1033            1 : 
    1034            1 :         let empty_context = ScheduleContext::default();
    1035              : 
    1036           11 :         fn assert_scheduler_chooses(
    1037           11 :             expect_node: NodeId,
    1038           11 :             scheduled_intents: &mut Vec<IntentState>,
    1039           11 :             scheduler: &mut Scheduler,
    1040           11 :             context: &ScheduleContext,
    1041           11 :         ) {
    1042           11 :             let scheduled = scheduler
    1043           11 :                 .schedule_shard::<AttachedShardTag>(&[], &None, context)
    1044           11 :                 .unwrap();
    1045           11 :             let mut intent = IntentState::new(None);
    1046           11 :             intent.set_attached(scheduler, Some(scheduled));
    1047           11 :             scheduled_intents.push(intent);
    1048           11 :             assert_eq!(scheduled, expect_node);
    1049           11 :         }
    1050              : 
    1051              :         // Independent schedule calls onto empty nodes should round-robin, because the shard count in each
    1052              :         // node's utilization is updated inline.  The order is deterministic because when all other factors are
    1053              :         // equal, we order by node ID.
    1054            1 :         assert_scheduler_chooses(
    1055            1 :             NodeId(1),
    1056            1 :             &mut scheduled_intents,
    1057            1 :             &mut scheduler,
    1058            1 :             &empty_context,
    1059            1 :         );
    1060            1 :         assert_scheduler_chooses(
    1061            1 :             NodeId(2),
    1062            1 :             &mut scheduled_intents,
    1063            1 :             &mut scheduler,
    1064            1 :             &empty_context,
    1065            1 :         );
    1066            1 :         assert_scheduler_chooses(
    1067            1 :             NodeId(3),
    1068            1 :             &mut scheduled_intents,
    1069            1 :             &mut scheduler,
    1070            1 :             &empty_context,
    1071            1 :         );
    1072            1 : 
    1073            1 :         // Manually setting utilization higher should cause schedule calls to round-robin the other nodes
    1074            1 :         // which have equal utilization.
    1075            1 :         nodes
    1076            1 :             .get_mut(&NodeId(1))
    1077            1 :             .unwrap()
    1078            1 :             .set_availability(NodeAvailability::Active(test_utilization::simple(
    1079            1 :                 10,
    1080            1 :                 1024 * 1024 * 1024,
    1081            1 :             )));
    1082            1 :         scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
    1083            1 : 
    1084            1 :         assert_scheduler_chooses(
    1085            1 :             NodeId(2),
    1086            1 :             &mut scheduled_intents,
    1087            1 :             &mut scheduler,
    1088            1 :             &empty_context,
    1089            1 :         );
    1090            1 :         assert_scheduler_chooses(
    1091            1 :             NodeId(3),
    1092            1 :             &mut scheduled_intents,
    1093            1 :             &mut scheduler,
    1094            1 :             &empty_context,
    1095            1 :         );
    1096            1 :         assert_scheduler_chooses(
    1097            1 :             NodeId(2),
    1098            1 :             &mut scheduled_intents,
    1099            1 :             &mut scheduler,
    1100            1 :             &empty_context,
    1101            1 :         );
    1102            1 :         assert_scheduler_chooses(
    1103            1 :             NodeId(3),
    1104            1 :             &mut scheduled_intents,
    1105            1 :             &mut scheduler,
    1106            1 :             &empty_context,
    1107            1 :         );
    1108            1 : 
    1109            1 :         // The scheduler should prefer nodes with lower affinity score,
    1110            1 :         // even if they have higher utilization (as long as they aren't utilized at >100%)
    1111            1 :         let mut context_prefer_node1 = ScheduleContext::default();
    1112            1 :         context_prefer_node1.avoid(&[NodeId(2), NodeId(3)]);
    1113            1 :         assert_scheduler_chooses(
    1114            1 :             NodeId(1),
    1115            1 :             &mut scheduled_intents,
    1116            1 :             &mut scheduler,
    1117            1 :             &context_prefer_node1,
    1118            1 :         );
    1119            1 :         assert_scheduler_chooses(
    1120            1 :             NodeId(1),
    1121            1 :             &mut scheduled_intents,
    1122            1 :             &mut scheduler,
    1123            1 :             &context_prefer_node1,
    1124            1 :         );
    1125            1 : 
    1126            1 :         // If a node is over-utilized, it will not be used even if affinity scores prefer it
    1127            1 :         nodes
    1128            1 :             .get_mut(&NodeId(1))
    1129            1 :             .unwrap()
    1130            1 :             .set_availability(NodeAvailability::Active(test_utilization::simple(
    1131            1 :                 20000,
    1132            1 :                 1024 * 1024 * 1024,
    1133            1 :             )));
    1134            1 :         scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
    1135            1 :         assert_scheduler_chooses(
    1136            1 :             NodeId(2),
    1137            1 :             &mut scheduled_intents,
    1138            1 :             &mut scheduler,
    1139            1 :             &context_prefer_node1,
    1140            1 :         );
    1141            1 :         assert_scheduler_chooses(
    1142            1 :             NodeId(3),
    1143            1 :             &mut scheduled_intents,
    1144            1 :             &mut scheduler,
    1145            1 :             &context_prefer_node1,
    1146            1 :         );
    1147              : 
    1148           12 :         for mut intent in scheduled_intents {
    1149           11 :             intent.clear(&mut scheduler);
    1150           11 :         }
    1151            1 :     }
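
    // Hedged sketch of the ordering the assertions above rely on (an illustrative
    // tuple, not the real NodeSchedulingScore type): over-utilized candidates are
    // filtered out first, and the rest are ranked by (affinity, utilization,
    // node id), so the node ID only breaks ties when all other factors are equal.
    fn pick_lowest_score(candidates: Vec<(u64, u8, NodeId)>) -> Option<NodeId> {
        candidates
            .into_iter()
            // Drop nodes utilized above 100%: they are never chosen, even if preferred.
            .filter(|(_affinity, util_pct, _)| *util_pct <= 100)
            // Lexicographic tuple ordering: affinity, then utilization, then node ID.
            .min()
            .map(|(_, _, node_id)| node_id)
    }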
    1152              : 
    1153              :     #[test]
    1154              :     /// A simple test that showcases AZ-aware scheduling and its interaction with
    1155              :     /// affinity scores.
    1156            1 :     fn az_scheduling() {
    1157            1 :         let az_a_tag = AvailabilityZone("az-a".to_string());
    1158            1 :         let az_b_tag = AvailabilityZone("az-b".to_string());
    1159            1 : 
    1160            1 :         let nodes = test_utils::make_test_nodes(3, &[az_a_tag.clone(), az_b_tag.clone()]);
    1161            1 :         let mut scheduler = Scheduler::new(nodes.values());
    1162            1 : 
    1163            1 :         // Need to keep these alive because they contribute to shard counts via RAII
    1164            1 :         let mut scheduled_intents = Vec::new();
    1165            1 : 
    1166            1 :         let mut context = ScheduleContext::default();
    1167              : 
    1168            4 :         fn assert_scheduler_chooses<Tag: ShardTag>(
    1169            4 :             expect_node: NodeId,
    1170            4 :             preferred_az: Option<AvailabilityZone>,
    1171            4 :             scheduled_intents: &mut Vec<IntentState>,
    1172            4 :             scheduler: &mut Scheduler,
    1173            4 :             context: &mut ScheduleContext,
    1174            4 :         ) {
    1175            4 :             let scheduled = scheduler
    1176            4 :                 .schedule_shard::<Tag>(&[], &preferred_az, context)
    1177            4 :                 .unwrap();
    1178            4 :             let mut intent = IntentState::new(preferred_az.clone());
    1179            4 :             intent.set_attached(scheduler, Some(scheduled));
    1180            4 :             scheduled_intents.push(intent);
    1181            4 :             assert_eq!(scheduled, expect_node);
    1182              : 
    1183            4 :             context.avoid(&[scheduled]);
    1184            4 :         }
    1185              : 
    1186            1 :         assert_scheduler_chooses::<AttachedShardTag>(
    1187            1 :             NodeId(1),
    1188            1 :             Some(az_a_tag.clone()),
    1189            1 :             &mut scheduled_intents,
    1190            1 :             &mut scheduler,
    1191            1 :             &mut context,
    1192            1 :         );
    1193            1 : 
    1194            1 :         // Nodes 2 and 3 have an affinity score of 0, but node 3
    1195            1 :         // is in "az-a", so we prefer that one.
    1196            1 :         assert_scheduler_chooses::<AttachedShardTag>(
    1197            1 :             NodeId(3),
    1198            1 :             Some(az_a_tag.clone()),
    1199            1 :             &mut scheduled_intents,
    1200            1 :             &mut scheduler,
    1201            1 :             &mut context,
    1202            1 :         );
    1203            1 : 
    1204            1 :         // Nodes 1 and 3 (az-a) have the same affinity score, so prefer the lowest node ID.
    1205            1 :         assert_scheduler_chooses::<AttachedShardTag>(
    1206            1 :             NodeId(1),
    1207            1 :             Some(az_a_tag.clone()),
    1208            1 :             &mut scheduled_intents,
    1209            1 :             &mut scheduler,
    1210            1 :             &mut context,
    1211            1 :         );
    1212            1 : 
    1213            1 :         // Avoid nodes in "az-a" for the secondary location.
    1214            1 :         assert_scheduler_chooses::<SecondaryShardTag>(
    1215            1 :             NodeId(2),
    1216            1 :             Some(az_a_tag.clone()),
    1217            1 :             &mut scheduled_intents,
    1218            1 :             &mut scheduler,
    1219            1 :             &mut context,
    1220            1 :         );
    1221              : 
    1222            5 :         for mut intent in scheduled_intents {
    1223            4 :             intent.clear(&mut scheduler);
    1224            4 :         }
    1225            1 :     }
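
    // Hedged sketch of the AZ preference exercised above (an illustrative helper,
    // not the real scoring code): attached locations prefer nodes inside the
    // shard's preferred AZ, while secondary locations prefer nodes outside it.
    fn node_az_is_preferred(
        node_az: &AvailabilityZone,
        preferred_az: &AvailabilityZone,
        is_attached: bool,
    ) -> bool {
        if is_attached {
            node_az == preferred_az
        } else {
            node_az != preferred_az
        }
    }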
    1226              : 
    1227              :     #[test]
    1228            1 :     fn az_scheduling_for_new_tenant() {
    1229            1 :         let az_a_tag = AvailabilityZone("az-a".to_string());
    1230            1 :         let az_b_tag = AvailabilityZone("az-b".to_string());
    1231            1 :         let nodes = test_utils::make_test_nodes(
    1232            1 :             6,
    1233            1 :             &[
    1234            1 :                 az_a_tag.clone(),
    1235            1 :                 az_a_tag.clone(),
    1236            1 :                 az_a_tag.clone(),
    1237            1 :                 az_b_tag.clone(),
    1238            1 :                 az_b_tag.clone(),
    1239            1 :                 az_b_tag.clone(),
    1240            1 :             ],
    1241            1 :         );
    1242            1 : 
    1243            1 :         let mut scheduler = Scheduler::new(nodes.values());
    1244              : 
    1245              :         /// Force the `home_shard_count` of a node directly: this is the metric used
    1246              :         /// by the scheduler when picking AZs.
    1247            3 :         fn set_shard_count(scheduler: &mut Scheduler, node_id: NodeId, shard_count: usize) {
    1248            3 :             let node = scheduler.nodes.get_mut(&node_id).unwrap();
    1249            3 :             node.home_shard_count = shard_count;
    1250            3 :         }
    1251              : 
    1252              :         // Initial empty state.  Scores are tied, scheduler prefers lower AZ ID.
    1253            1 :         assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_a_tag.clone()));
    1254              : 
    1255              :         // Home shard count is higher in AZ A, so AZ B will be preferred
    1256            1 :         set_shard_count(&mut scheduler, NodeId(1), 10);
    1257            1 :         assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_b_tag.clone()));
    1258              : 
    1259              :         // Total home shard count is higher in AZ B, so we revert to preferring AZ A
    1260            1 :         set_shard_count(&mut scheduler, NodeId(4), 6);
    1261            1 :         set_shard_count(&mut scheduler, NodeId(5), 6);
    1262            1 :         assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_a_tag.clone()));
    1263            1 :     }
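
    // Hedged sketch of the rule the assertions above exercise (not the actual
    // get_az_for_new_tenant implementation): sum home_shard_count per AZ and pick
    // the AZ with the smallest total, breaking ties by the lexically smallest AZ id.
    fn pick_az_sketch(az_home_totals: Vec<(AvailabilityZone, usize)>) -> Option<AvailabilityZone> {
        az_home_totals
            .into_iter()
            .min_by_key(|(az, total)| (*total, az.0.clone()))
            .map(|(az, _total)| az)
    }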
    1264              : 
    1265              :     /// Test that when selecting AZs for many new tenants, we get the expected balance across nodes
    1266              :     #[test]
    1267            1 :     fn az_selection_many() {
    1268            1 :         let az_a_tag = AvailabilityZone("az-a".to_string());
    1269            1 :         let az_b_tag = AvailabilityZone("az-b".to_string());
    1270            1 :         let az_c_tag = AvailabilityZone("az-c".to_string());
    1271            1 :         let nodes = test_utils::make_test_nodes(
    1272            1 :             6,
    1273            1 :             &[
    1274            1 :                 az_a_tag.clone(),
    1275            1 :                 az_b_tag.clone(),
    1276            1 :                 az_c_tag.clone(),
    1277            1 :                 az_a_tag.clone(),
    1278            1 :                 az_b_tag.clone(),
    1279            1 :                 az_c_tag.clone(),
    1280            1 :             ],
    1281            1 :         );
    1282            1 : 
    1283            1 :         let mut scheduler = Scheduler::new(nodes.values());
    1284            1 : 
    1285            1 :         // We should get 1/6th of these on each node, give or take a few...
    1286            1 :         let total_tenants = 300;
    1287            1 : 
    1288            1 :         // ...where the 'few' is the number of AZs, because the scheduling will sometimes overshoot
    1289            1 :         // on one AZ before correcting itself.  This is because we select the 'home' AZ based on
    1290            1 :         // an AZ-wide metric, but we select the location for secondaries on a purely node-based
    1291            1 :         // metric (while excluding the home AZ).
    1292            1 :         let grace = 3;
    1293            1 : 
    1294            1 :         let mut scheduled_shards = Vec::new();
    1295          300 :         for _i in 0..total_tenants {
    1296          300 :             let preferred_az = scheduler.get_az_for_new_tenant().unwrap();
    1297          300 : 
    1298          300 :             let mut node_home_counts = scheduler
    1299          300 :                 .nodes
    1300          300 :                 .iter()
    1301         1800 :                 .map(|(node_id, node)| (node_id, node.home_shard_count))
    1302          300 :                 .collect::<Vec<_>>();
    1303         7800 :             node_home_counts.sort_by_key(|i| i.0);
    1304          300 :             eprintln!("Selected {}, vs nodes {:?}", preferred_az, node_home_counts);
    1305          300 : 
    1306          300 :             let tenant_shard_id = TenantShardId {
    1307          300 :                 tenant_id: TenantId::generate(),
    1308          300 :                 shard_number: ShardNumber(0),
    1309          300 :                 shard_count: ShardCount(1),
    1310          300 :             };
    1311          300 : 
    1312          300 :             let shard_identity = ShardIdentity::new(
    1313          300 :                 tenant_shard_id.shard_number,
    1314          300 :                 tenant_shard_id.shard_count,
    1315          300 :                 pageserver_api::shard::ShardStripeSize(1),
    1316          300 :             )
    1317          300 :             .unwrap();
    1318          300 :             let mut shard = TenantShard::new(
    1319          300 :                 tenant_shard_id,
    1320          300 :                 shard_identity,
    1321          300 :                 pageserver_api::controller_api::PlacementPolicy::Attached(1),
    1322          300 :                 Some(preferred_az),
    1323          300 :             );
    1324          300 : 
    1325          300 :             let mut context = ScheduleContext::default();
    1326          300 :             shard.schedule(&mut scheduler, &mut context).unwrap();
    1327          300 :             eprintln!("Scheduled shard at {:?}", shard.intent);
    1328          300 : 
    1329          300 :             scheduled_shards.push(shard);
    1330          300 :         }
    1331              : 
    1332            7 :         for (node_id, node) in &scheduler.nodes {
    1333            6 :             eprintln!(
    1334            6 :                 "Node {}: {} {} {}",
    1335            6 :                 node_id, node.shard_count, node.attached_shard_count, node.home_shard_count
    1336            6 :             );
    1337            6 :         }
    1338              : 
    1339            6 :         for node in scheduler.nodes.values() {
    1340            6 :             assert!((node.home_shard_count as i64 - total_tenants as i64 / 6).abs() < grace);
    1341              :         }
    1342              : 
    1343          301 :         for mut shard in scheduled_shards {
    1344          300 :             shard.intent.clear(&mut scheduler);
    1345          300 :         }
    1346            1 :     }
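
    // Worked arithmetic for the bound asserted above, using the test's own numbers:
    // 300 tenants across 6 nodes gives an ideal of 50 home shards per node, and
    // grace = 3 (one per AZ) means |count - 50| < 3, i.e. counts of 48..=52 pass.
    fn within_grace(home_shard_count: usize) -> bool {
        let ideal = 300_i64 / 6; // 50
        let grace = 3_i64;
        (home_shard_count as i64 - ideal).abs() < grace
    }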
    1347              : 
    1348              :     #[test]
    1349              :     /// Make sure that when we have an odd number of nodes and an even number of shards, we still
    1350              :     /// get scheduling stability.
    1351            1 :     fn odd_nodes_stability() {
    1352            1 :         let az_a = AvailabilityZone("az-a".to_string());
    1353            1 :         let az_b = AvailabilityZone("az-b".to_string());
    1354            1 : 
    1355            1 :         let nodes = test_utils::make_test_nodes(
    1356            1 :             10,
    1357            1 :             &[
    1358            1 :                 az_a.clone(),
    1359            1 :                 az_a.clone(),
    1360            1 :                 az_a.clone(),
    1361            1 :                 az_a.clone(),
    1362            1 :                 az_a.clone(),
    1363            1 :                 az_b.clone(),
    1364            1 :                 az_b.clone(),
    1365            1 :                 az_b.clone(),
    1366            1 :                 az_b.clone(),
    1367            1 :                 az_b.clone(),
    1368            1 :             ],
    1369            1 :         );
    1370            1 :         let mut scheduler = Scheduler::new(nodes.values());
    1371            1 : 
    1372            1 :         // Need to keep these alive because they contribute to shard counts via RAII
    1373            1 :         let mut scheduled_shards = Vec::new();
    1374            1 : 
    1375            1 :         let mut context = ScheduleContext::default();
    1376              : 
    1377            8 :         fn schedule_shard(
    1378            8 :             tenant_shard_id: TenantShardId,
    1379            8 :             expect_attached: NodeId,
    1380            8 :             expect_secondary: NodeId,
    1381            8 :             scheduled_shards: &mut Vec<TenantShard>,
    1382            8 :             scheduler: &mut Scheduler,
    1383            8 :             preferred_az: Option<AvailabilityZone>,
    1384            8 :             context: &mut ScheduleContext,
    1385            8 :         ) {
    1386            8 :             let shard_identity = ShardIdentity::new(
    1387            8 :                 tenant_shard_id.shard_number,
    1388            8 :                 tenant_shard_id.shard_count,
    1389            8 :                 pageserver_api::shard::ShardStripeSize(1),
    1390            8 :             )
    1391            8 :             .unwrap();
    1392            8 :             let mut shard = TenantShard::new(
    1393            8 :                 tenant_shard_id,
    1394            8 :                 shard_identity,
    1395            8 :                 pageserver_api::controller_api::PlacementPolicy::Attached(1),
    1396            8 :                 preferred_az,
    1397            8 :             );
    1398            8 : 
    1399            8 :             shard.schedule(scheduler, context).unwrap();
    1400            8 : 
    1401            8 :             assert_eq!(shard.intent.get_attached().unwrap(), expect_attached);
    1402            8 :             assert_eq!(
    1403            8 :                 shard.intent.get_secondary().first().unwrap(),
    1404            8 :                 &expect_secondary
    1405            8 :             );
    1406              : 
    1407            8 :             scheduled_shards.push(shard);
    1408            8 :         }
    1409              : 
    1410            1 :         let tenant_id = TenantId::generate();
    1411            1 : 
    1412            1 :         schedule_shard(
    1413            1 :             TenantShardId {
    1414            1 :                 tenant_id,
    1415            1 :                 shard_number: ShardNumber(0),
    1416            1 :                 shard_count: ShardCount(8),
    1417            1 :             },
    1418            1 :             NodeId(1),
    1419            1 :             NodeId(6),
    1420            1 :             &mut scheduled_shards,
    1421            1 :             &mut scheduler,
    1422            1 :             Some(az_a.clone()),
    1423            1 :             &mut context,
    1424            1 :         );
    1425            1 : 
    1426            1 :         schedule_shard(
    1427            1 :             TenantShardId {
    1428            1 :                 tenant_id,
    1429            1 :                 shard_number: ShardNumber(1),
    1430            1 :                 shard_count: ShardCount(8),
    1431            1 :             },
    1432            1 :             NodeId(2),
    1433            1 :             NodeId(7),
    1434            1 :             &mut scheduled_shards,
    1435            1 :             &mut scheduler,
    1436            1 :             Some(az_a.clone()),
    1437            1 :             &mut context,
    1438            1 :         );
    1439            1 : 
    1440            1 :         schedule_shard(
    1441            1 :             TenantShardId {
    1442            1 :                 tenant_id,
    1443            1 :                 shard_number: ShardNumber(2),
    1444            1 :                 shard_count: ShardCount(8),
    1445            1 :             },
    1446            1 :             NodeId(3),
    1447            1 :             NodeId(8),
    1448            1 :             &mut scheduled_shards,
    1449            1 :             &mut scheduler,
    1450            1 :             Some(az_a.clone()),
    1451            1 :             &mut context,
    1452            1 :         );
    1453            1 : 
    1454            1 :         schedule_shard(
    1455            1 :             TenantShardId {
    1456            1 :                 tenant_id,
    1457            1 :                 shard_number: ShardNumber(3),
    1458            1 :                 shard_count: ShardCount(8),
    1459            1 :             },
    1460            1 :             NodeId(4),
    1461            1 :             NodeId(9),
    1462            1 :             &mut scheduled_shards,
    1463            1 :             &mut scheduler,
    1464            1 :             Some(az_a.clone()),
    1465            1 :             &mut context,
    1466            1 :         );
    1467            1 : 
    1468            1 :         schedule_shard(
    1469            1 :             TenantShardId {
    1470            1 :                 tenant_id,
    1471            1 :                 shard_number: ShardNumber(4),
    1472            1 :                 shard_count: ShardCount(8),
    1473            1 :             },
    1474            1 :             NodeId(5),
    1475            1 :             NodeId(10),
    1476            1 :             &mut scheduled_shards,
    1477            1 :             &mut scheduler,
    1478            1 :             Some(az_a.clone()),
    1479            1 :             &mut context,
    1480            1 :         );
    1481            1 : 
    1482            1 :         schedule_shard(
    1483            1 :             TenantShardId {
    1484            1 :                 tenant_id,
    1485            1 :                 shard_number: ShardNumber(5),
    1486            1 :                 shard_count: ShardCount(8),
    1487            1 :             },
    1488            1 :             NodeId(1),
    1489            1 :             NodeId(6),
    1490            1 :             &mut scheduled_shards,
    1491            1 :             &mut scheduler,
    1492            1 :             Some(az_a.clone()),
    1493            1 :             &mut context,
    1494            1 :         );
    1495            1 : 
    1496            1 :         schedule_shard(
    1497            1 :             TenantShardId {
    1498            1 :                 tenant_id,
    1499            1 :                 shard_number: ShardNumber(6),
    1500            1 :                 shard_count: ShardCount(8),
    1501            1 :             },
    1502            1 :             NodeId(2),
    1503            1 :             NodeId(7),
    1504            1 :             &mut scheduled_shards,
    1505            1 :             &mut scheduler,
    1506            1 :             Some(az_a.clone()),
    1507            1 :             &mut context,
    1508            1 :         );
    1509            1 : 
    1510            1 :         schedule_shard(
    1511            1 :             TenantShardId {
    1512            1 :                 tenant_id,
    1513            1 :                 shard_number: ShardNumber(7),
    1514            1 :                 shard_count: ShardCount(8),
    1515            1 :             },
    1516            1 :             NodeId(3),
    1517            1 :             NodeId(8),
    1518            1 :             &mut scheduled_shards,
    1519            1 :             &mut scheduler,
    1520            1 :             Some(az_a.clone()),
    1521            1 :             &mut context,
    1522            1 :         );
    1523              : 
    1524              :         // Assert that the optimizer suggests no changes, i.e. our initial scheduling was stable.
    1525            9 :         for shard in &scheduled_shards {
    1526            8 :             assert_eq!(shard.optimize_attachment(&mut scheduler, &context), None);
    1527              :         }
    1528              : 
    1529            9 :         for mut shard in scheduled_shards {
    1530            8 :             shard.intent.clear(&mut scheduler);
    1531            8 :         }
    1532            1 :     }
    1533              : }
        

Generated by: LCOV version 2.1-beta