LCOV - code coverage report
Current view: top level - storage_controller/src - scheduler.rs (source / functions) Coverage Total Hit
Test: f8d8f5b90fa487a9e82c42da223f012f5d4fece7.info Lines: 83.0 % 464 385
Test Date: 2024-09-19 20:36:02 Functions: 69.8 % 43 30

            Line data    Source code
       1              : use crate::{node::Node, tenant_shard::TenantShard};
       2              : use itertools::Itertools;
       3              : use pageserver_api::models::PageserverUtilization;
       4              : use serde::Serialize;
       5              : use std::collections::HashMap;
       6              : use utils::{http::error::ApiError, id::NodeId};
       7              : 
       8              : /// Scenarios in which we cannot find a suitable location for a tenant shard
       9            0 : #[derive(thiserror::Error, Debug)]
      10              : pub enum ScheduleError {
      11              :     #[error("No pageservers found")]
      12              :     NoPageservers,
      13              :     #[error("No pageserver found matching constraint")]
      14              :     ImpossibleConstraint,
      15              : }
      16              : 
      17              : impl From<ScheduleError> for ApiError {
      18            0 :     fn from(value: ScheduleError) -> Self {
      19            0 :         ApiError::Conflict(format!("Scheduling error: {}", value))
      20            0 :     }
      21              : }
      22              : 
      23              : #[derive(Serialize)]
      24              : pub enum MaySchedule {
      25              :     Yes(PageserverUtilization),
      26              :     No,
      27              : }
      28              : 
      29              : #[derive(Serialize)]
      30              : struct SchedulerNode {
      31              :     /// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
      32              :     shard_count: usize,
      33              :     /// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`].
      34              :     attached_shard_count: usize,
      35              : 
      36              :     /// Whether this node is currently eligible to have new shards scheduled (this is derived
      37              :     /// from a node's availability state and scheduling policy).
      38              :     may_schedule: MaySchedule,
      39              : }
      40              : 
      41              : impl PartialEq for SchedulerNode {
      42            3 :     fn eq(&self, other: &Self) -> bool {
      43            3 :         let may_schedule_matches = matches!(
      44            3 :             (&self.may_schedule, &other.may_schedule),
      45              :             (MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No)
      46              :         );
      47              : 
      48            3 :         may_schedule_matches
      49            3 :             && self.shard_count == other.shard_count
      50            3 :             && self.attached_shard_count == other.attached_shard_count
      51            3 :     }
      52              : }
      53              : 
      54              : impl Eq for SchedulerNode {}
      55              : 
      56              : /// This type is responsible for selecting which node is used when a tenant shard needs to choose a pageserver
      57              : /// on which to run.
      58              : ///
      59              : /// The type has no persistent state of its own: this is all populated at startup.  The Serialize
      60              : /// impl is only for debug dumps.
      61              : #[derive(Serialize)]
      62              : pub(crate) struct Scheduler {
      63              :     nodes: HashMap<NodeId, SchedulerNode>,
      64              : }
      65              : 
      66              : /// Score for soft constraint scheduling: lower scores are preferred to higher scores.
      67              : ///
      68              : /// For example, we may set an affinity score based on the number of shards from the same
      69              : /// tenant already on a node, to implicitly prefer to balance out shards.
      70              : #[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
      71              : pub(crate) struct AffinityScore(pub(crate) usize);
      72              : 
      73              : impl AffinityScore {
      74              :     /// If we have no anti-affinity at all toward a node, this is its score.  It means
      75              :     /// the scheduler has a free choice amongst nodes with this score, and may pick a node
      76              :     /// based on other information such as total utilization.
      77              :     pub(crate) const FREE: Self = Self(0);
      78              : 
      79           80 :     pub(crate) fn inc(&mut self) {
      80           80 :         self.0 += 1;
      81           80 :     }
      82              : }
      83              : 
      84              : impl std::ops::Add for AffinityScore {
      85              :     type Output = Self;
      86              : 
      87           20 :     fn add(self, rhs: Self) -> Self::Output {
      88           20 :         Self(self.0 + rhs.0)
      89           20 :     }
      90              : }
      91              : 
      92              : /// Hint for whether this is a sincere attempt to schedule, or a speculative
      93              : /// check for where we _would_ schedule (done during optimization)
      94              : #[derive(Debug)]
      95              : pub(crate) enum ScheduleMode {
      96              :     Normal,
      97              :     Speculative,
      98              : }
      99              : 
     100              : impl Default for ScheduleMode {
     101           16 :     fn default() -> Self {
     102           16 :         Self::Normal
     103           16 :     }
     104              : }
     105              : 
     106              : // For carrying state between multiple calls to [`TenantShard::schedule`], e.g. when calling
     107              : // it for many shards in the same tenant.
     108              : #[derive(Debug, Default)]
     109              : pub(crate) struct ScheduleContext {
     110              :     /// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`]
     111              :     pub(crate) nodes: HashMap<NodeId, AffinityScore>,
     112              : 
     113              :     /// Specifically how many _attached_ locations are on each node
     114              :     pub(crate) attached_nodes: HashMap<NodeId, usize>,
     115              : 
     116              :     pub(crate) mode: ScheduleMode,
     117              : }
     118              : 
     119              : impl ScheduleContext {
     120              :     /// Input is a list of nodes we would like to avoid using again within this context.  The more
     121              :     /// times a node is passed into this call, the less inclined we are to use it.
     122           41 :     pub(crate) fn avoid(&mut self, nodes: &[NodeId]) {
     123          121 :         for node_id in nodes {
     124           80 :             let entry = self.nodes.entry(*node_id).or_insert(AffinityScore::FREE);
     125           80 :             entry.inc()
     126              :         }
     127           41 :     }
     128              : 
     129           39 :     pub(crate) fn push_attached(&mut self, node_id: NodeId) {
     130           39 :         let entry = self.attached_nodes.entry(node_id).or_default();
     131           39 :         *entry += 1;
     132           39 :     }
     133              : 
     134           45 :     pub(crate) fn get_node_affinity(&self, node_id: NodeId) -> AffinityScore {
     135           45 :         self.nodes
     136           45 :             .get(&node_id)
     137           45 :             .copied()
     138           45 :             .unwrap_or(AffinityScore::FREE)
     139           45 :     }
     140              : 
     141           45 :     pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize {
     142           45 :         self.attached_nodes.get(&node_id).copied().unwrap_or(0)
     143           45 :     }
     144              : }
     145              : 
     146              : pub(crate) enum RefCountUpdate {
     147              :     PromoteSecondary,
     148              :     Attach,
     149              :     Detach,
     150              :     DemoteAttached,
     151              :     AddSecondary,
     152              :     RemoveSecondary,
     153              : }
     154              : 
     155              : impl Scheduler {
     156            8 :     pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
     157            8 :         let mut scheduler_nodes = HashMap::new();
     158           29 :         for node in nodes {
     159           21 :             scheduler_nodes.insert(
     160           21 :                 node.get_id(),
     161           21 :                 SchedulerNode {
     162           21 :                     shard_count: 0,
     163           21 :                     attached_shard_count: 0,
     164           21 :                     may_schedule: node.may_schedule(),
     165           21 :                 },
     166           21 :             );
     167           21 :         }
     168              : 
     169            8 :         Self {
     170            8 :             nodes: scheduler_nodes,
     171            8 :         }
     172            8 :     }
     173              : 
     174              :     /// For debug/support: check that our internal statistics are in sync with the state of
     175              :     /// the nodes & tenant shards.
     176              :     ///
     177              :     /// If anything is inconsistent, log details and return an error.
     178            1 :     pub(crate) fn consistency_check<'a>(
     179            1 :         &self,
     180            1 :         nodes: impl Iterator<Item = &'a Node>,
     181            1 :         shards: impl Iterator<Item = &'a TenantShard>,
     182            1 :     ) -> anyhow::Result<()> {
     183            1 :         let mut expect_nodes: HashMap<NodeId, SchedulerNode> = HashMap::new();
     184            4 :         for node in nodes {
     185            3 :             expect_nodes.insert(
     186            3 :                 node.get_id(),
     187            3 :                 SchedulerNode {
     188            3 :                     shard_count: 0,
     189            3 :                     attached_shard_count: 0,
     190            3 :                     may_schedule: node.may_schedule(),
     191            3 :                 },
     192            3 :             );
     193            3 :         }
     194              : 
     195            2 :         for shard in shards {
     196            1 :             if let Some(node_id) = shard.intent.get_attached() {
     197            1 :                 match expect_nodes.get_mut(node_id) {
     198            1 :                     Some(node) => {
     199            1 :                         node.shard_count += 1;
     200            1 :                         node.attached_shard_count += 1;
     201            1 :                     }
     202            0 :                     None => anyhow::bail!(
     203            0 :                         "Tenant {} references nonexistent node {}",
     204            0 :                         shard.tenant_shard_id,
     205            0 :                         node_id
     206            0 :                     ),
     207              :                 }
     208            0 :             }
     209              : 
     210            1 :             for node_id in shard.intent.get_secondary() {
     211            1 :                 match expect_nodes.get_mut(node_id) {
     212            1 :                     Some(node) => node.shard_count += 1,
     213            0 :                     None => anyhow::bail!(
     214            0 :                         "Tenant {} references nonexistent node {}",
     215            0 :                         shard.tenant_shard_id,
     216            0 :                         node_id
     217            0 :                     ),
     218              :                 }
     219              :             }
     220              :         }
     221              : 
     222            4 :         for (node_id, expect_node) in &expect_nodes {
     223            3 :             let Some(self_node) = self.nodes.get(node_id) else {
     224            0 :                 anyhow::bail!("Node {node_id} not found in Self")
     225              :             };
     226              : 
     227            3 :             if self_node != expect_node {
     228            0 :                 tracing::error!("Inconsistency detected in scheduling state for node {node_id}");
     229            0 :                 tracing::error!("Expected state: {}", serde_json::to_string(expect_node)?);
     230            0 :                 tracing::error!("Self state: {}", serde_json::to_string(self_node)?);
     231              : 
     232            0 :                 anyhow::bail!("Inconsistent state on {node_id}");
     233            3 :             }
     234              :         }
     235              : 
     236            1 :         if expect_nodes.len() != self.nodes.len() {
     237              :             // We just checked that all the expected nodes are present.  If the lengths don't match,
     238              :             // it means that we have nodes in Self that are unexpected.
     239            0 :             for node_id in self.nodes.keys() {
     240            0 :                 if !expect_nodes.contains_key(node_id) {
     241            0 :                     anyhow::bail!("Node {node_id} found in Self but not in expected nodes");
     242            0 :                 }
     243              :             }
     244            1 :         }
     245              : 
     246            1 :         Ok(())
     247            1 :     }
     248              : 
     249              :     /// Update the reference counts of a node. These reference counts are used to guide scheduling
     250              :     /// decisions, not for memory management: they represent the number of tenant shards whose IntentState
     251              :     /// targets this node and the number of tenant shards whose IntentState is attached to this
     252              :     /// node.
     253              :     ///
     254              :     /// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
     255              :     /// [`Self::new`] or [`Self::node_upsert`])
     256           91 :     pub(crate) fn update_node_ref_counts(&mut self, node_id: NodeId, update: RefCountUpdate) {
     257           91 :         let Some(node) = self.nodes.get_mut(&node_id) else {
     258            0 :             debug_assert!(false);
     259            0 :             tracing::error!("Scheduler missing node {node_id}");
     260            0 :             return;
     261              :         };
     262              : 
     263           91 :         match update {
     264            5 :             RefCountUpdate::PromoteSecondary => {
     265            5 :                 node.attached_shard_count += 1;
     266            5 :             }
     267           24 :             RefCountUpdate::Attach => {
     268           24 :                 node.shard_count += 1;
     269           24 :                 node.attached_shard_count += 1;
     270           24 :             }
     271           23 :             RefCountUpdate::Detach => {
     272           23 :                 node.shard_count -= 1;
     273           23 :                 node.attached_shard_count -= 1;
     274           23 :             }
     275            5 :             RefCountUpdate::DemoteAttached => {
     276            5 :                 node.attached_shard_count -= 1;
     277            5 :             }
     278           17 :             RefCountUpdate::AddSecondary => {
     279           17 :                 node.shard_count += 1;
     280           17 :             }
     281           17 :             RefCountUpdate::RemoveSecondary => {
     282           17 :                 node.shard_count -= 1;
     283           17 :             }
     284              :         }
     285              : 
     286              :         // Maybe update PageserverUtilization
     287           91 :         match update {
     288              :             RefCountUpdate::AddSecondary | RefCountUpdate::Attach => {
     289              :                 // Referencing the node: if this takes our shard_count above the utilization structure's
     290              :                 // shard count, then artificially bump it: this ensures that the scheduler immediately
     291              :                 // recognizes that this node has more work on it, without waiting for the next heartbeat
     292              :                 // to update the utilization.
     293           41 :                 if let MaySchedule::Yes(utilization) = &mut node.may_schedule {
     294           41 :                     utilization.adjust_shard_count_max(node.shard_count as u32);
     295           41 :                 }
     296              :             }
     297              :             RefCountUpdate::PromoteSecondary
     298              :             | RefCountUpdate::Detach
     299              :             | RefCountUpdate::RemoveSecondary
     300           50 :             | RefCountUpdate::DemoteAttached => {
     301           50 :                 // De-referencing the node: leave the utilization's shard_count at a stale higher
     302           50 :                 // value until some future heartbeat after we have physically removed this shard
     303           50 :                 // from the node: this prevents the scheduler over-optimistically trying to schedule
     304           50 :                 // more work onto the node before earlier detaches are done.
     305           50 :             }
     306              :         }
     307           91 :     }
     308              : 
     309              :     // Check if the number of shards attached to a given node is lagging below
     310              :     // the cluster average. If that's the case, the node should be filled.
     311            0 :     pub(crate) fn compute_fill_requirement(&self, node_id: NodeId) -> usize {
     312            0 :         let Some(node) = self.nodes.get(&node_id) else {
     313            0 :             debug_assert!(false);
     314            0 :             tracing::error!("Scheduler missing node {node_id}");
     315            0 :             return 0;
     316              :         };
     317            0 :         assert!(!self.nodes.is_empty());
     318            0 :         let expected_attached_shards_per_node = self.expected_attached_shard_count();
     319              : 
     320            0 :         for (node_id, node) in self.nodes.iter() {
     321            0 :             tracing::trace!(%node_id, "attached_shard_count={} shard_count={} expected={}", node.attached_shard_count, node.shard_count, expected_attached_shards_per_node);
     322              :         }
     323              : 
     324            0 :         if node.attached_shard_count < expected_attached_shards_per_node {
     325            0 :             expected_attached_shards_per_node - node.attached_shard_count
     326              :         } else {
     327            0 :             0
     328              :         }
     329            0 :     }
     330              : 
     331            0 :     pub(crate) fn expected_attached_shard_count(&self) -> usize {
     332            0 :         let total_attached_shards: usize =
     333            0 :             self.nodes.values().map(|n| n.attached_shard_count).sum();
     334            0 : 
     335            0 :         assert!(!self.nodes.is_empty());
     336            0 :         total_attached_shards / self.nodes.len()
     337            0 :     }
     338              : 
     339            0 :     pub(crate) fn nodes_by_attached_shard_count(&self) -> Vec<(NodeId, usize)> {
     340            0 :         self.nodes
     341            0 :             .iter()
     342            0 :             .map(|(node_id, stats)| (*node_id, stats.attached_shard_count))
     343            0 :             .sorted_by(|lhs, rhs| Ord::cmp(&lhs.1, &rhs.1).reverse())
     344            0 :             .collect()
     345            0 :     }
     346              : 
     347            7 :     pub(crate) fn node_upsert(&mut self, node: &Node) {
     348              :         use std::collections::hash_map::Entry::*;
     349            7 :         match self.nodes.entry(node.get_id()) {
     350            3 :             Occupied(mut entry) => {
     351            3 :                 // Updates to MaySchedule are how we receive updated PageserverUtilization: adjust these values
     352            3 :                 // to account for any shards scheduled on the controller but not yet visible to the pageserver.
     353            3 :                 let mut may_schedule = node.may_schedule();
     354            3 :                 match &mut may_schedule {
     355            2 :                     MaySchedule::Yes(utilization) => {
     356            2 :                         utilization.adjust_shard_count_max(entry.get().shard_count as u32);
     357            2 :                     }
     358            1 :                     MaySchedule::No => { // Nothing to tweak
     359            1 :                     }
     360              :                 }
     361              : 
     362            3 :                 entry.get_mut().may_schedule = may_schedule;
     363              :             }
     364            4 :             Vacant(entry) => {
     365            4 :                 entry.insert(SchedulerNode {
     366            4 :                     shard_count: 0,
     367            4 :                     attached_shard_count: 0,
     368            4 :                     may_schedule: node.may_schedule(),
     369            4 :                 });
     370            4 :             }
     371              :         }
     372            7 :     }
     373              : 
     374            0 :     pub(crate) fn node_remove(&mut self, node_id: NodeId) {
     375            0 :         if self.nodes.remove(&node_id).is_none() {
     376            0 :             tracing::warn!(node_id=%node_id, "Removed non-existent node from scheduler");
     377            0 :         }
     378            0 :     }
     379              : 
     380              :     /// Where we have several nodes to choose from, for example when picking a secondary location
     381              :     /// to promote to an attached location, this method may be used to pick the best choice based
     382              :     /// on the scheduler's knowledge of utilization and availability.
     383              :     ///
     384              :     /// If the input is empty, or none of the nodes are eligible for scheduling, return None: the
     385              :     /// caller can pick a node some other way.
     386           14 :     pub(crate) fn node_preferred(&self, nodes: &[NodeId]) -> Option<NodeId> {
     387           14 :         if nodes.is_empty() {
     388           12 :             return None;
     389            2 :         }
     390            2 : 
     391            2 :         // TODO: When the utilization score returned by the pageserver becomes meaningful,
     392            2 :         // schedule based on that instead of the shard count.
     393            2 :         let node = nodes
     394            2 :             .iter()
     395            4 :             .map(|node_id| {
     396            4 :                 let may_schedule = self
     397            4 :                     .nodes
     398            4 :                     .get(node_id)
     399            4 :                     .map(|n| !matches!(n.may_schedule, MaySchedule::No))
     400            4 :                     .unwrap_or(false);
     401            4 :                 (*node_id, may_schedule)
     402            4 :             })
     403            4 :             .max_by_key(|(_n, may_schedule)| *may_schedule);
     404            2 : 
     405            2 :         // If even the preferred node has may_schedule==false, return None
     406            2 :         node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
     407           14 :     }
     408              : 
     409              :     /// hard_exclude: it is forbidden to use nodes in this list, typically because they
     410              :     /// are already in use by this shard -- we use this to avoid picking the same node
     411              :     /// as both attached and secondary location.  This is a hard constraint: if we cannot
     412              :     /// find any nodes that aren't in this list, then we will return a [`ScheduleError::ImpossibleConstraint`].
     413              :     ///
     414              :     /// context: we prefer to avoid using nodes identified in the context, according
     415              :     /// to their anti-affinity score.  We use this to prefer to avoid placing shards in
     416              :     /// the same tenant on the same node.  This is a soft constraint: the context will never
     417              :     /// cause us to fail to schedule a shard.
     418           38 :     pub(crate) fn schedule_shard(
     419           38 :         &mut self,
     420           38 :         hard_exclude: &[NodeId],
     421           38 :         context: &ScheduleContext,
     422           38 :     ) -> Result<NodeId, ScheduleError> {
     423           38 :         if self.nodes.is_empty() {
     424            0 :             return Err(ScheduleError::NoPageservers);
     425           38 :         }
     426           38 : 
     427           38 :         let mut scores: Vec<(NodeId, AffinityScore, u64, usize)> = self
     428           38 :             .nodes
     429           38 :             .iter_mut()
     430          115 :             .filter_map(|(k, v)| match &mut v.may_schedule {
     431            0 :                 MaySchedule::No => None,
     432          115 :                 MaySchedule::Yes(_) if hard_exclude.contains(k) => None,
     433           84 :                 MaySchedule::Yes(utilization) => Some((
     434           84 :                     *k,
     435           84 :                     context.nodes.get(k).copied().unwrap_or(AffinityScore::FREE),
     436           84 :                     utilization.cached_score(),
     437           84 :                     v.attached_shard_count,
     438           84 :                 )),
     439          115 :             })
     440           38 :             .collect();
     441           38 : 
     442           38 :         // Exclude nodes whose utilization is critically high, if there are alternatives available.  This will
     443           38 :         // cause us to violate affinity rules if it is necessary to avoid critically overloading nodes: for example
     444           38 :         // we may place shards in the same tenant together on the same pageserver if all other pageservers are
     445           38 :         // overloaded.
     446           38 :         let non_overloaded_scores = scores
     447           38 :             .iter()
     448           84 :             .filter(|i| !PageserverUtilization::is_overloaded(i.2))
     449           38 :             .copied()
     450           38 :             .collect::<Vec<_>>();
     451           38 :         if !non_overloaded_scores.is_empty() {
     452           38 :             scores = non_overloaded_scores;
     453           38 :         }
     454              : 
     455              :         // Sort by, in order of precedence:
     456              :         //  1st: Affinity score.  We should never pick a higher-score node if a lower-score node is available
     457              :         //  2nd: Utilization score (this combines shard count and disk utilization)
     458              :         //  3rd: Attached shard count.  When nodes have identical utilization (e.g. when populating some
     459              :         //       empty nodes), this acts as an anti-affinity between attached shards.
     460              :         //  4th: Node ID.  This is a convenience to make selection deterministic in tests and empty systems.
     461          108 :         scores.sort_by_key(|i| (i.1, i.2, i.3, i.0));
     462           38 : 
     463           38 :         if scores.is_empty() {
     464              :             // After applying constraints, no pageservers were left.
     465            0 :             if !matches!(context.mode, ScheduleMode::Speculative) {
     466              :                 // If this was not a speculative attempt, log details to understand why we couldn't
     467              :                 // schedule: this may help an engineer understand if some nodes are marked offline
     468              :                 // in a way that's preventing progress.
     469            0 :                 tracing::info!(
     470            0 :                     "Scheduling failure, while excluding {hard_exclude:?}, node states:"
     471              :                 );
     472            0 :                 for (node_id, node) in &self.nodes {
     473            0 :                     tracing::info!(
     474            0 :                         "Node {node_id}: may_schedule={} shards={}",
     475            0 :                         !matches!(node.may_schedule, MaySchedule::No),
     476              :                         node.shard_count
     477              :                     );
     478              :                 }
     479            0 :             }
     480            0 :             return Err(ScheduleError::ImpossibleConstraint);
     481           38 :         }
     482           38 : 
     483           38 :         // Lowest score wins
     484           38 :         let node_id = scores.first().unwrap().0;
     485              : 
     486           38 :         if !matches!(context.mode, ScheduleMode::Speculative) {
     487           38 :             tracing::info!(
     488            0 :             "scheduler selected node {node_id} (elegible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?})",
     489            0 :             scores.iter().map(|i| i.0 .0).collect::<Vec<_>>()
     490              :         );
     491            0 :         }
     492              : 
     493              :         // Note that we do not update shard count here to reflect the scheduling: that
     494              :         // is IntentState's job when the scheduled location is used.
     495              : 
     496           38 :         Ok(node_id)
     497           38 :     }
     498              : 
     499              :     /// Unit test access to internal state: returns the `shard_count` recorded for `node_id`. Panics (via `unwrap`) if `node_id` is not present in `self.nodes`.
     500              :     #[cfg(test)]
     501           12 :     pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
     502           12 :         self.nodes.get(&node_id).unwrap().shard_count
     503           12 :     }
     504              : 
     505              :     #[cfg(test)] // Unit test access to internal state: returns the `attached_shard_count` recorded for `node_id`.
     506           12 :     pub(crate) fn get_node_attached_shard_count(&self, node_id: NodeId) -> usize {
     507           12 :         self.nodes.get(&node_id).unwrap().attached_shard_count // panics if `node_id` is not present in `self.nodes`
     508           12 :     }
     509              : }
     510              : 
     511              : #[cfg(test)]
     512              : pub(crate) mod test_utils { // Test-only helpers: compiled under `cfg(test)` and visible crate-wide.
     513              : 
     514              :     use crate::node::Node;
     515              :     use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization};
     516              :     use std::collections::HashMap;
     517              :     use utils::id::NodeId;
     518              :     /// Test helper: build a map of `n` synthetic nodes keyed by their `NodeId`, each set to `NodeAvailability::Active`.
     519              :     ///
     520              :     /// Node IDs start at one and run through `n` inclusive; `n == 0` yields an empty map.
     521            8 :     pub(crate) fn make_test_nodes(n: u64) -> HashMap<NodeId, Node> {
     522            8 :         (1..n + 1) // half-open range, so `i` takes the values 1..=n
     523           25 :             .map(|i| {
     524           25 :                 (NodeId(i), {
     525           25 :                     let mut node = Node::new(
     526           25 :                         NodeId(i),
     527           25 :                         format!("httphost-{i}"),
     528           25 :                         80 + i as u16, // NOTE(review): `i as u16` truncates large IDs; presumably test node counts stay small
     529           25 :                         format!("pghost-{i}"),
     530           25 :                         5432 + i as u16,
     531           25 :                         "test-az".to_string(),
     532           25 :                     );
     533           25 :                     node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0))); // mark the node Active with the `simple(0, 0)` test utilization
     534           25 :                     assert!(node.is_available()); // sanity check: an Active node must report itself as available
     535           25 :                     node
     536           25 :                 })
     537           25 :             })
     538            8 :             .collect()
     539            8 :     }
     540              : }
     541              : 
     542              : #[cfg(test)]
     543              : mod tests {
     544              :     use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization};
     545              : 
     546              :     use super::*;
     547              : 
     548              :     use crate::tenant_shard::IntentState;
     549              :     #[test] // Schedules two shards onto two nodes and checks the per-node shard counts as intents are attached, given a secondary, and cleared.
     550            1 :     fn scheduler_basic() -> anyhow::Result<()> {
     551            1 :         let nodes = test_utils::make_test_nodes(2);
     552            1 : 
     553            1 :         let mut scheduler = Scheduler::new(nodes.values());
     554            1 :         let mut t1_intent = IntentState::new();
     555            1 :         let mut t2_intent = IntentState::new();
     556            1 : 
     557            1 :         let context = ScheduleContext::default();
     558              : 
     559            1 :         let scheduled = scheduler.schedule_shard(&[], &context)?; // no hard excludes
     560            1 :         t1_intent.set_attached(&mut scheduler, Some(scheduled));
     561            1 :         let scheduled = scheduler.schedule_shard(&[], &context)?;
     562            1 :         t2_intent.set_attached(&mut scheduler, Some(scheduled));
     563            1 : 
     564            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1); // the two attachments must have landed on different nodes
     565            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
     566              : 
     567            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
     568            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
     569              : 
     570            1 :         let scheduled = scheduler.schedule_shard(&t1_intent.all_pageservers(), &context)?; // hard-exclude the nodes t1 already uses
     571            1 :         t1_intent.push_secondary(&mut scheduler, scheduled);
     572            1 : 
     573            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1); // node 1 is unchanged
     574            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
     575              : 
     576            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2); // the secondary adds to the total count on node 2 but not to its attached count
     577            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
     578              : 
     579            1 :         t1_intent.clear(&mut scheduler); // releases both of t1's locations from the scheduler's counts
     580            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
     581            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
     582              : 
     583            1 :         let total_attached = scheduler.get_node_attached_shard_count(NodeId(1))
     584            1 :             + scheduler.get_node_attached_shard_count(NodeId(2));
     585            1 :         assert_eq!(total_attached, 1); // only t2's attachment remains
     586              : 
     587            1 :         if cfg!(debug_assertions) {
     588              :             // Dropping an IntentState without clearing it causes a panic in debug mode,
     589              :             // because we have failed to properly update scheduler shard counts.
     590            1 :             let result = std::panic::catch_unwind(move || {
     591            1 :                 drop(t2_intent);
     592            1 :             });
     593            1 :             assert!(result.is_err()); // the drop is expected to panic
     594              :         } else {
     595            0 :             t2_intent.clear(&mut scheduler); // without debug assertions: clear normally and expect every count to return to zero
     596            0 : 
     597            0 :             assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
     598            0 :             assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 0);
     599              : 
     600            0 :             assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 0);
     601            0 :             assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 0);
     602              :         }
     603              : 
     604            1 :         Ok(())
     605            1 :     }
     606              : 
     607              :     #[test]
     608              :     /// Test the PageserverUtilization's contribution to the scheduling algorithm, and how it interacts with the affinity expressed via `ScheduleContext::avoid`
     609            1 :     fn scheduler_utilization() {
     610            1 :         let mut nodes = test_utils::make_test_nodes(3);
     611            1 :         let mut scheduler = Scheduler::new(nodes.values());
     612            1 : 
     613            1 :         // Need to keep these alive because they contribute to shard counts via RAII
     614            1 :         let mut scheduled_intents = Vec::new();
     615            1 : 
     616            1 :         let empty_context = ScheduleContext::default();
     617              : 
     618           11 :         fn assert_scheduler_chooses( // helper: schedule one shard with no hard excludes, attach it, keep the intent, and assert which node was chosen
     619           11 :             expect_node: NodeId,
     620           11 :             scheduled_intents: &mut Vec<IntentState>,
     621           11 :             scheduler: &mut Scheduler,
     622           11 :             context: &ScheduleContext,
     623           11 :         ) {
     624           11 :             let scheduled = scheduler.schedule_shard(&[], context).unwrap();
     625           11 :             let mut intent = IntentState::new();
     626           11 :             intent.set_attached(scheduler, Some(scheduled));
     627           11 :             scheduled_intents.push(intent); // stored before the assertion below runs
     628           11 :             assert_eq!(scheduled, expect_node);
     629           11 :         }
     630              : 
     631              :         // Independent schedule calls onto empty nodes should round-robin, because each node's
     632              :         // utilization's shard count is updated inline.  The order is deterministic because when all other factors are
     633              :         // equal, we order by node ID.
     634            1 :         assert_scheduler_chooses(
     635            1 :             NodeId(1),
     636            1 :             &mut scheduled_intents,
     637            1 :             &mut scheduler,
     638            1 :             &empty_context,
     639            1 :         );
     640            1 :         assert_scheduler_chooses(
     641            1 :             NodeId(2),
     642            1 :             &mut scheduled_intents,
     643            1 :             &mut scheduler,
     644            1 :             &empty_context,
     645            1 :         );
     646            1 :         assert_scheduler_chooses(
     647            1 :             NodeId(3),
     648            1 :             &mut scheduled_intents,
     649            1 :             &mut scheduler,
     650            1 :             &empty_context,
     651            1 :         );
     652            1 : 
     653            1 :         // Manually setting utilization higher should cause schedule calls to round-robin the other nodes
     654            1 :         // which have equal utilization.
     655            1 :         nodes
     656            1 :             .get_mut(&NodeId(1))
     657            1 :             .unwrap()
     658            1 :             .set_availability(NodeAvailability::Active(test_utilization::simple(
     659            1 :                 10,
     660            1 :                 1024 * 1024 * 1024,
     661            1 :             )));
     662            1 :         scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap()); // push the modified node 1 into the scheduler
     663            1 : 
     664            1 :         assert_scheduler_chooses(
     665            1 :             NodeId(2),
     666            1 :             &mut scheduled_intents,
     667            1 :             &mut scheduler,
     668            1 :             &empty_context,
     669            1 :         );
     670            1 :         assert_scheduler_chooses(
     671            1 :             NodeId(3),
     672            1 :             &mut scheduled_intents,
     673            1 :             &mut scheduler,
     674            1 :             &empty_context,
     675            1 :         );
     676            1 :         assert_scheduler_chooses(
     677            1 :             NodeId(2),
     678            1 :             &mut scheduled_intents,
     679            1 :             &mut scheduler,
     680            1 :             &empty_context,
     681            1 :         );
     682            1 :         assert_scheduler_chooses(
     683            1 :             NodeId(3),
     684            1 :             &mut scheduled_intents,
     685            1 :             &mut scheduler,
     686            1 :             &empty_context,
     687            1 :         );
     688            1 : 
     689            1 :         // The scheduler should prefer nodes with lower affinity score,
     690            1 :         // even if they have higher utilization (as long as they aren't utilized at >100%)
     691            1 :         let mut context_prefer_node1 = ScheduleContext::default();
     692            1 :         context_prefer_node1.avoid(&[NodeId(2), NodeId(3)]); // mark nodes 2 and 3 as ones to avoid, which leaves node 1 preferred
     693            1 :         assert_scheduler_chooses(
     694            1 :             NodeId(1),
     695            1 :             &mut scheduled_intents,
     696            1 :             &mut scheduler,
     697            1 :             &context_prefer_node1,
     698            1 :         );
     699            1 :         assert_scheduler_chooses(
     700            1 :             NodeId(1),
     701            1 :             &mut scheduled_intents,
     702            1 :             &mut scheduler,
     703            1 :             &context_prefer_node1,
     704            1 :         );
     705            1 : 
     706            1 :         // If a node is over-utilized, it will not be used even if affinity scores prefer it
     707            1 :         nodes
     708            1 :             .get_mut(&NodeId(1))
     709            1 :             .unwrap()
     710            1 :             .set_availability(NodeAvailability::Active(test_utilization::simple(
     711            1 :                 20000,
     712            1 :                 1024 * 1024 * 1024,
     713            1 :             )));
     714            1 :         scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
     715            1 :         assert_scheduler_chooses(
     716            1 :             NodeId(2),
     717            1 :             &mut scheduled_intents,
     718            1 :             &mut scheduler,
     719            1 :             &context_prefer_node1,
     720            1 :         );
     721            1 :         assert_scheduler_chooses(
     722            1 :             NodeId(3),
     723            1 :             &mut scheduled_intents,
     724            1 :             &mut scheduler,
     725            1 :             &context_prefer_node1,
     726            1 :         );
     727              : 
     728           12 :         for mut intent in scheduled_intents { // clear every stored intent against the scheduler before it is dropped
     729           11 :             intent.clear(&mut scheduler);
     730           11 :         }
     731            1 :     }
     732              : }
        

Generated by: LCOV version 2.1-beta