LCOV - code coverage report
Current view: top level - storage_controller/src - scheduler.rs (source / functions)
Test: 36bb8dd7c7efcb53483d1a7d9f7cb33e8406dcf0.info
Test Date: 2024-04-08 10:22:05
Lines: 80.3 % (204 of 254 hit)
Functions: 57.8 % (26 of 45 hit)

            Line data    Source code
       1              : use crate::{node::Node, tenant_shard::TenantShard};
       2              : use pageserver_api::controller_api::UtilizationScore;
       3              : use serde::Serialize;
       4              : use std::collections::HashMap;
       5              : use utils::{http::error::ApiError, id::NodeId};
       6              : 
       7              : /// Scenarios in which we cannot find a suitable location for a tenant shard
       8            0 : #[derive(thiserror::Error, Debug)]
       9              : pub enum ScheduleError {
      10              :     #[error("No pageservers found")]
      11              :     NoPageservers,
      12              :     #[error("No pageserver found matching constraint")]
      13              :     ImpossibleConstraint,
      14              : }
      15              : 
      16              : impl From<ScheduleError> for ApiError {
      17            0 :     fn from(value: ScheduleError) -> Self {
      18            0 :         ApiError::Conflict(format!("Scheduling error: {}", value))
      19            0 :     }
      20              : }
      21              : 
      22              : #[derive(Serialize, Eq, PartialEq)]
      23              : pub enum MaySchedule {
      24              :     Yes(UtilizationScore),
      25              :     No,
      26              : }
      27              : 
      28              : #[derive(Serialize)]
      29              : struct SchedulerNode {
      30              :     /// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
      31              :     shard_count: usize,
      32              : 
       33              :     /// Whether this node is currently eligible to have new shards scheduled (this is derived
      34              :     /// from a node's availability state and scheduling policy).
      35              :     may_schedule: MaySchedule,
      36              : }
      37              : 
      38              : impl PartialEq for SchedulerNode {
      39            6 :     fn eq(&self, other: &Self) -> bool {
      40            6 :         let may_schedule_matches = matches!(
      41            6 :             (&self.may_schedule, &other.may_schedule),
      42              :             (MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No)
      43              :         );
      44              : 
      45            6 :         may_schedule_matches && self.shard_count == other.shard_count
      46            6 :     }
      47              : }
      48              : 
      49              : impl Eq for SchedulerNode {}
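
A note on the manual PartialEq above: the derived PartialEq on MaySchedule
would also compare the UtilizationScore payload, whereas this comparison only
cares whether a node is schedulable at all.  Illustratively:

    // Two schedulable nodes compare equal here even if their utilization
    // scores differ, because eq() only matches on the variant:
    //   (MaySchedule::Yes(a), MaySchedule::Yes(b)) -> equal for any a, b
    //   (MaySchedule::No,     MaySchedule::No)     -> equal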
      50              : 
      51              : /// This type is responsible for selecting which node is used when a tenant shard needs to choose a pageserver
      52              : /// on which to run.
      53              : ///
      54              : /// The type has no persistent state of its own: this is all populated at startup.  The Serialize
      55              : /// impl is only for debug dumps.
      56              : #[derive(Serialize)]
      57              : pub(crate) struct Scheduler {
      58              :     nodes: HashMap<NodeId, SchedulerNode>,
      59              : }
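
A minimal usage sketch (not part of this file; it mirrors the test at the
bottom of this listing): build a Scheduler over the known nodes, then ask it
to place a shard.

    let nodes = test_utils::make_test_nodes(3);
    let scheduler = Scheduler::new(nodes.values());
    let context = ScheduleContext::default();
    let placed = scheduler.schedule_shard(&[], &context)?;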
      60              : 
      61              : /// Score for soft constraint scheduling: lower scores are preferred to higher scores.
      62              : ///
      63              : /// For example, we may set an affinity score based on the number of shards from the same
      64              : /// tenant already on a node, to implicitly prefer to balance out shards.
      65              : #[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
      66              : pub(crate) struct AffinityScore(pub(crate) usize);
      67              : 
      68              : impl AffinityScore {
      69              :     /// If we have no anti-affinity at all toward a node, this is its score.  It means
      70              :     /// the scheduler has a free choice amongst nodes with this score, and may pick a node
      71              :     /// based on other information such as total utilization.
      72              :     pub(crate) const FREE: Self = Self(0);
      73              : 
      74          188 :     pub(crate) fn inc(&mut self) {
      75          188 :         self.0 += 1;
      76          188 :     }
      77              : }
      78              : 
      79              : impl std::ops::Add for AffinityScore {
      80              :     type Output = Self;
      81              : 
      82           54 :     fn add(self, rhs: Self) -> Self::Output {
      83           54 :         Self(self.0 + rhs.0)
      84           54 :     }
      85              : }
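
Because Ord is derived on the wrapped usize, the "lower scores are preferred"
rule falls out of ordinary comparison.  A small illustration (assumed, not
from this file):

    let mut score = AffinityScore::FREE;
    score.inc();                            // one shard of this tenant already here
    assert!(AffinityScore::FREE < score);   // an unused node sorts first
    assert_eq!(score + AffinityScore(1), AffinityScore(2));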
      86              : 
      87              : // For carrying state between multiple calls to [`TenantShard::schedule`], e.g. when calling
      88              : // it for many shards in the same tenant.
      89              : #[derive(Debug, Default)]
      90              : pub(crate) struct ScheduleContext {
      91              :     /// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`]
      92              :     pub(crate) nodes: HashMap<NodeId, AffinityScore>,
      93              : 
      94              :     /// Specifically how many _attached_ locations are on each node
      95              :     pub(crate) attached_nodes: HashMap<NodeId, usize>,
      96              : }
      97              : 
      98              : impl ScheduleContext {
      99              :     /// Input is a list of nodes we would like to avoid using again within this context.  The more
     100              :     /// times a node is passed into this call, the less inclined we are to use it.
     101           96 :     pub(crate) fn avoid(&mut self, nodes: &[NodeId]) {
     102          284 :         for node_id in nodes {
     103          188 :             let entry = self.nodes.entry(*node_id).or_insert(AffinityScore::FREE);
     104          188 :             entry.inc()
     105              :         }
     106           96 :     }
     107              : 
     108           94 :     pub(crate) fn push_attached(&mut self, node_id: NodeId) {
     109           94 :         let entry = self.attached_nodes.entry(node_id).or_default();
     110           94 :         *entry += 1;
     111           94 :     }
     112              : 
     113          120 :     pub(crate) fn get_node_affinity(&self, node_id: NodeId) -> AffinityScore {
     114          120 :         self.nodes
     115          120 :             .get(&node_id)
     116          120 :             .copied()
     117          120 :             .unwrap_or(AffinityScore::FREE)
     118          120 :     }
     119              : 
     120          120 :     pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize {
     121          120 :         self.attached_nodes.get(&node_id).copied().unwrap_or(0)
     122          120 :     }
     123              : }
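
A sketch (assumed usage) of carrying one context across several shards of the
same tenant, so that each placement raises the anti-affinity of the node it
chose; `shard_count` here is a placeholder:

    let mut context = ScheduleContext::default();
    for _ in 0..shard_count {
        let node_id = scheduler.schedule_shard(&[], &context)?;
        context.avoid(&[node_id]);      // discourage reuse within this tenant
        context.push_attached(node_id); // record the attached location
    }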
     124              : 
     125              : impl Scheduler {
     126           14 :     pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
     127           14 :         let mut scheduler_nodes = HashMap::new();
     128           50 :         for node in nodes {
     129           36 :             scheduler_nodes.insert(
     130           36 :                 node.get_id(),
     131           36 :                 SchedulerNode {
     132           36 :                     shard_count: 0,
     133           36 :                     may_schedule: node.may_schedule(),
     134           36 :                 },
     135           36 :             );
     136           36 :         }
     137              : 
     138           14 :         Self {
     139           14 :             nodes: scheduler_nodes,
     140           14 :         }
     141           14 :     }
     142              : 
     143              :     /// For debug/support: check that our internal statistics are in sync with the state of
     144              :     /// the nodes & tenant shards.
     145              :     ///
     146              :     /// If anything is inconsistent, log details and return an error.
     147            2 :     pub(crate) fn consistency_check<'a>(
     148            2 :         &self,
     149            2 :         nodes: impl Iterator<Item = &'a Node>,
     150            2 :         shards: impl Iterator<Item = &'a TenantShard>,
     151            2 :     ) -> anyhow::Result<()> {
     152            2 :         let mut expect_nodes: HashMap<NodeId, SchedulerNode> = HashMap::new();
     153            8 :         for node in nodes {
     154            6 :             expect_nodes.insert(
     155            6 :                 node.get_id(),
     156            6 :                 SchedulerNode {
     157            6 :                     shard_count: 0,
     158            6 :                     may_schedule: node.may_schedule(),
     159            6 :                 },
     160            6 :             );
     161            6 :         }
     162              : 
     163            4 :         for shard in shards {
     164            2 :             if let Some(node_id) = shard.intent.get_attached() {
     165            2 :                 match expect_nodes.get_mut(node_id) {
     166            2 :                     Some(node) => node.shard_count += 1,
     167            0 :                     None => anyhow::bail!(
     168            0 :                         "Tenant {} references nonexistent node {}",
     169            0 :                         shard.tenant_shard_id,
     170            0 :                         node_id
     171            0 :                     ),
     172              :                 }
     173            0 :             }
     174              : 
     175            2 :             for node_id in shard.intent.get_secondary() {
     176            2 :                 match expect_nodes.get_mut(node_id) {
     177            2 :                     Some(node) => node.shard_count += 1,
     178            0 :                     None => anyhow::bail!(
     179            0 :                         "Tenant {} references nonexistent node {}",
     180            0 :                         shard.tenant_shard_id,
     181            0 :                         node_id
     182            0 :                     ),
     183              :                 }
     184              :             }
     185              :         }
     186              : 
     187            8 :         for (node_id, expect_node) in &expect_nodes {
     188            6 :             let Some(self_node) = self.nodes.get(node_id) else {
     189            0 :                 anyhow::bail!("Node {node_id} not found in Self")
     190              :             };
     191              : 
     192            6 :             if self_node != expect_node {
     193            0 :                 tracing::error!("Inconsistency detected in scheduling state for node {node_id}");
     194            0 :                 tracing::error!("Expected state: {}", serde_json::to_string(expect_node)?);
     195            0 :                 tracing::error!("Self state: {}", serde_json::to_string(self_node)?);
     196              : 
     197            0 :                 anyhow::bail!("Inconsistent state on {node_id}");
     198            6 :             }
     199              :         }
     200              : 
     201            2 :         if expect_nodes.len() != self.nodes.len() {
     202              :             // We just checked that all the expected nodes are present.  If the lengths don't match,
     203              :             // it means that we have nodes in Self that are unexpected.
     204            0 :             for node_id in self.nodes.keys() {
     205            0 :                 if !expect_nodes.contains_key(node_id) {
     206            0 :                     anyhow::bail!("Node {node_id} found in Self but not in expected nodes");
     207            0 :                 }
     208              :             }
     209            2 :         }
     210              : 
     211            2 :         Ok(())
     212            2 :     }
     213              : 
     214              :     /// Increment the reference count of a node.  This reference count is used to guide scheduling
     215              :     /// decisions, not for memory management: it represents one tenant shard whose IntentState targets
     216              :     /// this node.
     217              :     ///
     218              :     /// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
     219              :     /// [`Self::new`] or [`Self::node_upsert`])
     220           60 :     pub(crate) fn node_inc_ref(&mut self, node_id: NodeId) {
     221           60 :         let Some(node) = self.nodes.get_mut(&node_id) else {
     222            0 :             tracing::error!("Scheduler missing node {node_id}");
     223            0 :             debug_assert!(false);
     224            0 :             return;
     225              :         };
     226              : 
     227           60 :         node.shard_count += 1;
     228           60 :     }
     229              : 
     230              :     /// Decrement a node's reference count.  Inverse of [`Self::node_inc_ref`].
     231           58 :     pub(crate) fn node_dec_ref(&mut self, node_id: NodeId) {
     232           58 :         let Some(node) = self.nodes.get_mut(&node_id) else {
     233            0 :             debug_assert!(false);
     234            0 :             tracing::error!("Scheduler missing node {node_id}");
     235            0 :             return;
     236              :         };
     237              : 
     238           58 :         node.shard_count -= 1;
     239           58 :     }
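
These two calls are expected to arrive in balanced pairs, driven by each
shard's IntentState as locations are taken and released.  Schematically
(assumed call sites, not from this file):

    scheduler.node_inc_ref(node_id);  // a shard's intent now targets node_id
    // ... the location remains in use ...
    scheduler.node_dec_ref(node_id);  // the intent has dropped the location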
     240              : 
     241           10 :     pub(crate) fn node_upsert(&mut self, node: &Node) {
     242           10 :         use std::collections::hash_map::Entry::*;
     243           10 :         match self.nodes.entry(node.get_id()) {
     244            2 :             Occupied(mut entry) => {
     245            2 :                 entry.get_mut().may_schedule = node.may_schedule();
     246            2 :             }
     247            8 :             Vacant(entry) => {
     248            8 :                 entry.insert(SchedulerNode {
     249            8 :                     shard_count: 0,
     250            8 :                     may_schedule: node.may_schedule(),
     251            8 :                 });
     252            8 :             }
     253              :         }
     254           10 :     }
     255              : 
     256            0 :     pub(crate) fn node_remove(&mut self, node_id: NodeId) {
     257            0 :         if self.nodes.remove(&node_id).is_none() {
     258            0 :             tracing::warn!(node_id=%node_id, "Removed non-existent node from scheduler");
     259            0 :         }
     260            0 :     }
     261              : 
     262              :     /// Where we have several nodes to choose from, for example when picking a secondary location
     263              :     /// to promote to an attached location, this method may be used to pick the best choice based
     264              :     /// on the scheduler's knowledge of utilization and availability.
     265              :     ///
      266              :     /// If the input is empty, or none of the nodes are eligible for scheduling, return None: the
     267              :     /// caller can pick a node some other way.
     268           28 :     pub(crate) fn node_preferred(&self, nodes: &[NodeId]) -> Option<NodeId> {
     269           28 :         if nodes.is_empty() {
     270           24 :             return None;
     271            4 :         }
     272            4 : 
     273            4 :         // TODO: When the utilization score returned by the pageserver becomes meaningful,
     274            4 :         // schedule based on that instead of the shard count.
     275            4 :         let node = nodes
     276            4 :             .iter()
     277            8 :             .map(|node_id| {
     278            8 :                 let may_schedule = self
     279            8 :                     .nodes
     280            8 :                     .get(node_id)
     281            8 :                     .map(|n| n.may_schedule != MaySchedule::No)
     282            8 :                     .unwrap_or(false);
     283            8 :                 (*node_id, may_schedule)
     284            8 :             })
     285            8 :             .max_by_key(|(_n, may_schedule)| *may_schedule);
     286            4 : 
     287            4 :         // If even the preferred node has may_schedule==false, return None
     288            4 :         node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
     289           28 :     }
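
A hedged sketch of the promotion case described in the comment above (the
shard-side accessors are those used by the consistency check in this file;
the fallback is illustrative and would panic on an empty list):

    let secondaries = shard.intent.get_secondary();
    let promote_to = scheduler
        .node_preferred(secondaries)
        .unwrap_or(secondaries[0]);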
     290              : 
      291              :     /// hard_exclude: it is forbidden to use nodes in this list, typically because they
      292              :     /// are already in use by this shard -- we use this to avoid picking the same node
      293              :     /// as both the attached and a secondary location.  This is a hard constraint: if we cannot
     294              :     /// find any nodes that aren't in this list, then we will return a [`ScheduleError::ImpossibleConstraint`].
     295              :     ///
     296              :     /// context: we prefer to avoid using nodes identified in the context, according
      297              :     /// to their anti-affinity score.  We use this to prefer to avoid placing shards in
     298              :     /// the same tenant on the same node.  This is a soft constraint: the context will never
     299              :     /// cause us to fail to schedule a shard.
     300           60 :     pub(crate) fn schedule_shard(
     301           60 :         &self,
     302           60 :         hard_exclude: &[NodeId],
     303           60 :         context: &ScheduleContext,
     304           60 :     ) -> Result<NodeId, ScheduleError> {
     305           60 :         if self.nodes.is_empty() {
     306            0 :             return Err(ScheduleError::NoPageservers);
     307           60 :         }
     308           60 : 
     309           60 :         let mut scores: Vec<(NodeId, AffinityScore, usize)> = self
     310           60 :             .nodes
     311           60 :             .iter()
     312          188 :             .filter_map(|(k, v)| {
     313          188 :                 if hard_exclude.contains(k) || v.may_schedule == MaySchedule::No {
     314           74 :                     None
     315              :                 } else {
     316          114 :                     Some((
     317          114 :                         *k,
     318          114 :                         context.nodes.get(k).copied().unwrap_or(AffinityScore::FREE),
     319          114 :                         v.shard_count,
     320          114 :                     ))
     321              :                 }
     322          188 :             })
     323           60 :             .collect();
     324           60 : 
     325           60 :         // Sort by, in order of precedence:
     326           60 :         //  1st: Affinity score.  We should never pick a higher-score node if a lower-score node is available
     327           60 :         //  2nd: Utilization.  Within nodes with the same affinity, use the least loaded nodes.
     328           60 :         //  3rd: Node ID.  This is a convenience to make selection deterministic in tests and empty systems.
     329          114 :         scores.sort_by_key(|i| (i.1, i.2, i.0));
     330           60 : 
     331           60 :         if scores.is_empty() {
     332              :             // After applying constraints, no pageservers were left.  We log some detail about
     333              :             // the state of nodes to help understand why this happened.  This is not logged as an error because
     334              :             // it is legitimately possible for enough nodes to be Offline to prevent scheduling a shard.
     335            0 :             tracing::info!("Scheduling failure, while excluding {hard_exclude:?}, node states:");
     336            0 :             for (node_id, node) in &self.nodes {
     337            0 :                 tracing::info!(
     338            0 :                     "Node {node_id}: may_schedule={} shards={}",
     339            0 :                     node.may_schedule != MaySchedule::No,
     340            0 :                     node.shard_count
     341            0 :                 );
     342              :             }
     343              : 
     344            0 :             return Err(ScheduleError::ImpossibleConstraint);
     345           60 :         }
     346           60 : 
     347           60 :         // Lowest score wins
     348           60 :         let node_id = scores.first().unwrap().0;
     349           60 :         tracing::info!(
      350            0 :             "scheduler selected node {node_id} (eligible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?})",
     351            0 :             scores.iter().map(|i| i.0 .0).collect::<Vec<_>>()
     352            0 :         );
     353              : 
     354              :         // Note that we do not update shard count here to reflect the scheduling: that
     355              :         // is IntentState's job when the scheduled location is used.
     356              : 
     357           60 :         Ok(node_id)
     358           60 :     }
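
A sketch of the hard constraint in action (assumed usage, matching the test
below): a shard's secondary location must not share a node with its attached
location.

    let attached = scheduler.schedule_shard(&[], &context)?;
    let secondary = scheduler.schedule_shard(&[attached], &context)?;
    assert_ne!(attached, secondary); // hard_exclude guarantees distinct nodes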
     359              : 
     360              :     /// Unit test access to internal state
     361              :     #[cfg(test)]
     362           12 :     pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
     363           12 :         self.nodes.get(&node_id).unwrap().shard_count
     364           12 :     }
     365              : }
     366              : 
     367              : #[cfg(test)]
     368              : pub(crate) mod test_utils {
     369              : 
     370              :     use crate::node::Node;
     371              :     use pageserver_api::controller_api::{NodeAvailability, UtilizationScore};
     372              :     use std::collections::HashMap;
     373              :     use utils::id::NodeId;
     374              :     /// Test helper: synthesize the requested number of nodes, all in active state.
     375              :     ///
     376              :     /// Node IDs start at one.
     377           14 :     pub(crate) fn make_test_nodes(n: u64) -> HashMap<NodeId, Node> {
     378           14 :         (1..n + 1)
     379           44 :             .map(|i| {
     380           44 :                 (NodeId(i), {
     381           44 :                     let mut node = Node::new(
     382           44 :                         NodeId(i),
     383           44 :                         format!("httphost-{i}"),
     384           44 :                         80 + i as u16,
     385           44 :                         format!("pghost-{i}"),
     386           44 :                         5432 + i as u16,
     387           44 :                     );
     388           44 :                     node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
     389           44 :                     assert!(node.is_available());
     390           44 :                     node
     391           44 :                 })
     392           44 :             })
     393           14 :             .collect()
     394           14 :     }
     395              : }
     396              : 
     397              : #[cfg(test)]
     398              : mod tests {
     399              :     use super::*;
     400              : 
     401              :     use crate::tenant_shard::IntentState;
     402              :     #[test]
     403            2 :     fn scheduler_basic() -> anyhow::Result<()> {
     404            2 :         let nodes = test_utils::make_test_nodes(2);
     405            2 : 
     406            2 :         let mut scheduler = Scheduler::new(nodes.values());
     407            2 :         let mut t1_intent = IntentState::new();
     408            2 :         let mut t2_intent = IntentState::new();
     409            2 : 
     410            2 :         let context = ScheduleContext::default();
     411              : 
     412            2 :         let scheduled = scheduler.schedule_shard(&[], &context)?;
     413            2 :         t1_intent.set_attached(&mut scheduler, Some(scheduled));
     414            2 :         let scheduled = scheduler.schedule_shard(&[], &context)?;
     415            2 :         t2_intent.set_attached(&mut scheduler, Some(scheduled));
     416            2 : 
     417            2 :         assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);
     418            2 :         assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 1);
     419              : 
     420            2 :         let scheduled = scheduler.schedule_shard(&t1_intent.all_pageservers(), &context)?;
     421            2 :         t1_intent.push_secondary(&mut scheduler, scheduled);
     422            2 : 
     423            2 :         assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);
     424            2 :         assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 2);
     425              : 
     426            2 :         t1_intent.clear(&mut scheduler);
     427            2 :         assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 0);
     428            2 :         assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 1);
     429              : 
     430            2 :         if cfg!(debug_assertions) {
     431              :             // Dropping an IntentState without clearing it causes a panic in debug mode,
     432              :             // because we have failed to properly update scheduler shard counts.
     433            2 :             let result = std::panic::catch_unwind(move || {
     434            2 :                 drop(t2_intent);
     435            2 :             });
     436            2 :             assert!(result.is_err());
     437              :         } else {
     438            0 :             t2_intent.clear(&mut scheduler);
     439            0 :             assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 0);
     440            0 :             assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 0);
     441              :         }
     442              : 
     443            2 :         Ok(())
     444            2 :     }
     445              : }
        

Generated by: LCOV version 2.1-beta