LCOV - code coverage report
Current view: top level - storage_controller/src - scheduler.rs (source / functions) Coverage Total Hit
Test: 2aa98e37cd3250b9a68c97ef6050b16fe702ab33.info Lines: 83.2 % 470 391
Test Date: 2024-08-29 11:33:10 Functions: 69.8 % 43 30

            Line data    Source code
       1              : use crate::{node::Node, tenant_shard::TenantShard};
       2              : use itertools::Itertools;
       3              : use pageserver_api::models::PageserverUtilization;
       4              : use serde::Serialize;
       5              : use std::collections::HashMap;
       6              : use utils::{http::error::ApiError, id::NodeId};
       7              : 
       8              : /// Scenarios in which we cannot find a suitable location for a tenant shard
       9            0 : #[derive(thiserror::Error, Debug)]
      10              : pub enum ScheduleError {
      11              :     #[error("No pageservers found")]
      12              :     NoPageservers,
      13              :     #[error("No pageserver found matching constraint")]
      14              :     ImpossibleConstraint,
      15              : }
      16              : 
      17              : impl From<ScheduleError> for ApiError {
      18            0 :     fn from(value: ScheduleError) -> Self {
      19            0 :         ApiError::Conflict(format!("Scheduling error: {}", value))
      20            0 :     }
      21              : }
      22              : 
      23              : #[derive(Serialize)]
      24              : pub enum MaySchedule {
      25              :     Yes(PageserverUtilization),
      26              :     No,
      27              : }
      28              : 
      29              : #[derive(Serialize)]
      30              : struct SchedulerNode {
      31              :     /// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
      32              :     shard_count: usize,
      33              :     /// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`].
      34              :     attached_shard_count: usize,
      35              : 
      36              :     /// Whether this node is currently eligible to have new shards scheduled (this is derived
      37              :     /// from a node's availability state and scheduling policy).
      38              :     may_schedule: MaySchedule,
      39              : }
      40              : 
      41              : impl PartialEq for SchedulerNode {
      42           18 :     fn eq(&self, other: &Self) -> bool {
      43           18 :         let may_schedule_matches = matches!(
      44           18 :             (&self.may_schedule, &other.may_schedule),
      45              :             (MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No)
      46              :         );
      47              : 
      48           18 :         may_schedule_matches
      49           18 :             && self.shard_count == other.shard_count
      50           18 :             && self.attached_shard_count == other.attached_shard_count
      51           18 :     }
      52              : }
      53              : 
      54              : impl Eq for SchedulerNode {}
      55              : 
      56              : /// This type is responsible for selecting which node is used when a tenant shard needs to choose a pageserver
      57              : /// on which to run.
      58              : ///
      59              : /// The type has no persistent state of its own: this is all populated at startup.  The Serialize
      60              : /// impl is only for debug dumps.
      61              : #[derive(Serialize)]
      62              : pub(crate) struct Scheduler {
      63              :     nodes: HashMap<NodeId, SchedulerNode>,
      64              : }
      65              : 
      66              : /// Score for soft constraint scheduling: lower scores are preferred to higher scores.
      67              : ///
      68              : /// For example, we may set an affinity score based on the number of shards from the same
      69              : /// tenant already on a node, to implicitly prefer to balance out shards.
      70              : #[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
      71              : pub(crate) struct AffinityScore(pub(crate) usize);
      72              : 
      73              : impl AffinityScore {
      74              :     /// If we have no anti-affinity at all toward a node, this is its score.  It means
      75              :     /// the scheduler has a free choice amongst nodes with this score, and may pick a node
      76              :     /// based on other information such as total utilization.
      77              :     pub(crate) const FREE: Self = Self(0);
      78              : 
      79          480 :     pub(crate) fn inc(&mut self) {
      80          480 :         self.0 += 1;
      81          480 :     }
      82              : }
      83              : 
      84              : impl std::ops::Add for AffinityScore {
      85              :     type Output = Self;
      86              : 
      87          120 :     fn add(self, rhs: Self) -> Self::Output {
      88          120 :         Self(self.0 + rhs.0)
      89          120 :     }
      90              : }
      91              : 
      92              : /// Hint for whether this is a sincere attempt to schedule, or a speculative
      93              : /// check for where we _would_ schedule (done during optimization)
      94              : #[derive(Debug)]
      95              : pub(crate) enum ScheduleMode {
      96              :     Normal,
      97              :     Speculative,
      98              : }
      99              : 
     100              : impl Default for ScheduleMode {
     101           96 :     fn default() -> Self {
     102           96 :         Self::Normal
     103           96 :     }
     104              : }
     105              : 
     106              : // For carrying state between multiple calls to [`TenantShard::schedule`], e.g. when calling
     107              : // it for many shards in the same tenant.
     108              : #[derive(Debug, Default)]
     109              : pub(crate) struct ScheduleContext {
     110              :     /// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`]
     111              :     pub(crate) nodes: HashMap<NodeId, AffinityScore>,
     112              : 
     113              :     /// Specifically how many _attached_ locations are on each node
     114              :     pub(crate) attached_nodes: HashMap<NodeId, usize>,
     115              : 
     116              :     pub(crate) mode: ScheduleMode,
     117              : }
     118              : 
     119              : impl ScheduleContext {
     120              :     /// Input is a list of nodes we would like to avoid using again within this context.  The more
     121              :     /// times a node is passed into this call, the less inclined we are to use it.
     122          246 :     pub(crate) fn avoid(&mut self, nodes: &[NodeId]) {
     123          726 :         for node_id in nodes {
     124          480 :             let entry = self.nodes.entry(*node_id).or_insert(AffinityScore::FREE);
     125          480 :             entry.inc()
     126              :         }
     127          246 :     }
     128              : 
     129          234 :     pub(crate) fn push_attached(&mut self, node_id: NodeId) {
     130          234 :         let entry = self.attached_nodes.entry(node_id).or_default();
     131          234 :         *entry += 1;
     132          234 :     }
     133              : 
     134          270 :     pub(crate) fn get_node_affinity(&self, node_id: NodeId) -> AffinityScore {
     135          270 :         self.nodes
     136          270 :             .get(&node_id)
     137          270 :             .copied()
     138          270 :             .unwrap_or(AffinityScore::FREE)
     139          270 :     }
     140              : 
     141          270 :     pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize {
     142          270 :         self.attached_nodes.get(&node_id).copied().unwrap_or(0)
     143          270 :     }
     144              : }
     145              : 
     146              : pub(crate) enum RefCountUpdate {
     147              :     PromoteSecondary,
     148              :     Attach,
     149              :     Detach,
     150              :     DemoteAttached,
     151              :     AddSecondary,
     152              :     RemoveSecondary,
     153              : }
     154              : 
     155              : impl Scheduler {
     156           48 :     pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
     157           48 :         let mut scheduler_nodes = HashMap::new();
     158          174 :         for node in nodes {
     159          126 :             scheduler_nodes.insert(
     160          126 :                 node.get_id(),
     161          126 :                 SchedulerNode {
     162          126 :                     shard_count: 0,
     163          126 :                     attached_shard_count: 0,
     164          126 :                     may_schedule: node.may_schedule(),
     165          126 :                 },
     166          126 :             );
     167          126 :         }
     168              : 
     169           48 :         Self {
     170           48 :             nodes: scheduler_nodes,
     171           48 :         }
     172           48 :     }
     173              : 
     174              :     /// For debug/support: check that our internal statistics are in sync with the state of
     175              :     /// the nodes & tenant shards.
     176              :     ///
     177              :     /// If anything is inconsistent, log details and return an error.
     178            6 :     pub(crate) fn consistency_check<'a>(
     179            6 :         &self,
     180            6 :         nodes: impl Iterator<Item = &'a Node>,
     181            6 :         shards: impl Iterator<Item = &'a TenantShard>,
     182            6 :     ) -> anyhow::Result<()> {
     183            6 :         let mut expect_nodes: HashMap<NodeId, SchedulerNode> = HashMap::new();
     184           24 :         for node in nodes {
     185           18 :             expect_nodes.insert(
     186           18 :                 node.get_id(),
     187           18 :                 SchedulerNode {
     188           18 :                     shard_count: 0,
     189           18 :                     attached_shard_count: 0,
     190           18 :                     may_schedule: node.may_schedule(),
     191           18 :                 },
     192           18 :             );
     193           18 :         }
     194              : 
     195           12 :         for shard in shards {
     196            6 :             if let Some(node_id) = shard.intent.get_attached() {
     197            6 :                 match expect_nodes.get_mut(node_id) {
     198            6 :                     Some(node) => {
     199            6 :                         node.shard_count += 1;
     200            6 :                         node.attached_shard_count += 1;
     201            6 :                     }
     202            0 :                     None => anyhow::bail!(
     203            0 :                         "Tenant {} references nonexistent node {}",
     204            0 :                         shard.tenant_shard_id,
     205            0 :                         node_id
     206            0 :                     ),
     207              :                 }
     208            0 :             }
     209              : 
     210            6 :             for node_id in shard.intent.get_secondary() {
     211            6 :                 match expect_nodes.get_mut(node_id) {
     212            6 :                     Some(node) => node.shard_count += 1,
     213            0 :                     None => anyhow::bail!(
     214            0 :                         "Tenant {} references nonexistent node {}",
     215            0 :                         shard.tenant_shard_id,
     216            0 :                         node_id
     217            0 :                     ),
     218              :                 }
     219              :             }
     220              :         }
     221              : 
     222           24 :         for (node_id, expect_node) in &expect_nodes {
     223           18 :             let Some(self_node) = self.nodes.get(node_id) else {
     224            0 :                 anyhow::bail!("Node {node_id} not found in Self")
     225              :             };
     226              : 
     227           18 :             if self_node != expect_node {
     228            0 :                 tracing::error!("Inconsistency detected in scheduling state for node {node_id}");
     229            0 :                 tracing::error!("Expected state: {}", serde_json::to_string(expect_node)?);
     230            0 :                 tracing::error!("Self state: {}", serde_json::to_string(self_node)?);
     231              : 
     232            0 :                 anyhow::bail!("Inconsistent state on {node_id}");
     233           18 :             }
     234              :         }
     235              : 
     236            6 :         if expect_nodes.len() != self.nodes.len() {
     237              :             // We just checked that all the expected nodes are present.  If the lengths don't match,
     238              :             // it means that we have nodes in Self that are unexpected.
     239            0 :             for node_id in self.nodes.keys() {
     240            0 :                 if !expect_nodes.contains_key(node_id) {
     241            0 :                     anyhow::bail!("Node {node_id} found in Self but not in expected nodes");
     242            0 :                 }
     243              :             }
     244            6 :         }
     245              : 
     246            6 :         Ok(())
     247            6 :     }
     248              : 
     249              :     /// Update the reference counts of a node. These reference counts are used to guide scheduling
     250              :     /// decisions, not for memory management: they represent the number of tenant shards whose IntentState
     251              :     /// targets this node and the number of tenant shards whose IntentState is attached to this
     252              :     /// node.
     253              :     ///
     254              :     /// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
     255              :     /// [`Self::new`] or [`Self::node_upsert`])
     256          546 :     pub(crate) fn update_node_ref_counts(&mut self, node_id: NodeId, update: RefCountUpdate) {
     257          546 :         let Some(node) = self.nodes.get_mut(&node_id) else {
     258            0 :             debug_assert!(false);
     259            0 :             tracing::error!("Scheduler missing node {node_id}");
     260            0 :             return;
     261              :         };
     262              : 
     263          546 :         match update {
     264           30 :             RefCountUpdate::PromoteSecondary => {
     265           30 :                 node.attached_shard_count += 1;
     266           30 :             }
     267          144 :             RefCountUpdate::Attach => {
     268          144 :                 node.shard_count += 1;
     269          144 :                 node.attached_shard_count += 1;
     270          144 :             }
     271          138 :             RefCountUpdate::Detach => {
     272          138 :                 node.shard_count -= 1;
     273          138 :                 node.attached_shard_count -= 1;
     274          138 :             }
     275           30 :             RefCountUpdate::DemoteAttached => {
     276           30 :                 node.attached_shard_count -= 1;
     277           30 :             }
     278          102 :             RefCountUpdate::AddSecondary => {
     279          102 :                 node.shard_count += 1;
     280          102 :             }
     281          102 :             RefCountUpdate::RemoveSecondary => {
     282          102 :                 node.shard_count -= 1;
     283          102 :             }
     284              :         }
     285              : 
     286              :         // Maybe update PageserverUtilization
     287          546 :         match update {
     288              :             RefCountUpdate::AddSecondary | RefCountUpdate::Attach => {
     289              :                 // Referencing the node: if this takes our shard_count above the utilization structure's
     290              :                 // shard count, then artificially bump it: this ensures that the scheduler immediately
     291              :                 // recognizes that this node has more work on it, without waiting for the next heartbeat
     292              :                 // to update the utilization.
     293          246 :                 if let MaySchedule::Yes(utilization) = &mut node.may_schedule {
     294          246 :                     utilization.adjust_shard_count_max(node.shard_count as u32);
     295          246 :                 }
     296              :             }
     297              :             RefCountUpdate::PromoteSecondary
     298              :             | RefCountUpdate::Detach
     299              :             | RefCountUpdate::RemoveSecondary
     300          300 :             | RefCountUpdate::DemoteAttached => {
     301          300 :                 // De-referencing the node: leave the utilization's shard_count at a stale higher
     302          300 :                 // value until some future heartbeat after we have physically removed this shard
     303          300 :                 // from the node: this prevents the scheduler over-optimistically trying to schedule
     304          300 :                 // more work onto the node before earlier detaches are done.
     305          300 :             }
     306              :         }
     307          546 :     }
     308              : 
     309              :     // Check if the number of shards attached to a given node is lagging below
     310              :     // the cluster average. If that's the case, the node should be filled.
     311            0 :     pub(crate) fn compute_fill_requirement(&self, node_id: NodeId) -> usize {
     312            0 :         let Some(node) = self.nodes.get(&node_id) else {
     313            0 :             debug_assert!(false);
     314            0 :             tracing::error!("Scheduler missing node {node_id}");
     315            0 :             return 0;
     316              :         };
     317            0 :         assert!(!self.nodes.is_empty());
     318            0 :         let expected_attached_shards_per_node = self.expected_attached_shard_count();
     319              : 
     320            0 :         for (node_id, node) in self.nodes.iter() {
     321            0 :             tracing::trace!(%node_id, "attached_shard_count={} shard_count={} expected={}", node.attached_shard_count, node.shard_count, expected_attached_shards_per_node);
     322              :         }
     323              : 
     324            0 :         if node.attached_shard_count < expected_attached_shards_per_node {
     325            0 :             expected_attached_shards_per_node - node.attached_shard_count
     326              :         } else {
     327            0 :             0
     328              :         }
     329            0 :     }
     330              : 
     331            0 :     pub(crate) fn expected_attached_shard_count(&self) -> usize {
     332            0 :         let total_attached_shards: usize =
     333            0 :             self.nodes.values().map(|n| n.attached_shard_count).sum();
     334            0 : 
     335            0 :         assert!(!self.nodes.is_empty());
     336            0 :         total_attached_shards / self.nodes.len()
     337            0 :     }
     338              : 
     339            0 :     pub(crate) fn nodes_by_attached_shard_count(&self) -> Vec<(NodeId, usize)> {
     340            0 :         self.nodes
     341            0 :             .iter()
     342            0 :             .map(|(node_id, stats)| (*node_id, stats.attached_shard_count))
     343            0 :             .sorted_by(|lhs, rhs| Ord::cmp(&lhs.1, &rhs.1).reverse())
     344            0 :             .collect()
     345            0 :     }
     346              : 
     347           42 :     pub(crate) fn node_upsert(&mut self, node: &Node) {
     348           42 :         use std::collections::hash_map::Entry::*;
     349           42 :         match self.nodes.entry(node.get_id()) {
     350           18 :             Occupied(mut entry) => {
     351           18 :                 // Updates to MaySchedule are how we receive updated PageserverUtilization: adjust these values
     352           18 :                 // to account for any shards scheduled on the controller but not yet visible to the pageserver.
     353           18 :                 let mut may_schedule = node.may_schedule();
     354           18 :                 match &mut may_schedule {
     355           12 :                     MaySchedule::Yes(utilization) => {
     356           12 :                         utilization.adjust_shard_count_max(entry.get().shard_count as u32);
     357           12 :                     }
     358            6 :                     MaySchedule::No => { // Nothing to tweak
     359            6 :                     }
     360              :                 }
     361              : 
     362           18 :                 entry.get_mut().may_schedule = may_schedule;
     363              :             }
     364           24 :             Vacant(entry) => {
     365           24 :                 entry.insert(SchedulerNode {
     366           24 :                     shard_count: 0,
     367           24 :                     attached_shard_count: 0,
     368           24 :                     may_schedule: node.may_schedule(),
     369           24 :                 });
     370           24 :             }
     371              :         }
     372           42 :     }
     373              : 
     374            0 :     pub(crate) fn node_remove(&mut self, node_id: NodeId) {
     375            0 :         if self.nodes.remove(&node_id).is_none() {
     376            0 :             tracing::warn!(node_id=%node_id, "Removed non-existent node from scheduler");
     377            0 :         }
     378            0 :     }
     379              : 
     380              :     /// Where we have several nodes to choose from, for example when picking a secondary location
     381              :     /// to promote to an attached location, this method may be used to pick the best choice based
     382              :     /// on the scheduler's knowledge of utilization and availability.
     383              :     ///
     384              :     /// If the input is empty, or none of the nodes are eligible for scheduling, return None: the
     385              :     /// caller can pick a node some other way.
     386           84 :     pub(crate) fn node_preferred(&self, nodes: &[NodeId]) -> Option<NodeId> {
     387           84 :         if nodes.is_empty() {
     388           72 :             return None;
     389           12 :         }
     390           12 : 
     391           12 :         // TODO: When the utilization score returned by the pageserver becomes meaningful,
     392           12 :         // schedule based on that instead of the shard count.
     393           12 :         let node = nodes
     394           12 :             .iter()
     395           24 :             .map(|node_id| {
     396           24 :                 let may_schedule = self
     397           24 :                     .nodes
     398           24 :                     .get(node_id)
     399           24 :                     .map(|n| !matches!(n.may_schedule, MaySchedule::No))
     400           24 :                     .unwrap_or(false);
     401           24 :                 (*node_id, may_schedule)
     402           24 :             })
     403           24 :             .max_by_key(|(_n, may_schedule)| *may_schedule);
     404           12 : 
     405           12 :         // If even the preferred node has may_schedule==false, return None
     406           12 :         node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
     407           84 :     }
     408              : 
     409              :     /// hard_exclude: it is forbidden to use nodes in this list, typically because they
     410              :     /// are already in use by this shard -- we use this to avoid picking the same node
     411              :     /// as both attached and secondary location.  This is a hard constraint: if we cannot
     412              :     /// find any nodes that aren't in this list, then we will return a [`ScheduleError::ImpossibleConstraint`].
     413              :     ///
     414              :     /// context: we prefer to avoid using nodes identified in the context, according
     415              :     /// to their anti-affinity score.  We use this to prefer to avoid placing shards in
     416              :     /// the same tenant on the same node.  This is a soft constraint: the context will never
     417              :     /// cause us to fail to schedule a shard.
     418          228 :     pub(crate) fn schedule_shard(
     419          228 :         &mut self,
     420          228 :         hard_exclude: &[NodeId],
     421          228 :         context: &ScheduleContext,
     422          228 :     ) -> Result<NodeId, ScheduleError> {
     423          228 :         if self.nodes.is_empty() {
     424            0 :             return Err(ScheduleError::NoPageservers);
     425          228 :         }
     426          228 : 
     427          228 :         let mut scores: Vec<(NodeId, AffinityScore, u64, usize)> = self
     428          228 :             .nodes
     429          228 :             .iter_mut()
     430          690 :             .filter_map(|(k, v)| match &mut v.may_schedule {
     431            0 :                 MaySchedule::No => None,
     432          690 :                 MaySchedule::Yes(_) if hard_exclude.contains(k) => None,
     433          504 :                 MaySchedule::Yes(utilization) => Some((
     434          504 :                     *k,
     435          504 :                     context.nodes.get(k).copied().unwrap_or(AffinityScore::FREE),
     436          504 :                     utilization.cached_score(),
     437          504 :                     v.attached_shard_count,
     438          504 :                 )),
     439          690 :             })
     440          228 :             .collect();
     441          228 : 
     442          228 :         // Exclude nodes whose utilization is critically high, if there are alternatives available.  This will
     443          228 :         // cause us to violate affinity rules if it is necessary to avoid critically overloading nodes: for example
     444          228 :         // we may place shards in the same tenant together on the same pageserver if all other pageservers are
     445          228 :         // overloaded.
     446          228 :         let non_overloaded_scores = scores
     447          228 :             .iter()
     448          504 :             .filter(|i| !PageserverUtilization::is_overloaded(i.2))
     449          228 :             .copied()
     450          228 :             .collect::<Vec<_>>();
     451          228 :         if !non_overloaded_scores.is_empty() {
     452          228 :             scores = non_overloaded_scores;
     453          228 :         }
     454              : 
     455              :         // Sort by, in order of precedence:
     456              :         //  1st: Affinity score.  We should never pick a higher-score node if a lower-score node is available
     457              :         //  2nd: Utilization score (this combines shard count and disk utilization)
     458              :         //  3rd: Attached shard count.  When nodes have identical utilization (e.g. when populating some
     459              :         //       empty nodes), this acts as an anti-affinity between attached shards.
     460              :         //  4th: Node ID.  This is a convenience to make selection deterministic in tests and empty systems.
     461          612 :         scores.sort_by_key(|i| (i.1, i.2, i.3, i.0));
     462          228 : 
     463          228 :         if scores.is_empty() {
     464              :             // After applying constraints, no pageservers were left.
     465            0 :             if !matches!(context.mode, ScheduleMode::Speculative) {
     466              :                 // If this was not a speculative attempt, log details to understand why we couldn't
     467              :                 // schedule: this may help an engineer understand if some nodes are marked offline
     468              :                 // in a way that's preventing progress.
     469            0 :                 tracing::info!(
     470            0 :                     "Scheduling failure, while excluding {hard_exclude:?}, node states:"
     471              :                 );
     472            0 :                 for (node_id, node) in &self.nodes {
     473            0 :                     tracing::info!(
     474            0 :                         "Node {node_id}: may_schedule={} shards={}",
     475            0 :                         !matches!(node.may_schedule, MaySchedule::No),
     476              :                         node.shard_count
     477              :                     );
     478              :                 }
     479            0 :             }
     480            0 :             return Err(ScheduleError::ImpossibleConstraint);
     481          228 :         }
     482          228 : 
     483          228 :         // Lowest score wins
     484          228 :         let node_id = scores.first().unwrap().0;
     485              : 
     486          228 :         if !matches!(context.mode, ScheduleMode::Speculative) {
     487          228 :             tracing::info!(
     488            0 :             "scheduler selected node {node_id} (elegible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?})",
     489            0 :             scores.iter().map(|i| i.0 .0).collect::<Vec<_>>()
     490              :         );
     491            0 :         }
     492              : 
     493              :         // Note that we do not update shard count here to reflect the scheduling: that
     494              :         // is IntentState's job when the scheduled location is used.
     495              : 
     496          228 :         Ok(node_id)
     497          228 :     }
     498              : 
     499              :     /// Unit test access to internal state
     500              :     #[cfg(test)]
     501           72 :     pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
     502           72 :         self.nodes.get(&node_id).unwrap().shard_count
     503           72 :     }
     504              : 
     505              :     #[cfg(test)]
     506           72 :     pub(crate) fn get_node_attached_shard_count(&self, node_id: NodeId) -> usize {
     507           72 :         self.nodes.get(&node_id).unwrap().attached_shard_count
     508           72 :     }
     509              : }
     510              : 
     511              : #[cfg(test)]
     512              : pub(crate) mod test_utils {
     513              : 
     514              :     use crate::node::Node;
     515              :     use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization};
     516              :     use std::collections::HashMap;
     517              :     use utils::id::NodeId;
     518              :     /// Test helper: synthesize the requested number of nodes, all in active state.
     519              :     ///
     520              :     /// Node IDs start at one.
     521           48 :     pub(crate) fn make_test_nodes(n: u64) -> HashMap<NodeId, Node> {
     522           48 :         (1..n + 1)
     523          150 :             .map(|i| {
     524          150 :                 (NodeId(i), {
     525          150 :                     let mut node = Node::new(
     526          150 :                         NodeId(i),
     527          150 :                         format!("httphost-{i}"),
     528          150 :                         80 + i as u16,
     529          150 :                         format!("pghost-{i}"),
     530          150 :                         5432 + i as u16,
     531          150 :                         None,
     532          150 :                     );
     533          150 :                     node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0)));
     534          150 :                     assert!(node.is_available());
     535          150 :                     node
     536          150 :                 })
     537          150 :             })
     538           48 :             .collect()
     539           48 :     }
     540              : }
     541              : 
     542              : #[cfg(test)]
     543              : mod tests {
     544              :     use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization};
     545              : 
     546              :     use super::*;
     547              : 
     548              :     use crate::tenant_shard::IntentState;
     549              :     #[test]
     550            6 :     fn scheduler_basic() -> anyhow::Result<()> {
     551            6 :         let nodes = test_utils::make_test_nodes(2);
     552            6 : 
     553            6 :         let mut scheduler = Scheduler::new(nodes.values());
     554            6 :         let mut t1_intent = IntentState::new();
     555            6 :         let mut t2_intent = IntentState::new();
     556            6 : 
     557            6 :         let context = ScheduleContext::default();
     558              : 
     559            6 :         let scheduled = scheduler.schedule_shard(&[], &context)?;
     560            6 :         t1_intent.set_attached(&mut scheduler, Some(scheduled));
     561            6 :         let scheduled = scheduler.schedule_shard(&[], &context)?;
     562            6 :         t2_intent.set_attached(&mut scheduler, Some(scheduled));
     563            6 : 
     564            6 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
     565            6 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
     566              : 
     567            6 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
     568            6 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
     569              : 
     570            6 :         let scheduled = scheduler.schedule_shard(&t1_intent.all_pageservers(), &context)?;
     571            6 :         t1_intent.push_secondary(&mut scheduler, scheduled);
     572            6 : 
     573            6 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
     574            6 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
     575              : 
     576            6 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
     577            6 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
     578              : 
     579            6 :         t1_intent.clear(&mut scheduler);
     580            6 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
     581            6 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
     582              : 
     583            6 :         let total_attached = scheduler.get_node_attached_shard_count(NodeId(1))
     584            6 :             + scheduler.get_node_attached_shard_count(NodeId(2));
     585            6 :         assert_eq!(total_attached, 1);
     586              : 
     587            6 :         if cfg!(debug_assertions) {
     588              :             // Dropping an IntentState without clearing it causes a panic in debug mode,
     589              :             // because we have failed to properly update scheduler shard counts.
     590            6 :             let result = std::panic::catch_unwind(move || {
     591            6 :                 drop(t2_intent);
     592            6 :             });
     593            6 :             assert!(result.is_err());
     594              :         } else {
     595            0 :             t2_intent.clear(&mut scheduler);
     596            0 : 
     597            0 :             assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
     598            0 :             assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 0);
     599              : 
     600            0 :             assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 0);
     601            0 :             assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 0);
     602              :         }
     603              : 
     604            6 :         Ok(())
     605            6 :     }
     606              : 
     607              :     #[test]
     608              :     /// Test the PageserverUtilization's contribution to scheduling algorithm
     609            6 :     fn scheduler_utilization() {
     610            6 :         let mut nodes = test_utils::make_test_nodes(3);
     611            6 :         let mut scheduler = Scheduler::new(nodes.values());
     612            6 : 
     613            6 :         // Need to keep these alive because they contribute to shard counts via RAII
     614            6 :         let mut scheduled_intents = Vec::new();
     615            6 : 
     616            6 :         let empty_context = ScheduleContext::default();
     617            6 : 
     618           66 :         fn assert_scheduler_chooses(
     619           66 :             expect_node: NodeId,
     620           66 :             scheduled_intents: &mut Vec<IntentState>,
     621           66 :             scheduler: &mut Scheduler,
     622           66 :             context: &ScheduleContext,
     623           66 :         ) {
     624           66 :             let scheduled = scheduler.schedule_shard(&[], context).unwrap();
     625           66 :             let mut intent = IntentState::new();
     626           66 :             intent.set_attached(scheduler, Some(scheduled));
     627           66 :             scheduled_intents.push(intent);
     628           66 :             assert_eq!(scheduled, expect_node);
     629           66 :         }
     630            6 : 
     631            6 :         // Independent schedule calls onto empty nodes should round-robin, because each node's
     632            6 :         // utilization's shard count is updated inline.  The order is determinsitic because when all other factors are
     633            6 :         // equal, we order by node ID.
     634            6 :         assert_scheduler_chooses(
     635            6 :             NodeId(1),
     636            6 :             &mut scheduled_intents,
     637            6 :             &mut scheduler,
     638            6 :             &empty_context,
     639            6 :         );
     640            6 :         assert_scheduler_chooses(
     641            6 :             NodeId(2),
     642            6 :             &mut scheduled_intents,
     643            6 :             &mut scheduler,
     644            6 :             &empty_context,
     645            6 :         );
     646            6 :         assert_scheduler_chooses(
     647            6 :             NodeId(3),
     648            6 :             &mut scheduled_intents,
     649            6 :             &mut scheduler,
     650            6 :             &empty_context,
     651            6 :         );
     652            6 : 
     653            6 :         // Manually setting utilization higher should cause schedule calls to round-robin the other nodes
     654            6 :         // which have equal utilization.
     655            6 :         nodes
     656            6 :             .get_mut(&NodeId(1))
     657            6 :             .unwrap()
     658            6 :             .set_availability(NodeAvailability::Active(test_utilization::simple(
     659            6 :                 10,
     660            6 :                 1024 * 1024 * 1024,
     661            6 :             )));
     662            6 :         scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
     663            6 : 
     664            6 :         assert_scheduler_chooses(
     665            6 :             NodeId(2),
     666            6 :             &mut scheduled_intents,
     667            6 :             &mut scheduler,
     668            6 :             &empty_context,
     669            6 :         );
     670            6 :         assert_scheduler_chooses(
     671            6 :             NodeId(3),
     672            6 :             &mut scheduled_intents,
     673            6 :             &mut scheduler,
     674            6 :             &empty_context,
     675            6 :         );
     676            6 :         assert_scheduler_chooses(
     677            6 :             NodeId(2),
     678            6 :             &mut scheduled_intents,
     679            6 :             &mut scheduler,
     680            6 :             &empty_context,
     681            6 :         );
     682            6 :         assert_scheduler_chooses(
     683            6 :             NodeId(3),
     684            6 :             &mut scheduled_intents,
     685            6 :             &mut scheduler,
     686            6 :             &empty_context,
     687            6 :         );
     688            6 : 
     689            6 :         // The scheduler should prefer nodes with lower affinity score,
     690            6 :         // even if they have higher utilization (as long as they aren't utilized at >100%)
     691            6 :         let mut context_prefer_node1 = ScheduleContext::default();
     692            6 :         context_prefer_node1.avoid(&[NodeId(2), NodeId(3)]);
     693            6 :         assert_scheduler_chooses(
     694            6 :             NodeId(1),
     695            6 :             &mut scheduled_intents,
     696            6 :             &mut scheduler,
     697            6 :             &context_prefer_node1,
     698            6 :         );
     699            6 :         assert_scheduler_chooses(
     700            6 :             NodeId(1),
     701            6 :             &mut scheduled_intents,
     702            6 :             &mut scheduler,
     703            6 :             &context_prefer_node1,
     704            6 :         );
     705            6 : 
     706            6 :         // If a node is over-utilized, it will not be used even if affinity scores prefer it
     707            6 :         nodes
     708            6 :             .get_mut(&NodeId(1))
     709            6 :             .unwrap()
     710            6 :             .set_availability(NodeAvailability::Active(test_utilization::simple(
     711            6 :                 20000,
     712            6 :                 1024 * 1024 * 1024,
     713            6 :             )));
     714            6 :         scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
     715            6 :         assert_scheduler_chooses(
     716            6 :             NodeId(2),
     717            6 :             &mut scheduled_intents,
     718            6 :             &mut scheduler,
     719            6 :             &context_prefer_node1,
     720            6 :         );
     721            6 :         assert_scheduler_chooses(
     722            6 :             NodeId(3),
     723            6 :             &mut scheduled_intents,
     724            6 :             &mut scheduler,
     725            6 :             &context_prefer_node1,
     726            6 :         );
     727              : 
     728           72 :         for mut intent in scheduled_intents {
     729           66 :             intent.clear(&mut scheduler);
     730           66 :         }
     731            6 :     }
     732              : }
        

Generated by: LCOV version 2.1-beta