LCOV - code coverage report
Current view: top level - storage_controller/src - scheduler.rs (source / functions)
Test: 4f58e98c51285c7fa348e0b410c88a10caf68ad2.info
Test Date: 2025-01-07 20:58:07
Coverage:   Lines: 89.3 % (657 of 736)   Functions: 79.5 % (58 of 73)

            Line data    Source code
       1              : use crate::{node::Node, tenant_shard::TenantShard};
       2              : use itertools::Itertools;
       3              : use pageserver_api::{controller_api::AvailabilityZone, models::PageserverUtilization};
       4              : use serde::Serialize;
       5              : use std::{collections::HashMap, fmt::Debug};
       6              : use utils::{http::error::ApiError, id::NodeId};
       7              : 
       8              : /// Scenarios in which we cannot find a suitable location for a tenant shard
       9              : #[derive(thiserror::Error, Debug)]
      10              : pub enum ScheduleError {
      11              :     #[error("No pageservers found")]
      12              :     NoPageservers,
      13              :     #[error("No pageserver found matching constraint")]
      14              :     ImpossibleConstraint,
      15              : }
      16              : 
      17              : impl From<ScheduleError> for ApiError {
      18            0 :     fn from(value: ScheduleError) -> Self {
      19            0 :         ApiError::Conflict(format!("Scheduling error: {}", value))
      20            0 :     }
      21              : }
      22              : 
      23              : #[derive(Serialize)]
      24              : pub enum MaySchedule {
      25              :     Yes(PageserverUtilization),
      26              :     No,
      27              : }
      28              : 
      29              : #[derive(Serialize)]
      30              : pub(crate) struct SchedulerNode {
      31              :     /// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
      32              :     shard_count: usize,
      33              :     /// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`].
      34              :     attached_shard_count: usize,
      35              :     /// Availability zone id in which the node resides
      36              :     az: AvailabilityZone,
      37              : 
       38              :     /// Whether this node is currently eligible to have new shards scheduled (this is derived
      39              :     /// from a node's availability state and scheduling policy).
      40              :     may_schedule: MaySchedule,
      41              : }
      42              : 
      43              : pub(crate) trait NodeSchedulingScore: Debug + Ord + Copy + Sized {
      44              :     fn generate(
      45              :         node_id: &NodeId,
      46              :         node: &mut SchedulerNode,
      47              :         preferred_az: &Option<AvailabilityZone>,
      48              :         context: &ScheduleContext,
      49              :     ) -> Option<Self>;
      50              :     fn is_overloaded(&self) -> bool;
      51              :     fn node_id(&self) -> NodeId;
      52              : }
      53              : 
      54              : pub(crate) trait ShardTag {
      55              :     type Score: NodeSchedulingScore;
      56              : }
      57              : 
      58              : pub(crate) struct AttachedShardTag {}
      59              : impl ShardTag for AttachedShardTag {
      60              :     type Score = NodeAttachmentSchedulingScore;
      61              : }
      62              : 
      63              : pub(crate) struct SecondaryShardTag {}
      64              : impl ShardTag for SecondaryShardTag {
      65              :     type Score = NodeSecondarySchedulingScore;
      66              : }
      67              : 
      68              : #[derive(PartialEq, Eq, Debug, Clone, Copy)]
      69              : enum AzMatch {
      70              :     Yes,
      71              :     No,
      72              :     Unknown,
      73              : }
      74              : 
      75              : impl AzMatch {
      76        87656 :     fn new(node_az: &AvailabilityZone, shard_preferred_az: Option<&AvailabilityZone>) -> Self {
      77        87518 :         match shard_preferred_az {
      78        87518 :             Some(preferred_az) if preferred_az == node_az => Self::Yes,
      79        50008 :             Some(_preferred_az) => Self::No,
      80          138 :             None => Self::Unknown,
      81              :         }
      82        87656 :     }
      83              : }
      84              : 
      85              : #[derive(PartialEq, Eq, Debug, Clone, Copy)]
      86              : struct AttachmentAzMatch(AzMatch);
      87              : 
      88              : impl Ord for AttachmentAzMatch {
      89        46817 :     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
      90        46817 :         // Lower scores indicate a more suitable node.
      91        46817 :         // Note that we prefer a node for which we don't have
      92        46817 :         // info to a node which we are certain doesn't match the
      93        46817 :         // preferred AZ of the shard.
      94        93634 :         let az_match_score = |az_match: &AzMatch| match az_match {
      95        46770 :             AzMatch::Yes => 0,
      96           98 :             AzMatch::Unknown => 1,
      97        46766 :             AzMatch::No => 2,
      98        93634 :         };
      99              : 
     100        46817 :         az_match_score(&self.0).cmp(&az_match_score(&other.0))
     101        46817 :     }
     102              : }
     103              : 
     104              : impl PartialOrd for AttachmentAzMatch {
     105        46817 :     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
     106        46817 :         Some(self.cmp(other))
     107        46817 :     }
     108              : }
     109              : 
     110              : #[derive(PartialEq, Eq, Debug, Clone, Copy)]
     111              : struct SecondaryAzMatch(AzMatch);
     112              : 
     113              : impl Ord for SecondaryAzMatch {
     114        33355 :     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
     115        33355 :         // Lower scores indicate a more suitable node.
     116        33355 :         // For secondary locations we wish to avoid the preferred AZ
     117        33355 :         // of the shard.
     118        66710 :         let az_match_score = |az_match: &AzMatch| match az_match {
     119        45836 :             AzMatch::No => 0,
     120           40 :             AzMatch::Unknown => 1,
     121        20834 :             AzMatch::Yes => 2,
     122        66710 :         };
     123              : 
     124        33355 :         az_match_score(&self.0).cmp(&az_match_score(&other.0))
     125        33355 :     }
     126              : }
     127              : 
     128              : impl PartialOrd for SecondaryAzMatch {
     129        33355 :     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
     130        33355 :         Some(self.cmp(other))
     131        33355 :     }
     132              : }
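
                        // Editorial sketch (not part of the covered scheduler.rs source): the two
                        // newtype wrappers rank the same AzMatch values in opposite directions,
                        // which is the reason both exist.
                        fn az_ordering_sketch() {
                            // For an attached location, a node inside the shard's preferred AZ
                            // sorts first (lower is better)...
                            assert!(AttachmentAzMatch(AzMatch::Yes) < AttachmentAzMatch(AzMatch::No));
                            // ...while for a secondary location the ordering flips, so attached and
                            // secondary locations end up in different AZs for better HA.
                            assert!(SecondaryAzMatch(AzMatch::No) < SecondaryAzMatch(AzMatch::Yes));
                        }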
     133              : 
     134              : /// Scheduling score of a given node for shard attachments.
     135              : /// Lower scores indicate more suitable nodes.
     136              : /// Ordering is given by member declaration order (top to bottom).
     137              : #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
     138              : pub(crate) struct NodeAttachmentSchedulingScore {
     139              :     /// The number of shards belonging to the tenant currently being
     140              :     /// scheduled that are attached to this node.
     141              :     affinity_score: AffinityScore,
     142              :     /// Flag indicating whether this node matches the preferred AZ
     143              :     /// of the shard. For equal affinity scores, nodes in the matching AZ
     144              :     /// are considered first.
     145              :     az_match: AttachmentAzMatch,
     146              :     /// Size of [`ScheduleContext::attached_nodes`] for the current node.
     147              :     /// This normally tracks the number of attached shards belonging to the
     148              :     /// tenant being scheduled that are already on this node.
     149              :     attached_shards_in_context: usize,
     150              :     /// Utilisation score that combines shard count and disk utilisation
     151              :     utilization_score: u64,
     152              :     /// Total number of shards attached to this node. When nodes have identical utilisation, this
     153              :     /// acts as an anti-affinity between attached shards.
     154              :     total_attached_shard_count: usize,
     155              :     /// Convenience to make selection deterministic in tests and empty systems
     156              :     node_id: NodeId,
     157              : }
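
                        // Editorial note: because Ord is derived, comparison is lexicographic over the
                        // fields in declaration order: affinity_score dominates, then az_match, then
                        // attached_shards_in_context, and so on down to node_id as the final
                        // tie-breaker. For example, a node already holding one shard of this tenant
                        // (AffinityScore(1)) loses to an otherwise busier node holding none
                        // (AffinityScore::FREE) before utilization_score is even consulted.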
     158              : 
     159              : impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
     160        50095 :     fn generate(
     161        50095 :         node_id: &NodeId,
     162        50095 :         node: &mut SchedulerNode,
     163        50095 :         preferred_az: &Option<AvailabilityZone>,
     164        50095 :         context: &ScheduleContext,
     165        50095 :     ) -> Option<Self> {
     166        50095 :         let utilization = match &mut node.may_schedule {
     167        50095 :             MaySchedule::Yes(u) => u,
     168              :             MaySchedule::No => {
     169            0 :                 return None;
     170              :             }
     171              :         };
     172              : 
     173        50095 :         Some(Self {
     174        50095 :             affinity_score: context
     175        50095 :                 .nodes
     176        50095 :                 .get(node_id)
     177        50095 :                 .copied()
     178        50095 :                 .unwrap_or(AffinityScore::FREE),
     179        50095 :             az_match: AttachmentAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())),
     180        50095 :             attached_shards_in_context: context.attached_nodes.get(node_id).copied().unwrap_or(0),
     181        50095 :             utilization_score: utilization.cached_score(),
     182        50095 :             total_attached_shard_count: node.attached_shard_count,
     183        50095 :             node_id: *node_id,
     184        50095 :         })
     185        50095 :     }
     186              : 
     187        50095 :     fn is_overloaded(&self) -> bool {
     188        50095 :         PageserverUtilization::is_overloaded(self.utilization_score)
     189        50095 :     }
     190              : 
     191        12537 :     fn node_id(&self) -> NodeId {
     192        12537 :         self.node_id
     193        12537 :     }
     194              : }
     195              : 
     196              : /// Scheduling score of a given node for shard secondaries.
     197              : /// Lower scores indicate more suitable nodes.
     198              : /// Ordering is given by member declaration order (top to bottom).
     199              : #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
     200              : pub(crate) struct NodeSecondarySchedulingScore {
     201              :     /// Flag indicating whether this node matches the preferred AZ
      202              :     /// of the shard. For secondary locations we wish to avoid nodes in
     203              :     /// the preferred AZ of the shard, since that's where the attached location
     204              :     /// should be scheduled and having the secondary in the same AZ is bad for HA.
     205              :     az_match: SecondaryAzMatch,
     206              :     /// The number of shards belonging to the tenant currently being
     207              :     /// scheduled that are attached to this node.
     208              :     affinity_score: AffinityScore,
     209              :     /// Utilisation score that combines shard count and disk utilisation
     210              :     utilization_score: u64,
     211              :     /// Total number of shards attached to this node. When nodes have identical utilisation, this
     212              :     /// acts as an anti-affinity between attached shards.
     213              :     total_attached_shard_count: usize,
     214              :     /// Convenience to make selection deterministic in tests and empty systems
     215              :     node_id: NodeId,
     216              : }
     217              : 
     218              : impl NodeSchedulingScore for NodeSecondarySchedulingScore {
     219        37561 :     fn generate(
     220        37561 :         node_id: &NodeId,
     221        37561 :         node: &mut SchedulerNode,
     222        37561 :         preferred_az: &Option<AvailabilityZone>,
     223        37561 :         context: &ScheduleContext,
     224        37561 :     ) -> Option<Self> {
     225        37561 :         let utilization = match &mut node.may_schedule {
     226        37561 :             MaySchedule::Yes(u) => u,
     227              :             MaySchedule::No => {
     228            0 :                 return None;
     229              :             }
     230              :         };
     231              : 
     232        37561 :         Some(Self {
     233        37561 :             az_match: SecondaryAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())),
     234        37561 :             affinity_score: context
     235        37561 :                 .nodes
     236        37561 :                 .get(node_id)
     237        37561 :                 .copied()
     238        37561 :                 .unwrap_or(AffinityScore::FREE),
     239        37561 :             utilization_score: utilization.cached_score(),
     240        37561 :             total_attached_shard_count: node.attached_shard_count,
     241        37561 :             node_id: *node_id,
     242        37561 :         })
     243        37561 :     }
     244              : 
     245        37561 :     fn is_overloaded(&self) -> bool {
     246        37561 :         PageserverUtilization::is_overloaded(self.utilization_score)
     247        37561 :     }
     248              : 
     249        12535 :     fn node_id(&self) -> NodeId {
     250        12535 :         self.node_id
     251        12535 :     }
     252              : }
     253              : 
     254              : impl PartialEq for SchedulerNode {
     255            3 :     fn eq(&self, other: &Self) -> bool {
     256            3 :         let may_schedule_matches = matches!(
     257            3 :             (&self.may_schedule, &other.may_schedule),
     258              :             (MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No)
     259              :         );
     260              : 
     261            3 :         may_schedule_matches
     262            3 :             && self.shard_count == other.shard_count
     263            3 :             && self.attached_shard_count == other.attached_shard_count
     264            3 :             && self.az == other.az
     265            3 :     }
     266              : }
     267              : 
     268              : impl Eq for SchedulerNode {}
     269              : 
     270              : /// This type is responsible for selecting which node is used when a tenant shard needs to choose a pageserver
     271              : /// on which to run.
     272              : ///
     273              : /// The type has no persistent state of its own: this is all populated at startup.  The Serialize
     274              : /// impl is only for debug dumps.
     275              : #[derive(Serialize)]
     276              : pub(crate) struct Scheduler {
     277              :     nodes: HashMap<NodeId, SchedulerNode>,
     278              : }
     279              : 
     280              : /// Score for soft constraint scheduling: lower scores are preferred to higher scores.
     281              : ///
     282              : /// For example, we may set an affinity score based on the number of shards from the same
     283              : /// tenant already on a node, to implicitly prefer to balance out shards.
     284              : #[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
     285              : pub(crate) struct AffinityScore(pub(crate) usize);
     286              : 
     287              : impl AffinityScore {
     288              :     /// If we have no anti-affinity at all toward a node, this is its score.  It means
     289              :     /// the scheduler has a free choice amongst nodes with this score, and may pick a node
     290              :     /// based on other information such as total utilization.
     291              :     pub(crate) const FREE: Self = Self(0);
     292              : 
     293        25142 :     pub(crate) fn inc(&mut self) {
     294        25142 :         self.0 += 1;
     295        25142 :     }
     296              : }
     297              : 
     298              : impl std::ops::Add for AffinityScore {
     299              :     type Output = Self;
     300              : 
     301           24 :     fn add(self, rhs: Self) -> Self::Output {
     302           24 :         Self(self.0 + rhs.0)
     303           24 :     }
     304              : }
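
                        // Editorial sketch: AffinityScore is a plain counter where lower is better.
                        fn affinity_score_sketch() {
                            let mut score = AffinityScore::FREE; // no shards of this tenant on the node yet
                            score.inc();                         // one shard placed here -> AffinityScore(1)
                            assert_eq!(score + AffinityScore(2), AffinityScore(3));
                        }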
     305              : 
     306              : /// Hint for whether this is a sincere attempt to schedule, or a speculative
     307              : /// check for where we _would_ schedule (done during optimization)
     308              : #[derive(Debug, Clone)]
     309              : pub(crate) enum ScheduleMode {
     310              :     Normal,
     311              :     Speculative,
     312              : }
     313              : 
     314              : impl Default for ScheduleMode {
     315         5022 :     fn default() -> Self {
     316         5022 :         Self::Normal
     317         5022 :     }
     318              : }
     319              : 
     320              : // For carrying state between multiple calls to [`TenantShard::schedule`], e.g. when calling
     321              : // it for many shards in the same tenant.
     322              : #[derive(Debug, Default, Clone)]
     323              : pub(crate) struct ScheduleContext {
     324              :     /// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`]
     325              :     pub(crate) nodes: HashMap<NodeId, AffinityScore>,
     326              : 
     327              :     /// Specifically how many _attached_ locations are on each node
     328              :     pub(crate) attached_nodes: HashMap<NodeId, usize>,
     329              : 
     330              :     pub(crate) mode: ScheduleMode,
     331              : }
     332              : 
     333              : impl ScheduleContext {
     334            3 :     pub(crate) fn new(mode: ScheduleMode) -> Self {
     335            3 :         Self {
     336            3 :             nodes: HashMap::new(),
     337            3 :             attached_nodes: HashMap::new(),
     338            3 :             mode,
     339            3 :         }
     340            3 :     }
     341              : 
     342              :     /// Input is a list of nodes we would like to avoid using again within this context.  The more
     343              :     /// times a node is passed into this call, the less inclined we are to use it.
     344        12575 :     pub(crate) fn avoid(&mut self, nodes: &[NodeId]) {
     345        37717 :         for node_id in nodes {
     346        25142 :             let entry = self.nodes.entry(*node_id).or_insert(AffinityScore::FREE);
     347        25142 :             entry.inc()
     348              :         }
     349        12575 :     }
     350              : 
     351        12567 :     pub(crate) fn push_attached(&mut self, node_id: NodeId) {
     352        12567 :         let entry = self.attached_nodes.entry(node_id).or_default();
     353        12567 :         *entry += 1;
     354        12567 :     }
     355              : 
     356           69 :     pub(crate) fn get_node_affinity(&self, node_id: NodeId) -> AffinityScore {
     357           69 :         self.nodes
     358           69 :             .get(&node_id)
     359           69 :             .copied()
     360           69 :             .unwrap_or(AffinityScore::FREE)
     361           69 :     }
     362              : 
     363           69 :     pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize {
     364           69 :         self.attached_nodes.get(&node_id).copied().unwrap_or(0)
     365           69 :     }
     366              : 
     367              :     #[cfg(test)]
     368            3 :     pub(crate) fn attach_count(&self) -> usize {
     369            3 :         self.attached_nodes.values().sum()
     370            3 :     }
     371              : }
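
                        // Editorial sketch of how a caller builds up a ScheduleContext while walking the
                        // shards of one tenant. The `tenant_shards` iterator is hypothetical; the method
                        // calls are the ones defined above and the accessors used in consistency_check().
                        fn build_context_sketch<'a>(
                            tenant_shards: impl Iterator<Item = &'a TenantShard>,
                        ) -> ScheduleContext {
                            let mut context = ScheduleContext::new(ScheduleMode::Normal);
                            for shard in tenant_shards {
                                // Every node this tenant already uses becomes less attractive...
                                context.avoid(&shard.intent.all_pageservers());
                                // ...and attached locations are additionally counted per node.
                                if let Some(node_id) = shard.intent.get_attached() {
                                    context.push_attached(*node_id);
                                }
                            }
                            context
                        }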
     372              : 
     373              : pub(crate) enum RefCountUpdate {
     374              :     PromoteSecondary,
     375              :     Attach,
     376              :     Detach,
     377              :     DemoteAttached,
     378              :     AddSecondary,
     379              :     RemoveSecondary,
     380              : }
     381              : 
     382              : impl Scheduler {
     383           62 :     pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
     384           62 :         let mut scheduler_nodes = HashMap::new();
     385           95 :         for node in nodes {
     386           33 :             scheduler_nodes.insert(
     387           33 :                 node.get_id(),
     388           33 :                 SchedulerNode {
     389           33 :                     shard_count: 0,
     390           33 :                     attached_shard_count: 0,
     391           33 :                     may_schedule: node.may_schedule(),
     392           33 :                     az: node.get_availability_zone_id().clone(),
     393           33 :                 },
     394           33 :             );
     395           33 :         }
     396              : 
     397           62 :         Self {
     398           62 :             nodes: scheduler_nodes,
     399           62 :         }
     400           62 :     }
     401              : 
     402              :     /// For debug/support: check that our internal statistics are in sync with the state of
     403              :     /// the nodes & tenant shards.
     404              :     ///
     405              :     /// If anything is inconsistent, log details and return an error.
     406            1 :     pub(crate) fn consistency_check<'a>(
     407            1 :         &self,
     408            1 :         nodes: impl Iterator<Item = &'a Node>,
     409            1 :         shards: impl Iterator<Item = &'a TenantShard>,
     410            1 :     ) -> anyhow::Result<()> {
     411            1 :         let mut expect_nodes: HashMap<NodeId, SchedulerNode> = HashMap::new();
     412            4 :         for node in nodes {
     413            3 :             expect_nodes.insert(
     414            3 :                 node.get_id(),
     415            3 :                 SchedulerNode {
     416            3 :                     shard_count: 0,
     417            3 :                     attached_shard_count: 0,
     418            3 :                     may_schedule: node.may_schedule(),
     419            3 :                     az: node.get_availability_zone_id().clone(),
     420            3 :                 },
     421            3 :             );
     422            3 :         }
     423              : 
     424            2 :         for shard in shards {
     425            1 :             if let Some(node_id) = shard.intent.get_attached() {
     426            1 :                 match expect_nodes.get_mut(node_id) {
     427            1 :                     Some(node) => {
     428            1 :                         node.shard_count += 1;
     429            1 :                         node.attached_shard_count += 1;
     430            1 :                     }
     431            0 :                     None => anyhow::bail!(
     432            0 :                         "Tenant {} references nonexistent node {}",
     433            0 :                         shard.tenant_shard_id,
     434            0 :                         node_id
     435            0 :                     ),
     436              :                 }
     437            0 :             }
     438              : 
     439            1 :             for node_id in shard.intent.get_secondary() {
     440            1 :                 match expect_nodes.get_mut(node_id) {
     441            1 :                     Some(node) => node.shard_count += 1,
     442            0 :                     None => anyhow::bail!(
     443            0 :                         "Tenant {} references nonexistent node {}",
     444            0 :                         shard.tenant_shard_id,
     445            0 :                         node_id
     446            0 :                     ),
     447              :                 }
     448              :             }
     449              :         }
     450              : 
     451            4 :         for (node_id, expect_node) in &expect_nodes {
     452            3 :             let Some(self_node) = self.nodes.get(node_id) else {
     453            0 :                 anyhow::bail!("Node {node_id} not found in Self")
     454              :             };
     455              : 
     456            3 :             if self_node != expect_node {
     457            0 :                 tracing::error!("Inconsistency detected in scheduling state for node {node_id}");
     458            0 :                 tracing::error!("Expected state: {}", serde_json::to_string(expect_node)?);
     459            0 :                 tracing::error!("Self state: {}", serde_json::to_string(self_node)?);
     460              : 
     461            0 :                 anyhow::bail!("Inconsistent state on {node_id}");
     462            3 :             }
     463              :         }
     464              : 
     465            1 :         if expect_nodes.len() != self.nodes.len() {
     466              :             // We just checked that all the expected nodes are present.  If the lengths don't match,
     467              :             // it means that we have nodes in Self that are unexpected.
     468            0 :             for node_id in self.nodes.keys() {
     469            0 :                 if !expect_nodes.contains_key(node_id) {
     470            0 :                     anyhow::bail!("Node {node_id} found in Self but not in expected nodes");
     471            0 :                 }
     472              :             }
     473            1 :         }
     474              : 
     475            1 :         Ok(())
     476            1 :     }
     477              : 
     478              :     /// Update the reference counts of a node. These reference counts are used to guide scheduling
      479              :     /// decisions, not for memory management: they represent the number of tenant shards whose IntentState
      480              :     /// targets this node and the number of tenant shards whose IntentState is attached to this
     481              :     /// node.
     482              :     ///
     483              :     /// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
     484              :     /// [`Self::new`] or [`Self::node_upsert`])
     485        50159 :     pub(crate) fn update_node_ref_counts(&mut self, node_id: NodeId, update: RefCountUpdate) {
     486        50159 :         let Some(node) = self.nodes.get_mut(&node_id) else {
     487            0 :             debug_assert!(false);
     488            0 :             tracing::error!("Scheduler missing node {node_id}");
     489            0 :             return;
     490              :         };
     491              : 
     492        50159 :         match update {
     493            5 :             RefCountUpdate::PromoteSecondary => {
     494            5 :                 node.attached_shard_count += 1;
     495            5 :             }
     496        12544 :             RefCountUpdate::Attach => {
     497        12544 :                 node.shard_count += 1;
     498        12544 :                 node.attached_shard_count += 1;
     499        12544 :             }
     500        12543 :             RefCountUpdate::Detach => {
     501        12543 :                 node.shard_count -= 1;
     502        12543 :                 node.attached_shard_count -= 1;
     503        12543 :             }
     504            5 :             RefCountUpdate::DemoteAttached => {
     505            5 :                 node.attached_shard_count -= 1;
     506            5 :             }
     507        12531 :             RefCountUpdate::AddSecondary => {
     508        12531 :                 node.shard_count += 1;
     509        12531 :             }
     510        12531 :             RefCountUpdate::RemoveSecondary => {
     511        12531 :                 node.shard_count -= 1;
     512        12531 :             }
     513              :         }
     514              : 
     515              :         // Maybe update PageserverUtilization
     516        50159 :         match update {
     517              :             RefCountUpdate::AddSecondary | RefCountUpdate::Attach => {
      518              :                 // Referencing the node: if this takes our shard_count above the utilization structure's
      519              :                 // shard count, then artificially bump it: this ensures that the scheduler immediately
     520              :                 // recognizes that this node has more work on it, without waiting for the next heartbeat
     521              :                 // to update the utilization.
     522        25075 :                 if let MaySchedule::Yes(utilization) = &mut node.may_schedule {
     523        25075 :                     utilization.adjust_shard_count_max(node.shard_count as u32);
     524        25075 :                 }
     525              :             }
     526              :             RefCountUpdate::PromoteSecondary
     527              :             | RefCountUpdate::Detach
     528              :             | RefCountUpdate::RemoveSecondary
     529        25084 :             | RefCountUpdate::DemoteAttached => {
     530        25084 :                 // De-referencing the node: leave the utilization's shard_count at a stale higher
     531        25084 :                 // value until some future heartbeat after we have physically removed this shard
     532        25084 :                 // from the node: this prevents the scheduler over-optimistically trying to schedule
     533        25084 :                 // more work onto the node before earlier detaches are done.
     534        25084 :             }
     535              :         }
     536        50159 :     }
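
                        // Editorial sketch: how the RefCountUpdate variants pair up when an attached
                        // location migrates from node `a` to node `b`. In the real code this is driven
                        // by IntentState rather than called directly; `a` and `b` are hypothetical.
                        fn migrate_attached_sketch(scheduler: &mut Scheduler, a: NodeId, b: NodeId) {
                            scheduler.update_node_ref_counts(b, RefCountUpdate::AddSecondary);     // warm up b
                            scheduler.update_node_ref_counts(a, RefCountUpdate::DemoteAttached);   // a: attached -> secondary
                            scheduler.update_node_ref_counts(b, RefCountUpdate::PromoteSecondary); // b: secondary -> attached
                            scheduler.update_node_ref_counts(a, RefCountUpdate::RemoveSecondary);  // drop the old location on a
                        }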
     537              : 
     538              :     // Check if the number of shards attached to a given node is lagging below
     539              :     // the cluster average. If that's the case, the node should be filled.
     540            0 :     pub(crate) fn compute_fill_requirement(&self, node_id: NodeId) -> usize {
     541            0 :         let Some(node) = self.nodes.get(&node_id) else {
     542            0 :             debug_assert!(false);
     543            0 :             tracing::error!("Scheduler missing node {node_id}");
     544            0 :             return 0;
     545              :         };
     546            0 :         assert!(!self.nodes.is_empty());
     547            0 :         let expected_attached_shards_per_node = self.expected_attached_shard_count();
     548              : 
     549            0 :         for (node_id, node) in self.nodes.iter() {
     550            0 :             tracing::trace!(%node_id, "attached_shard_count={} shard_count={} expected={}", node.attached_shard_count, node.shard_count, expected_attached_shards_per_node);
     551              :         }
     552              : 
     553            0 :         if node.attached_shard_count < expected_attached_shards_per_node {
     554            0 :             expected_attached_shards_per_node - node.attached_shard_count
     555              :         } else {
     556            0 :             0
     557              :         }
     558            0 :     }
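
                        // Editorial note: a worked example of the fill computation above. With three
                        // nodes whose attached_shard_count is 4, 4 and 1, the expected count per node
                        // is (4 + 4 + 1) / 3 = 3, so the third node reports a fill requirement of
                        // 3 - 1 = 2, while the first two (at or above the average) report 0.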
     559              : 
     560            0 :     pub(crate) fn expected_attached_shard_count(&self) -> usize {
     561            0 :         let total_attached_shards: usize =
     562            0 :             self.nodes.values().map(|n| n.attached_shard_count).sum();
     563            0 : 
     564            0 :         assert!(!self.nodes.is_empty());
     565            0 :         total_attached_shards / self.nodes.len()
     566            0 :     }
     567              : 
     568            0 :     pub(crate) fn nodes_by_attached_shard_count(&self) -> Vec<(NodeId, usize)> {
     569            0 :         self.nodes
     570            0 :             .iter()
     571            0 :             .map(|(node_id, stats)| (*node_id, stats.attached_shard_count))
     572            0 :             .sorted_by(|lhs, rhs| Ord::cmp(&lhs.1, &rhs.1).reverse())
     573            0 :             .collect()
     574            0 :     }
     575              : 
     576          211 :     pub(crate) fn node_upsert(&mut self, node: &Node) {
     577              :         use std::collections::hash_map::Entry::*;
     578          211 :         match self.nodes.entry(node.get_id()) {
     579            5 :             Occupied(mut entry) => {
     580            5 :                 // Updates to MaySchedule are how we receive updated PageserverUtilization: adjust these values
     581            5 :                 // to account for any shards scheduled on the controller but not yet visible to the pageserver.
     582            5 :                 let mut may_schedule = node.may_schedule();
     583            5 :                 match &mut may_schedule {
     584            4 :                     MaySchedule::Yes(utilization) => {
     585            4 :                         utilization.adjust_shard_count_max(entry.get().shard_count as u32);
     586            4 :                     }
     587            1 :                     MaySchedule::No => { // Nothing to tweak
     588            1 :                     }
     589              :                 }
     590              : 
     591            5 :                 entry.get_mut().may_schedule = may_schedule;
     592              :             }
     593          206 :             Vacant(entry) => {
     594          206 :                 entry.insert(SchedulerNode {
     595          206 :                     shard_count: 0,
     596          206 :                     attached_shard_count: 0,
     597          206 :                     may_schedule: node.may_schedule(),
     598          206 :                     az: node.get_availability_zone_id().clone(),
     599          206 :                 });
     600          206 :             }
     601              :         }
     602          211 :     }
     603              : 
     604            0 :     pub(crate) fn node_remove(&mut self, node_id: NodeId) {
     605            0 :         if self.nodes.remove(&node_id).is_none() {
     606            0 :             tracing::warn!(node_id=%node_id, "Removed non-existent node from scheduler");
     607            0 :         }
     608            0 :     }
     609              : 
     610              :     /// Where we have several nodes to choose from, for example when picking a secondary location
     611              :     /// to promote to an attached location, this method may be used to pick the best choice based
     612              :     /// on the scheduler's knowledge of utilization and availability.
     613              :     ///
      614              :     /// If the input is empty, or none of the nodes are eligible for scheduling, return None: the
     615              :     /// caller can pick a node some other way.
     616        25042 :     pub(crate) fn node_preferred(&self, nodes: &[NodeId]) -> Option<NodeId> {
     617        25042 :         if nodes.is_empty() {
     618        25040 :             return None;
     619            2 :         }
     620            2 : 
     621            2 :         // TODO: When the utilization score returned by the pageserver becomes meaningful,
     622            2 :         // schedule based on that instead of the shard count.
     623            2 :         let node = nodes
     624            2 :             .iter()
     625            4 :             .map(|node_id| {
     626            4 :                 let may_schedule = self
     627            4 :                     .nodes
     628            4 :                     .get(node_id)
     629            4 :                     .map(|n| !matches!(n.may_schedule, MaySchedule::No))
     630            4 :                     .unwrap_or(false);
     631            4 :                 (*node_id, may_schedule)
     632            4 :             })
     633            4 :             .max_by_key(|(_n, may_schedule)| *may_schedule);
     634            2 : 
     635            2 :         // If even the preferred node has may_schedule==false, return None
     636            2 :         node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
     637        25042 :     }
     638              : 
      639              :     /// Compute a scheduling score for each node that the scheduler knows of,
      640              :     /// minus a set of hard-excluded nodes.
     641        25080 :     fn compute_node_scores<Score>(
     642        25080 :         &mut self,
     643        25080 :         hard_exclude: &[NodeId],
     644        25080 :         preferred_az: &Option<AvailabilityZone>,
     645        25080 :         context: &ScheduleContext,
     646        25080 :     ) -> Vec<Score>
     647        25080 :     where
     648        25080 :         Score: NodeSchedulingScore,
     649        25080 :     {
     650        25080 :         self.nodes
     651        25080 :             .iter_mut()
     652       100217 :             .filter_map(|(k, v)| {
     653       100217 :                 if hard_exclude.contains(k) {
     654        12561 :                     None
     655              :                 } else {
     656        87656 :                     Score::generate(k, v, preferred_az, context)
     657              :                 }
     658       100217 :             })
     659        25080 :             .collect()
     660        25080 :     }
     661              : 
      662              :     /// hard_exclude: it is forbidden to use nodes in this list, typically because they
     663              :     /// are already in use by this shard -- we use this to avoid picking the same node
     664              :     /// as both attached and secondary location.  This is a hard constraint: if we cannot
     665              :     /// find any nodes that aren't in this list, then we will return a [`ScheduleError::ImpossibleConstraint`].
     666              :     ///
     667              :     /// context: we prefer to avoid using nodes identified in the context, according
      668              :     /// to their anti-affinity score.  We use this to prefer not to place shards of
     669              :     /// the same tenant on the same node.  This is a soft constraint: the context will never
     670              :     /// cause us to fail to schedule a shard.
     671        25080 :     pub(crate) fn schedule_shard<Tag: ShardTag>(
     672        25080 :         &mut self,
     673        25080 :         hard_exclude: &[NodeId],
     674        25080 :         preferred_az: &Option<AvailabilityZone>,
     675        25080 :         context: &ScheduleContext,
     676        25080 :     ) -> Result<NodeId, ScheduleError> {
     677        25080 :         if self.nodes.is_empty() {
     678            0 :             return Err(ScheduleError::NoPageservers);
     679        25080 :         }
     680        25080 : 
     681        25080 :         let mut scores =
     682        25080 :             self.compute_node_scores::<Tag::Score>(hard_exclude, preferred_az, context);
     683        25080 : 
     684        25080 :         // Exclude nodes whose utilization is critically high, if there are alternatives available.  This will
     685        25080 :         // cause us to violate affinity rules if it is necessary to avoid critically overloading nodes: for example
     686        25080 :         // we may place shards in the same tenant together on the same pageserver if all other pageservers are
     687        25080 :         // overloaded.
     688        25080 :         let non_overloaded_scores = scores
     689        25080 :             .iter()
     690        87656 :             .filter(|i| !i.is_overloaded())
     691        25080 :             .copied()
     692        25080 :             .collect::<Vec<_>>();
     693        25080 :         if !non_overloaded_scores.is_empty() {
     694        25072 :             scores = non_overloaded_scores;
     695        25072 :         }
     696              : 
     697              :         // Sort the nodes by score. The one with the lowest scores will be the preferred node.
     698              :         // Refer to [`NodeAttachmentSchedulingScore`] for attached locations and
     699              :         // [`NodeSecondarySchedulingScore`] for secondary locations to understand how the nodes
     700              :         // are ranked.
     701        25080 :         scores.sort();
     702        25080 : 
     703        25080 :         if scores.is_empty() {
     704              :             // After applying constraints, no pageservers were left.
     705            8 :             if !matches!(context.mode, ScheduleMode::Speculative) {
     706              :                 // If this was not a speculative attempt, log details to understand why we couldn't
     707              :                 // schedule: this may help an engineer understand if some nodes are marked offline
     708              :                 // in a way that's preventing progress.
     709            8 :                 tracing::info!(
     710            0 :                     "Scheduling failure, while excluding {hard_exclude:?}, node states:"
     711              :                 );
     712           24 :                 for (node_id, node) in &self.nodes {
     713           16 :                     tracing::info!(
     714            0 :                         "Node {node_id}: may_schedule={} shards={}",
     715            0 :                         !matches!(node.may_schedule, MaySchedule::No),
     716              :                         node.shard_count
     717              :                     );
     718              :                 }
     719            0 :             }
     720            8 :             return Err(ScheduleError::ImpossibleConstraint);
     721        25072 :         }
     722        25072 : 
     723        25072 :         // Lowest score wins
     724        25072 :         let node_id = scores.first().unwrap().node_id();
     725              : 
     726        25072 :         if !matches!(context.mode, ScheduleMode::Speculative) {
     727        25072 :             tracing::info!(
      728            0 :             "scheduler selected node {node_id} (eligible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?})",
     729            0 :             scores.iter().map(|i| i.node_id().0).collect::<Vec<_>>()
     730              :         );
     731            0 :         }
     732              : 
     733              :         // Note that we do not update shard count here to reflect the scheduling: that
     734              :         // is IntentState's job when the scheduled location is used.
     735              : 
     736        25072 :         Ok(node_id)
     737        25080 :     }
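
                        // Editorial sketch of the intended call pattern (it mirrors the scheduler_basic
                        // test below); `preferred_az` and `context` are hypothetical inputs.
                        fn schedule_pair_sketch(
                            scheduler: &mut Scheduler,
                            preferred_az: Option<AvailabilityZone>,
                            context: &ScheduleContext,
                        ) -> Result<(NodeId, NodeId), ScheduleError> {
                            // Pick the attached location first...
                            let attached =
                                scheduler.schedule_shard::<AttachedShardTag>(&[], &preferred_az, context)?;
                            // ...then a secondary, hard-excluding the attached node so the two
                            // locations can never land on the same pageserver.
                            let secondary =
                                scheduler.schedule_shard::<SecondaryShardTag>(&[attached], &preferred_az, context)?;
                            Ok((attached, secondary))
                        }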
     738              : 
     739              :     /// Selects any available node. This is suitable for performing background work (e.g. S3
     740              :     /// deletions).
     741            0 :     pub(crate) fn any_available_node(&mut self) -> Result<NodeId, ScheduleError> {
     742            0 :         self.schedule_shard::<AttachedShardTag>(&[], &None, &ScheduleContext::default())
     743            0 :     }
     744              : 
     745              :     /// For choosing which AZ to schedule a new shard into, use this.  It will return the
     746              :     /// AZ with the lowest median utilization.
     747              :     ///
     748              :     /// We use an AZ-wide measure rather than simply selecting the AZ of the least-loaded
     749              :     /// node, because while tenants start out single sharded, when they grow and undergo
     750              :     /// shard-split, they will occupy space on many nodes within an AZ.
     751              :     ///
     752              :     /// We use median rather than total free space or mean utilization, because
     753              :     /// we wish to avoid preferring AZs that have low-load nodes resulting from
     754              :     /// recent replacements.
     755              :     ///
     756              :     /// The practical result is that we will pick an AZ based on its median node, and
     757              :     /// then actually _schedule_ the new shard onto the lowest-loaded node in that AZ.
     758            3 :     pub(crate) fn get_az_for_new_tenant(&self) -> Option<AvailabilityZone> {
     759            3 :         if self.nodes.is_empty() {
     760            0 :             return None;
     761            3 :         }
     762            3 : 
     763            3 :         let mut scores_by_az = HashMap::new();
     764           21 :         for (node_id, node) in &self.nodes {
     765           18 :             let az_scores = scores_by_az.entry(&node.az).or_insert_with(Vec::new);
     766           18 :             let score = match &node.may_schedule {
     767           18 :                 MaySchedule::Yes(utilization) => utilization.score(),
     768            0 :                 MaySchedule::No => PageserverUtilization::full().score(),
     769              :             };
     770           18 :             az_scores.push((node_id, node, score));
     771              :         }
     772              : 
     773              :         // Sort by utilization.  Also include the node ID to break ties.
     774            6 :         for scores in scores_by_az.values_mut() {
     775           34 :             scores.sort_by_key(|i| (i.2, i.0));
     776            6 :         }
     777              : 
     778            3 :         let mut median_by_az = scores_by_az
     779            3 :             .iter()
     780            6 :             .map(|(az, nodes)| (*az, nodes.get(nodes.len() / 2).unwrap().2))
     781            3 :             .collect::<Vec<_>>();
     782            3 :         // Sort by utilization.  Also include the AZ to break ties.
     783            6 :         median_by_az.sort_by_key(|i| (i.1, i.0));
     784            3 : 
     785            3 :         // Return the AZ with the lowest median utilization
     786            3 :         Some(median_by_az.first().unwrap().0.clone())
     787            3 :     }
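
                        // Editorial note: a worked example of the median rule above. Suppose "az-a"
                        // holds nodes with utilization scores [10, 60, 90] and "az-b" holds
                        // [30, 40, 95]. The per-AZ medians are 60 and 40, so "az-b" wins even though
                        // "az-a" contains the single least-loaded node: that low outlier is exactly
                        // the kind of recently-replaced node the median is meant to discount.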
     788              : 
     789              :     /// Unit test access to internal state
     790              :     #[cfg(test)]
     791           12 :     pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
     792           12 :         self.nodes.get(&node_id).unwrap().shard_count
     793           12 :     }
     794              : 
     795              :     #[cfg(test)]
     796           12 :     pub(crate) fn get_node_attached_shard_count(&self, node_id: NodeId) -> usize {
     797           12 :         self.nodes.get(&node_id).unwrap().attached_shard_count
     798           12 :     }
     799              : }
     800              : 
     801              : #[cfg(test)]
     802              : pub(crate) mod test_utils {
     803              : 
     804              :     use crate::node::Node;
     805              :     use pageserver_api::{
     806              :         controller_api::{AvailabilityZone, NodeAvailability},
     807              :         models::utilization::test_utilization,
     808              :     };
     809              :     use std::collections::HashMap;
     810              :     use utils::id::NodeId;
     811              : 
     812              :     /// Test helper: synthesize the requested number of nodes, all in active state.
     813              :     ///
     814              :     /// Node IDs start at one.
     815              :     ///
     816              :     /// The `azs` argument specifies the list of availability zones which will be assigned
      817              :     /// to nodes in round-robin fashion. If empty, a default AZ is assigned.
     818           62 :     pub(crate) fn make_test_nodes(n: u64, azs: &[AvailabilityZone]) -> HashMap<NodeId, Node> {
     819           62 :         let mut az_iter = azs.iter().cycle();
     820           62 : 
     821           62 :         (1..n + 1)
     822          239 :             .map(|i| {
     823          239 :                 (NodeId(i), {
     824          239 :                     let mut node = Node::new(
     825          239 :                         NodeId(i),
     826          239 :                         format!("httphost-{i}"),
     827          239 :                         80 + i as u16,
     828          239 :                         format!("pghost-{i}"),
     829          239 :                         5432 + i as u16,
     830          239 :                         az_iter
     831          239 :                             .next()
     832          239 :                             .cloned()
     833          239 :                             .unwrap_or(AvailabilityZone("test-az".to_string())),
     834          239 :                     );
     835          239 :                     node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0)));
     836          239 :                     assert!(node.is_available());
     837          239 :                     node
     838          239 :                 })
     839          239 :             })
     840           62 :             .collect()
     841           62 :     }
     842              : }
     843              : 
     844              : #[cfg(test)]
     845              : mod tests {
     846              :     use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization};
     847              : 
     848              :     use super::*;
     849              : 
     850              :     use crate::tenant_shard::IntentState;
     851              :     #[test]
     852            1 :     fn scheduler_basic() -> anyhow::Result<()> {
     853            1 :         let nodes = test_utils::make_test_nodes(2, &[]);
     854            1 : 
     855            1 :         let mut scheduler = Scheduler::new(nodes.values());
     856            1 :         let mut t1_intent = IntentState::new();
     857            1 :         let mut t2_intent = IntentState::new();
     858            1 : 
     859            1 :         let context = ScheduleContext::default();
     860              : 
     861            1 :         let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &None, &context)?;
     862            1 :         t1_intent.set_attached(&mut scheduler, Some(scheduled));
     863            1 :         let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &None, &context)?;
     864            1 :         t2_intent.set_attached(&mut scheduler, Some(scheduled));
     865            1 : 
     866            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
     867            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
     868              : 
     869            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
     870            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
     871              : 
     872            1 :         let scheduled = scheduler.schedule_shard::<AttachedShardTag>(
     873            1 :             &t1_intent.all_pageservers(),
     874            1 :             &None,
     875            1 :             &context,
     876            1 :         )?;
     877            1 :         t1_intent.push_secondary(&mut scheduler, scheduled);
     878            1 : 
     879            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
     880            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
     881              : 
     882            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
     883            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
     884              : 
     885            1 :         t1_intent.clear(&mut scheduler);
     886            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
     887            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
     888              : 
     889            1 :         let total_attached = scheduler.get_node_attached_shard_count(NodeId(1))
     890            1 :             + scheduler.get_node_attached_shard_count(NodeId(2));
     891            1 :         assert_eq!(total_attached, 1);
     892              : 
     893            1 :         if cfg!(debug_assertions) {
     894              :             // Dropping an IntentState without clearing it causes a panic in debug mode,
     895              :             // because we have failed to properly update scheduler shard counts.
     896            1 :             let result = std::panic::catch_unwind(move || {
     897            1 :                 drop(t2_intent);
     898            1 :             });
     899            1 :             assert!(result.is_err());
     900              :         } else {
     901            0 :             t2_intent.clear(&mut scheduler);
     902            0 : 
     903            0 :             assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
     904            0 :             assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 0);
     905              : 
     906            0 :             assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 0);
     907            0 :             assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 0);
     908              :         }
     909              : 
     910            1 :         Ok(())
     911            1 :     }
     912              : 
     913              :     #[test]
     914              :     /// Test the PageserverUtilization's contribution to scheduling algorithm
     915            1 :     fn scheduler_utilization() {
     916            1 :         let mut nodes = test_utils::make_test_nodes(3, &[]);
     917            1 :         let mut scheduler = Scheduler::new(nodes.values());
     918            1 : 
     919            1 :         // Need to keep these alive because they contribute to shard counts via RAII
     920            1 :         let mut scheduled_intents = Vec::new();
     921            1 : 
     922            1 :         let empty_context = ScheduleContext::default();
     923              : 
     924           11 :         fn assert_scheduler_chooses(
     925           11 :             expect_node: NodeId,
     926           11 :             scheduled_intents: &mut Vec<IntentState>,
     927           11 :             scheduler: &mut Scheduler,
     928           11 :             context: &ScheduleContext,
     929           11 :         ) {
     930           11 :             let scheduled = scheduler
     931           11 :                 .schedule_shard::<AttachedShardTag>(&[], &None, context)
     932           11 :                 .unwrap();
     933           11 :             let mut intent = IntentState::new();
     934           11 :             intent.set_attached(scheduler, Some(scheduled));
     935           11 :             scheduled_intents.push(intent);
     936           11 :             assert_eq!(scheduled, expect_node);
     937           11 :         }
     938              : 
     939              :         // Independent schedule calls onto empty nodes should round-robin, because the shard count in each node's
     940              :         // utilization is updated inline.  The order is deterministic because when all other factors are
     941              :         // equal, we order by node ID.
     942            1 :         assert_scheduler_chooses(
     943            1 :             NodeId(1),
     944            1 :             &mut scheduled_intents,
     945            1 :             &mut scheduler,
     946            1 :             &empty_context,
     947            1 :         );
     948            1 :         assert_scheduler_chooses(
     949            1 :             NodeId(2),
     950            1 :             &mut scheduled_intents,
     951            1 :             &mut scheduler,
     952            1 :             &empty_context,
     953            1 :         );
     954            1 :         assert_scheduler_chooses(
     955            1 :             NodeId(3),
     956            1 :             &mut scheduled_intents,
     957            1 :             &mut scheduler,
     958            1 :             &empty_context,
     959            1 :         );
     960            1 : 
     961            1 :         // Manually setting utilization higher should cause schedule calls to round-robin the other nodes
     962            1 :         // which have equal utilization.
     963            1 :         nodes
     964            1 :             .get_mut(&NodeId(1))
     965            1 :             .unwrap()
     966            1 :             .set_availability(NodeAvailability::Active(test_utilization::simple(
     967            1 :                 10,
     968            1 :                 1024 * 1024 * 1024,
     969            1 :             )));
     970            1 :         scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
     971            1 : 
     972            1 :         assert_scheduler_chooses(
     973            1 :             NodeId(2),
     974            1 :             &mut scheduled_intents,
     975            1 :             &mut scheduler,
     976            1 :             &empty_context,
     977            1 :         );
     978            1 :         assert_scheduler_chooses(
     979            1 :             NodeId(3),
     980            1 :             &mut scheduled_intents,
     981            1 :             &mut scheduler,
     982            1 :             &empty_context,
     983            1 :         );
     984            1 :         assert_scheduler_chooses(
     985            1 :             NodeId(2),
     986            1 :             &mut scheduled_intents,
     987            1 :             &mut scheduler,
     988            1 :             &empty_context,
     989            1 :         );
     990            1 :         assert_scheduler_chooses(
     991            1 :             NodeId(3),
     992            1 :             &mut scheduled_intents,
     993            1 :             &mut scheduler,
     994            1 :             &empty_context,
     995            1 :         );
     996            1 : 
     997            1 :         // The scheduler should prefer nodes with lower affinity score,
     998            1 :         // even if they have higher utilization (as long as they aren't utilized at >100%)
     999            1 :         let mut context_prefer_node1 = ScheduleContext::default();
    1000            1 :         context_prefer_node1.avoid(&[NodeId(2), NodeId(3)]);
    1001            1 :         assert_scheduler_chooses(
    1002            1 :             NodeId(1),
    1003            1 :             &mut scheduled_intents,
    1004            1 :             &mut scheduler,
    1005            1 :             &context_prefer_node1,
    1006            1 :         );
    1007            1 :         assert_scheduler_chooses(
    1008            1 :             NodeId(1),
    1009            1 :             &mut scheduled_intents,
    1010            1 :             &mut scheduler,
    1011            1 :             &context_prefer_node1,
    1012            1 :         );
    1013            1 : 
    1014            1 :         // If a node is over-utilized, it will not be used even if affinity scores prefer it
    1015            1 :         nodes
    1016            1 :             .get_mut(&NodeId(1))
    1017            1 :             .unwrap()
    1018            1 :             .set_availability(NodeAvailability::Active(test_utilization::simple(
    1019            1 :                 20000,
    1020            1 :                 1024 * 1024 * 1024,
    1021            1 :             )));
    1022            1 :         scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
    1023            1 :         assert_scheduler_chooses(
    1024            1 :             NodeId(2),
    1025            1 :             &mut scheduled_intents,
    1026            1 :             &mut scheduler,
    1027            1 :             &context_prefer_node1,
    1028            1 :         );
    1029            1 :         assert_scheduler_chooses(
    1030            1 :             NodeId(3),
    1031            1 :             &mut scheduled_intents,
    1032            1 :             &mut scheduler,
    1033            1 :             &context_prefer_node1,
    1034            1 :         );
    1035              : 
    1036           12 :         for mut intent in scheduled_intents {
    1037           11 :             intent.clear(&mut scheduler);
    1038           11 :         }
    1039            1 :     }
    1040              : 
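The behaviour verified in scheduler_utilization above (deterministic round-robin over empty nodes, affinity outranking raw utilization, ties falling back to the lowest node ID) is what a lexicographically ordered score gives you. The sketch below illustrates the idea with a hypothetical ToyScore; its fields and their ordering are assumptions for illustration, not the real NodeSchedulingScore implementations in this file.

// Hedged sketch of lexicographic score ordering for node selection.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct ToyScore {
    /// Shards of the same tenant already on this node (driven by
    /// ScheduleContext::avoid in the tests). Lower is better.
    affinity: usize,
    /// A utilization figure, e.g. shard count. Lower is better.
    utilization: u32,
    /// Final tie-breaker, so the ordering is deterministic.
    node_id: u64,
}

fn pick(candidates: &[ToyScore]) -> Option<ToyScore> {
    // The derived Ord compares fields in declaration order: affinity dominates
    // utilization, and utilization dominates node id.
    candidates.iter().copied().min()
}

fn main() {
    // Equal affinity and utilization: the lowest node id wins.
    let tied = [
        ToyScore { affinity: 0, utilization: 0, node_id: 2 },
        ToyScore { affinity: 0, utilization: 0, node_id: 1 },
        ToyScore { affinity: 0, utilization: 0, node_id: 3 },
    ];
    assert_eq!(pick(&tied).unwrap().node_id, 1);

    // A lower affinity score beats lower utilization: node 1 is busier but preferred.
    let affinity_wins = [
        ToyScore { affinity: 0, utilization: 10, node_id: 1 },
        ToyScore { affinity: 1, utilization: 0, node_id: 2 },
    ];
    assert_eq!(pick(&affinity_wins).unwrap().node_id, 1);
}

The one thing this sketch omits is the over-utilization cutoff: the scheduler also refuses nodes that report themselves as overloaded, which is why node 1 stops being chosen in the test once its utilization is pushed to 20000 shards.
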
    1041              :     #[test]
    1042              :     /// A simple test that showcases AZ-aware scheduling and its interaction with
    1043              :     /// affinity scores.
    1044            1 :     fn az_scheduling() {
    1045            1 :         let az_a_tag = AvailabilityZone("az-a".to_string());
    1046            1 :         let az_b_tag = AvailabilityZone("az-b".to_string());
    1047            1 : 
    1048            1 :         let nodes = test_utils::make_test_nodes(3, &[az_a_tag.clone(), az_b_tag.clone()]);
    1049            1 :         let mut scheduler = Scheduler::new(nodes.values());
    1050            1 : 
    1051            1 :         // Need to keep these alive because they contribute to shard counts via RAII
    1052            1 :         let mut scheduled_intents = Vec::new();
    1053            1 : 
    1054            1 :         let mut context = ScheduleContext::default();
    1055              : 
    1056            6 :         fn assert_scheduler_chooses<Tag: ShardTag>(
    1057            6 :             expect_node: NodeId,
    1058            6 :             preferred_az: Option<AvailabilityZone>,
    1059            6 :             scheduled_intents: &mut Vec<IntentState>,
    1060            6 :             scheduler: &mut Scheduler,
    1061            6 :             context: &mut ScheduleContext,
    1062            6 :         ) {
    1063            6 :             let scheduled = scheduler
    1064            6 :                 .schedule_shard::<Tag>(&[], &preferred_az, context)
    1065            6 :                 .unwrap();
    1066            6 :             let mut intent = IntentState::new();
    1067            6 :             intent.set_attached(scheduler, Some(scheduled));
    1068            6 :             scheduled_intents.push(intent);
    1069            6 :             assert_eq!(scheduled, expect_node);
    1070              : 
    1071            6 :             context.avoid(&[scheduled]);
    1072            6 :         }
    1073              : 
    1074            1 :         assert_scheduler_chooses::<AttachedShardTag>(
    1075            1 :             NodeId(1),
    1076            1 :             Some(az_a_tag.clone()),
    1077            1 :             &mut scheduled_intents,
    1078            1 :             &mut scheduler,
    1079            1 :             &mut context,
    1080            1 :         );
    1081            1 : 
    1082            1 :         // Nodes 2 and 3 have an affinity score of 0, but node 3
    1083            1 :         // is in "az-a", so we prefer it.
    1084            1 :         assert_scheduler_chooses::<AttachedShardTag>(
    1085            1 :             NodeId(3),
    1086            1 :             Some(az_a_tag.clone()),
    1087            1 :             &mut scheduled_intents,
    1088            1 :             &mut scheduler,
    1089            1 :             &mut context,
    1090            1 :         );
    1091            1 : 
    1092            1 :         // Node 2 is not in "az-a", but it has the lowest affinity so we prefer that.
    1093            1 :         assert_scheduler_chooses::<AttachedShardTag>(
    1094            1 :             NodeId(2),
    1095            1 :             Some(az_a_tag.clone()),
    1096            1 :             &mut scheduled_intents,
    1097            1 :             &mut scheduler,
    1098            1 :             &mut context,
    1099            1 :         );
    1100            1 : 
    1101            1 :         // Avoid nodes in "az-a" for the secondary location.
    1102            1 :         assert_scheduler_chooses::<SecondaryShardTag>(
    1103            1 :             NodeId(2),
    1104            1 :             Some(az_a_tag.clone()),
    1105            1 :             &mut scheduled_intents,
    1106            1 :             &mut scheduler,
    1107            1 :             &mut context,
    1108            1 :         );
    1109            1 : 
    1110            1 :         // Avoid nodes in "az-b" for the secondary location.
    1111            1 :         // Nodes 1 and 3 are identically loaded, so prefer the lowest node id.
    1112            1 :         assert_scheduler_chooses::<SecondaryShardTag>(
    1113            1 :             NodeId(1),
    1114            1 :             Some(az_b_tag.clone()),
    1115            1 :             &mut scheduled_intents,
    1116            1 :             &mut scheduler,
    1117            1 :             &mut context,
    1118            1 :         );
    1119            1 : 
    1120            1 :         // Avoid nodes in "az-b" for the secondary location.
    1121            1 :         // Node 3 has lower affinity score than 1, so prefer that.
    1122            1 :         assert_scheduler_chooses::<SecondaryShardTag>(
    1123            1 :             NodeId(3),
    1124            1 :             Some(az_b_tag.clone()),
    1125            1 :             &mut scheduled_intents,
    1126            1 :             &mut scheduler,
    1127            1 :             &mut context,
    1128            1 :         );
    1129              : 
    1130            7 :         for mut intent in scheduled_intents {
    1131            6 :             intent.clear(&mut scheduler);
    1132            6 :         }
    1133            1 :     }
    1134              : 
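Taken together, the assertions in az_scheduling show the AZ preference acting as a secondary criterion: among attached-shard candidates with equal affinity, a node in the preferred AZ wins, while a strictly lower affinity score still wins from outside that AZ; for secondary shards the preference is inverted so the secondary lands outside the attached location's AZ. One way to express this is to fold an AZ match/mismatch flag into the score tuple. The sketch below is an assumption-laden illustration, not the real AttachedShardTag/SecondaryShardTag scoring.

// Hedged sketch: folding an AZ preference into a lexicographic score.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct ToyAzScore {
    affinity: usize, // shards of the same tenant already on the node
    az_penalty: u8,  // 0 = preferred placement w.r.t. the AZ, 1 = not preferred
    node_id: u64,    // deterministic tie-breaker
}

fn az_penalty(node_az: &str, preferred: Option<&str>, is_secondary: bool) -> u8 {
    match preferred {
        // Attached locations want to be inside the preferred AZ,
        // secondaries want to be outside it.
        Some(az) if (node_az == az) != is_secondary => 0,
        Some(_) => 1,
        None => 0,
    }
}

fn main() {
    let preferred = Some("az-a");

    // Attached shard with preferred AZ "az-a": nodes 1 and 3 are in-AZ but
    // already hold a shard (affinity 1); node 2 has affinity 0 and wins even
    // though it is in the other AZ, mirroring the test above.
    let candidates = [
        ToyAzScore { affinity: 1, az_penalty: az_penalty("az-a", preferred, false), node_id: 1 },
        ToyAzScore { affinity: 0, az_penalty: az_penalty("az-b", preferred, false), node_id: 2 },
        ToyAzScore { affinity: 1, az_penalty: az_penalty("az-a", preferred, false), node_id: 3 },
    ];
    assert_eq!(candidates.iter().min().unwrap().node_id, 2);

    // Secondary shard: a node outside the preferred AZ gets the better penalty.
    assert_eq!(az_penalty("az-b", preferred, true), 0);
    assert_eq!(az_penalty("az-a", preferred, true), 1);
}
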
    1135              :     #[test]
    1136            1 :     fn az_scheduling_for_new_tenant() {
    1137            1 :         let az_a_tag = AvailabilityZone("az-a".to_string());
    1138            1 :         let az_b_tag = AvailabilityZone("az-b".to_string());
    1139            1 :         let nodes = test_utils::make_test_nodes(
    1140            1 :             6,
    1141            1 :             &[
    1142            1 :                 az_a_tag.clone(),
    1143            1 :                 az_a_tag.clone(),
    1144            1 :                 az_a_tag.clone(),
    1145            1 :                 az_b_tag.clone(),
    1146            1 :                 az_b_tag.clone(),
    1147            1 :                 az_b_tag.clone(),
    1148            1 :             ],
    1149            1 :         );
    1150            1 : 
    1151            1 :         let mut scheduler = Scheduler::new(nodes.values());
    1152              : 
    1153              :         /// Force the utilization of a node in the Scheduler's state to a particular
    1154              :         /// shard count.
    1155            2 :         fn set_utilization(scheduler: &mut Scheduler, node_id: NodeId, shard_count: u32) {
    1156            2 :             let mut node = Node::new(
    1157            2 :                 node_id,
    1158            2 :                 "".to_string(),
    1159            2 :                 0,
    1160            2 :                 "".to_string(),
    1161            2 :                 0,
    1162            2 :                 scheduler.nodes.get(&node_id).unwrap().az.clone(),
    1163            2 :             );
    1164            2 :             node.set_availability(NodeAvailability::Active(test_utilization::simple(
    1165            2 :                 shard_count,
    1166            2 :                 0,
    1167            2 :             )));
    1168            2 :             scheduler.node_upsert(&node);
    1169            2 :         }
    1170              : 
    1171              :         // Initial empty state.  Scores are tied, so the scheduler prefers the lower AZ ID.
    1172            1 :         assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_a_tag.clone()));
    1173              : 
    1174              :         // Put some utilization on one node in AZ A: this should change nothing, as the median hasn't changed
    1175            1 :         set_utilization(&mut scheduler, NodeId(1), 1000000);
    1176            1 :         assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_a_tag.clone()));
    1177              : 
    1178              :         // Put some utilization on a second node in AZ A: now the median has changed, so the scheduler
    1179              :         // should prefer the other AZ.
    1180            1 :         set_utilization(&mut scheduler, NodeId(2), 1000000);
    1181            1 :         assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_b_tag.clone()));
    1182            1 :     }
    1183              : }
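The assertions in az_scheduling_for_new_tenant imply that get_az_for_new_tenant compares AZs by the median utilization of their nodes: loading a single node in az-a leaves its median unchanged, loading a second one raises the median above az-b's, and an exact tie goes to the lexicographically smaller AZ ID. The function below reconstructs that selection logic under those assumptions; it is an illustrative sketch, not the real implementation.

use std::collections::BTreeMap;

/// Hedged sketch: pick the AZ whose median node utilization (here, shard
/// count) is lowest, breaking ties by AZ id. Inputs are (az, shard_count)
/// pairs, one per node.
fn az_for_new_tenant(nodes: &[(String, u32)]) -> Option<String> {
    // Group per-node shard counts by AZ; BTreeMap keeps AZ ids sorted, which
    // gives the "prefer the lower AZ id" tie-break for free.
    let mut by_az: BTreeMap<String, Vec<u32>> = BTreeMap::new();
    for (az, shard_count) in nodes {
        by_az.entry(az.clone()).or_default().push(*shard_count);
    }

    by_az
        .into_iter()
        .map(|(az, mut counts)| {
            counts.sort_unstable();
            // Upper median of the AZ's per-node shard counts.
            let median = counts[counts.len() / 2];
            // Tuple ordering: median first, then AZ id as tie-breaker.
            (median, az)
        })
        .min()
        .map(|(_, az)| az)
}

fn main() {
    let az_a = "az-a".to_string();
    let az_b = "az-b".to_string();

    // All nodes empty: medians tie, the lower AZ id wins.
    let nodes = vec![
        (az_a.clone(), 0), (az_a.clone(), 0), (az_a.clone(), 0),
        (az_b.clone(), 0), (az_b.clone(), 0), (az_b.clone(), 0),
    ];
    assert_eq!(az_for_new_tenant(&nodes), Some(az_a.clone()));

    // One busy node in az-a: its median is still 0, so az-a is still chosen.
    let nodes = vec![
        (az_a.clone(), 1_000_000), (az_a.clone(), 0), (az_a.clone(), 0),
        (az_b.clone(), 0), (az_b.clone(), 0), (az_b.clone(), 0),
    ];
    assert_eq!(az_for_new_tenant(&nodes), Some(az_a.clone()));

    // Two busy nodes in az-a: its median rises, so az-b is preferred.
    let nodes = vec![
        (az_a.clone(), 1_000_000), (az_a.clone(), 1_000_000), (az_a.clone(), 0),
        (az_b.clone(), 0), (az_b.clone(), 0), (az_b.clone(), 0),
    ];
    assert_eq!(az_for_new_tenant(&nodes), Some(az_b));
}
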
        

Generated by: LCOV version 2.1-beta