LCOV - code coverage report
Current view:  top level - storage_controller/src - scheduler.rs (source / functions)
Test:          691a4c28fe7169edd60b367c52d448a0a6605f1f.info
Test Date:     2024-05-10 13:18:37

               Coverage    Total    Hit
Lines:         80.3 %      259      208
Functions:     79.4 %      34       27

            Line data    Source code
       1              : use crate::{node::Node, tenant_shard::TenantShard};
       2              : use pageserver_api::controller_api::UtilizationScore;
       3              : use serde::Serialize;
       4              : use std::collections::HashMap;
       5              : use utils::{http::error::ApiError, id::NodeId};
       6              : 
       7              : /// Scenarios in which we cannot find a suitable location for a tenant shard
       8            0 : #[derive(thiserror::Error, Debug)]
       9              : pub enum ScheduleError {
      10              :     #[error("No pageservers found")]
      11              :     NoPageservers,
      12              :     #[error("No pageserver found matching constraint")]
      13              :     ImpossibleConstraint,
      14              : }
      15              : 
      16              : impl From<ScheduleError> for ApiError {
      17            0 :     fn from(value: ScheduleError) -> Self {
      18            0 :         ApiError::Conflict(format!("Scheduling error: {}", value))
      19            0 :     }
      20              : }
      21              : 
      22              : #[derive(Serialize, Eq, PartialEq)]
      23              : pub enum MaySchedule {
      24              :     Yes(UtilizationScore),
      25              :     No,
      26              : }
      27              : 
      28              : #[derive(Serialize)]
      29              : struct SchedulerNode {
      30              :     /// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
      31              :     shard_count: usize,
      32              : 
      33              :     /// Whether this node is currently eligible to have new shards scheduled (this is derived
      34              :     /// from a node's availability state and scheduling policy).
      35              :     may_schedule: MaySchedule,
      36              : }
      37              : 
      38              : impl PartialEq for SchedulerNode {
      39            6 :     fn eq(&self, other: &Self) -> bool {
      40            6 :         let may_schedule_matches = matches!(
      41            6 :             (&self.may_schedule, &other.may_schedule),
      42              :             (MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No)
      43              :         );
      44              : 
      45            6 :         may_schedule_matches && self.shard_count == other.shard_count
      46            6 :     }
      47              : }
      48              : 
      49              : impl Eq for SchedulerNode {}
      50              : 
      51              : /// This type is responsible for selecting which node is used when a tenant shard needs to choose a pageserver
      52              : /// on which to run.
      53              : ///
      54              : /// The type has no persistent state of its own: this is all populated at startup.  The Serialize
      55              : /// impl is only for debug dumps.
      56              : #[derive(Serialize)]
      57              : pub(crate) struct Scheduler {
      58              :     nodes: HashMap<NodeId, SchedulerNode>,
      59              : }
      60              : 
      61              : /// Score for soft constraint scheduling: lower scores are preferred to higher scores.
      62              : ///
      63              : /// For example, we may set an affinity score based on the number of shards from the same
      64              : /// tenant already on a node, to implicitly prefer to balance out shards.
      65              : #[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
      66              : pub(crate) struct AffinityScore(pub(crate) usize);
      67              : 
      68              : impl AffinityScore {
      69              :     /// If we have no anti-affinity at all toward a node, this is its score.  It means
      70              :     /// the scheduler has a free choice amongst nodes with this score, and may pick a node
      71              :     /// based on other information such as total utilization.
      72              :     pub(crate) const FREE: Self = Self(0);
      73              : 
      74          188 :     pub(crate) fn inc(&mut self) {
      75          188 :         self.0 += 1;
      76          188 :     }
      77              : }
      78              : 
      79              : impl std::ops::Add for AffinityScore {
      80              :     type Output = Self;
      81              : 
      82           54 :     fn add(self, rhs: Self) -> Self::Output {
      83           54 :         Self(self.0 + rhs.0)
      84           54 :     }
      85              : }
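                      :
                      : // Hedged sketch (hypothetical helper, not part of the coverage run): lower
                      : // affinity scores are preferred, and scores combine additively when several
                      : // shards of the same tenant land on the same node.
                      : #[allow(dead_code)]
                      : fn affinity_score_sketch() {
                      :     let mut score = AffinityScore::FREE; // 0: the scheduler has a free choice
                      :     score.inc(); // one shard of this tenant is already on the node
                      :     let combined = score + AffinityScore(2);
                      :     assert_eq!(combined, AffinityScore(3));
                      :     assert!(AffinityScore::FREE < combined); // Ord: lower wins
                      : }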
      86              : 
      87              : /// Hint for whether this is a sincere attempt to schedule, or a speculative
      88              : /// check for where we _would_ schedule (done during optimization)
      89              : #[derive(Debug)]
      90              : pub(crate) enum ScheduleMode {
      91              :     Normal,
      92              :     Speculative,
      93              : }
      94              : 
      95              : impl Default for ScheduleMode {
      96           32 :     fn default() -> Self {
      97           32 :         Self::Normal
      98           32 :     }
      99              : }
     100              : 
     101              : // For carrying state between multiple calls to [`TenantShard::schedule`], e.g. when calling
     102              : // it for many shards in the same tenant.
     103              : #[derive(Debug, Default)]
     104              : pub(crate) struct ScheduleContext {
     105              :     /// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`]
     106              :     pub(crate) nodes: HashMap<NodeId, AffinityScore>,
     107              : 
     108              :     /// Specifically how many _attached_ locations are on each node
     109              :     pub(crate) attached_nodes: HashMap<NodeId, usize>,
     110              : 
     111              :     pub(crate) mode: ScheduleMode,
     112              : }
     113              : 
     114              : impl ScheduleContext {
     115              :     /// Input is a list of nodes we would like to avoid using again within this context.  The more
     116              :     /// times a node is passed into this call, the less inclined we are to use it.
     117           96 :     pub(crate) fn avoid(&mut self, nodes: &[NodeId]) {
     118          284 :         for node_id in nodes {
     119          188 :             let entry = self.nodes.entry(*node_id).or_insert(AffinityScore::FREE);
     120          188 :             entry.inc()
     121              :         }
     122           96 :     }
     123              : 
     124           94 :     pub(crate) fn push_attached(&mut self, node_id: NodeId) {
     125           94 :         let entry = self.attached_nodes.entry(node_id).or_default();
     126           94 :         *entry += 1;
     127           94 :     }
     128              : 
     129          120 :     pub(crate) fn get_node_affinity(&self, node_id: NodeId) -> AffinityScore {
     130          120 :         self.nodes
     131          120 :             .get(&node_id)
     132          120 :             .copied()
     133          120 :             .unwrap_or(AffinityScore::FREE)
     134          120 :     }
     135              : 
     136          120 :     pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize {
     137          120 :         self.attached_nodes.get(&node_id).copied().unwrap_or(0)
     138          120 :     }
     139              : }
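                      :
                      : // Hedged usage sketch (hypothetical function, not part of the coverage run):
                      : // shards of one tenant share a ScheduleContext, so every node already used
                      : // accumulates anti-affinity and later shards tend to land elsewhere.
                      : #[allow(dead_code)]
                      : fn schedule_context_sketch(scheduler: &Scheduler) -> Result<(), ScheduleError> {
                      :     let mut context = ScheduleContext::default();
                      :     let first = scheduler.schedule_shard(&[], &context)?;
                      :     context.avoid(&[first]); // penalize the node we just used
                      :     context.push_attached(first); // record that it holds an attached location
                      :     assert_eq!(context.get_node_affinity(first), AffinityScore(1));
                      :     assert_eq!(context.get_node_attachments(first), 1);
                      :     // With more than one schedulable node, the raised affinity score steers
                      :     // the next shard onto a different node.
                      :     let _second = scheduler.schedule_shard(&[], &context)?;
                      :     Ok(())
                      : }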
     140              : 
     141              : impl Scheduler {
     142           14 :     pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
     143           14 :         let mut scheduler_nodes = HashMap::new();
     144           50 :         for node in nodes {
     145           36 :             scheduler_nodes.insert(
     146           36 :                 node.get_id(),
     147           36 :                 SchedulerNode {
     148           36 :                     shard_count: 0,
     149           36 :                     may_schedule: node.may_schedule(),
     150           36 :                 },
     151           36 :             );
     152           36 :         }
     153              : 
     154           14 :         Self {
     155           14 :             nodes: scheduler_nodes,
     156           14 :         }
     157           14 :     }
     158              : 
     159              :     /// For debug/support: check that our internal statistics are in sync with the state of
     160              :     /// the nodes & tenant shards.
     161              :     ///
     162              :     /// If anything is inconsistent, log details and return an error.
     163            2 :     pub(crate) fn consistency_check<'a>(
     164            2 :         &self,
     165            2 :         nodes: impl Iterator<Item = &'a Node>,
     166            2 :         shards: impl Iterator<Item = &'a TenantShard>,
     167            2 :     ) -> anyhow::Result<()> {
     168            2 :         let mut expect_nodes: HashMap<NodeId, SchedulerNode> = HashMap::new();
     169            8 :         for node in nodes {
     170            6 :             expect_nodes.insert(
     171            6 :                 node.get_id(),
     172            6 :                 SchedulerNode {
     173            6 :                     shard_count: 0,
     174            6 :                     may_schedule: node.may_schedule(),
     175            6 :                 },
     176            6 :             );
     177            6 :         }
     178              : 
     179            4 :         for shard in shards {
     180            2 :             if let Some(node_id) = shard.intent.get_attached() {
     181            2 :                 match expect_nodes.get_mut(node_id) {
     182            2 :                     Some(node) => node.shard_count += 1,
     183            0 :                     None => anyhow::bail!(
     184            0 :                         "Tenant {} references nonexistent node {}",
     185            0 :                         shard.tenant_shard_id,
     186            0 :                         node_id
     187            0 :                     ),
     188              :                 }
     189            0 :             }
     190              : 
     191            2 :             for node_id in shard.intent.get_secondary() {
     192            2 :                 match expect_nodes.get_mut(node_id) {
     193            2 :                     Some(node) => node.shard_count += 1,
     194            0 :                     None => anyhow::bail!(
     195            0 :                         "Tenant {} references nonexistent node {}",
     196            0 :                         shard.tenant_shard_id,
     197            0 :                         node_id
     198            0 :                     ),
     199              :                 }
     200              :             }
     201              :         }
     202              : 
     203            8 :         for (node_id, expect_node) in &expect_nodes {
     204            6 :             let Some(self_node) = self.nodes.get(node_id) else {
     205            0 :                 anyhow::bail!("Node {node_id} not found in Self")
     206              :             };
     207              : 
     208            6 :             if self_node != expect_node {
     209            0 :                 tracing::error!("Inconsistency detected in scheduling state for node {node_id}");
     210            0 :                 tracing::error!("Expected state: {}", serde_json::to_string(expect_node)?);
     211            0 :                 tracing::error!("Self state: {}", serde_json::to_string(self_node)?);
     212              : 
     213            0 :                 anyhow::bail!("Inconsistent state on {node_id}");
     214            6 :             }
     215              :         }
     216              : 
     217            2 :         if expect_nodes.len() != self.nodes.len() {
     218              :             // We just checked that all the expected nodes are present.  If the lengths don't match,
     219              :             // it means that we have nodes in Self that are unexpected.
     220            0 :             for node_id in self.nodes.keys() {
     221            0 :                 if !expect_nodes.contains_key(node_id) {
     222            0 :                     anyhow::bail!("Node {node_id} found in Self but not in expected nodes");
     223            0 :                 }
     224              :             }
     225            2 :         }
     226              : 
     227            2 :         Ok(())
     228            2 :     }
     229              : 
     230              :     /// Increment the reference count of a node.  This reference count is used to guide scheduling
     231              :     /// decisions, not for memory management: it represents one tenant shard whose IntentState targets
     232              :     /// this node.
     233              :     ///
     234              :     /// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
     235              :     /// [`Self::new`] or [`Self::node_upsert`])
     236           60 :     pub(crate) fn node_inc_ref(&mut self, node_id: NodeId) {
     237           60 :         let Some(node) = self.nodes.get_mut(&node_id) else {
     238            0 :             tracing::error!("Scheduler missing node {node_id}");
     239            0 :             debug_assert!(false);
     240            0 :             return;
     241              :         };
     242              : 
     243           60 :         node.shard_count += 1;
     244           60 :     }
     245              : 
     246              :     /// Decrement a node's reference count.  Inverse of [`Self::node_inc_ref`].
     247           58 :     pub(crate) fn node_dec_ref(&mut self, node_id: NodeId) {
     248           58 :         let Some(node) = self.nodes.get_mut(&node_id) else {
     249            0 :             debug_assert!(false);
     250            0 :             tracing::error!("Scheduler missing node {node_id}");
     251            0 :             return;
     252              :         };
     253              : 
     254           58 :         node.shard_count -= 1;
     255           58 :     }
     256              : 
     257           10 :     pub(crate) fn node_upsert(&mut self, node: &Node) {
     258           10 :         use std::collections::hash_map::Entry::*;
     259           10 :         match self.nodes.entry(node.get_id()) {
     260            2 :             Occupied(mut entry) => {
     261            2 :                 entry.get_mut().may_schedule = node.may_schedule();
     262            2 :             }
     263            8 :             Vacant(entry) => {
     264            8 :                 entry.insert(SchedulerNode {
     265            8 :                     shard_count: 0,
     266            8 :                     may_schedule: node.may_schedule(),
     267            8 :                 });
     268            8 :             }
     269              :         }
     270           10 :     }
     271              : 
     272            0 :     pub(crate) fn node_remove(&mut self, node_id: NodeId) {
     273            0 :         if self.nodes.remove(&node_id).is_none() {
     274            0 :             tracing::warn!(node_id=%node_id, "Removed non-existent node from scheduler");
     275            0 :         }
     276            0 :     }
     277              : 
     278              :     /// Where we have several nodes to choose from, for example when picking a secondary location
     279              :     /// to promote to an attached location, this method may be used to pick the best choice based
     280              :     /// on the scheduler's knowledge of utilization and availability.
     281              :     ///
     282              :     /// If the input is empty, or none of the nodes are eligible for scheduling, return None: the
     283              :     /// caller can pick a node some other way.
     284           28 :     pub(crate) fn node_preferred(&self, nodes: &[NodeId]) -> Option<NodeId> {
     285           28 :         if nodes.is_empty() {
     286           24 :             return None;
     287            4 :         }
     288            4 : 
     289            4 :         // TODO: When the utilization score returned by the pageserver becomes meaningful,
     290            4 :         // schedule based on that instead of the shard count.
     291            4 :         let node = nodes
     292            4 :             .iter()
     293            8 :             .map(|node_id| {
     294            8 :                 let may_schedule = self
     295            8 :                     .nodes
     296            8 :                     .get(node_id)
     297            8 :                     .map(|n| n.may_schedule != MaySchedule::No)
     298            8 :                     .unwrap_or(false);
     299            8 :                 (*node_id, may_schedule)
     300            8 :             })
     301            8 :             .max_by_key(|(_n, may_schedule)| *may_schedule);
     302            4 : 
     303            4 :         // If even the preferred node has may_schedule==false, return None
     304            4 :         node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
     305           28 :     }
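                      :
                      :     // Hedged sketch (hypothetical method, not part of the coverage run): when
                      :     // promoting a secondary to an attached location, ask node_preferred for the
                      :     // best schedulable candidate; a None result sends the caller down its own
                      :     // fallback path.
                      :     #[allow(dead_code)]
                      :     fn promotion_target_sketch(&self, secondaries: &[NodeId]) -> Option<NodeId> {
                      :         // None if `secondaries` is empty or every entry is MaySchedule::No.
                      :         self.node_preferred(secondaries)
                      :     }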
     306              : 
     307              :     /// hard_exclude: it is forbidden to use nodes in this list, typically because they
     308              :     /// are already in use by this shard -- we use this to avoid picking the same node
     309              :     /// as both attached and secondary location.  This is a hard constraint: if we cannot
     310              :     /// find any nodes that aren't in this list, then we will return a [`ScheduleError::ImpossibleConstraint`].
     311              :     ///
     312              :     /// context: we prefer to avoid using nodes identified in the context, according
     313              :     /// to their anti-affinity score.  We use this to prefer to avoid placing shards of
     314              :     /// the same tenant on the same node.  This is a soft constraint: the context will never
     315              :     /// cause us to fail to schedule a shard.
     316           60 :     pub(crate) fn schedule_shard(
     317           60 :         &self,
     318           60 :         hard_exclude: &[NodeId],
     319           60 :         context: &ScheduleContext,
     320           60 :     ) -> Result<NodeId, ScheduleError> {
     321           60 :         if self.nodes.is_empty() {
     322            0 :             return Err(ScheduleError::NoPageservers);
     323           60 :         }
     324           60 : 
     325           60 :         let mut scores: Vec<(NodeId, AffinityScore, usize)> = self
     326           60 :             .nodes
     327           60 :             .iter()
     328          188 :             .filter_map(|(k, v)| {
     329          188 :                 if hard_exclude.contains(k) || v.may_schedule == MaySchedule::No {
     330           74 :                     None
     331              :                 } else {
     332          114 :                     Some((
     333          114 :                         *k,
     334          114 :                         context.nodes.get(k).copied().unwrap_or(AffinityScore::FREE),
     335          114 :                         v.shard_count,
     336          114 :                     ))
     337              :                 }
     338          188 :             })
     339           60 :             .collect();
     340           60 : 
     341           60 :         // Sort by, in order of precedence:
     342           60 :         //  1st: Affinity score.  We should never pick a higher-score node if a lower-score node is available
     343           60 :         //  2nd: Utilization.  Within nodes with the same affinity, use the least loaded nodes.
     344           60 :         //  3rd: Node ID.  This is a convenience to make selection deterministic in tests and empty systems.
     345          116 :         scores.sort_by_key(|i| (i.1, i.2, i.0));
     346           60 : 
     347           60 :         if scores.is_empty() {
     348              :             // After applying constraints, no pageservers were left.
     349            0 :             if !matches!(context.mode, ScheduleMode::Speculative) {
     350              :                 // If this was not a speculative attempt, log details to understand why we couldn't
     351              :                 // schedule: this may help an engineer understand if some nodes are marked offline
     352              :                 // in a way that's preventing progress.
     353            0 :                 tracing::info!(
     354            0 :                     "Scheduling failure, while excluding {hard_exclude:?}, node states:"
     355              :                 );
     356            0 :                 for (node_id, node) in &self.nodes {
     357            0 :                     tracing::info!(
     358            0 :                         "Node {node_id}: may_schedule={} shards={}",
     359            0 :                         node.may_schedule != MaySchedule::No,
     360              :                         node.shard_count
     361              :                     );
     362              :                 }
     363            0 :             }
     364            0 :             return Err(ScheduleError::ImpossibleConstraint);
     365           60 :         }
     366           60 : 
     367           60 :         // Lowest score wins
     368           60 :         let node_id = scores.first().unwrap().0;
     369              : 
     370           60 :         if !matches!(context.mode, ScheduleMode::Speculative) {
     371           60 :             tracing::info!(
     372            0 :             "scheduler selected node {node_id} (elegible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?})",
     373            0 :             scores.iter().map(|i| i.0 .0).collect::<Vec<_>>()
     374              :         );
     375            0 :         }
     376              : 
     377              :         // Note that we do not update shard count here to reflect the scheduling: that
     378              :         // is IntentState's job when the scheduled location is used.
     379              : 
     380           60 :         Ok(node_id)
     381           60 :     }
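                      :
                      :     // Hedged sketch (hypothetical method, mirroring the unit tests below): the
                      :     // attached location is chosen first, then the secondary is forced onto a
                      :     // different node by listing the attached node in hard_exclude.
                      :     #[allow(dead_code)]
                      :     fn attached_then_secondary_sketch(
                      :         &self,
                      :         context: &ScheduleContext,
                      :     ) -> Result<(NodeId, NodeId), ScheduleError> {
                      :         let attached = self.schedule_shard(&[], context)?;
                      :         let secondary = self.schedule_shard(&[attached], context)?;
                      :         debug_assert_ne!(attached, secondary);
                      :         Ok((attached, secondary))
                      :     }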
     382              : 
     383              :     /// Unit test access to internal state
     384              :     #[cfg(test)]
     385           12 :     pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
     386           12 :         self.nodes.get(&node_id).unwrap().shard_count
     387           12 :     }
     388              : }
     389              : 
     390              : #[cfg(test)]
     391              : pub(crate) mod test_utils {
     392              : 
     393              :     use crate::node::Node;
     394              :     use pageserver_api::controller_api::{NodeAvailability, UtilizationScore};
     395              :     use std::collections::HashMap;
     396              :     use utils::id::NodeId;
     397              :     /// Test helper: synthesize the requested number of nodes, all in active state.
     398              :     ///
     399              :     /// Node IDs start at one.
     400           14 :     pub(crate) fn make_test_nodes(n: u64) -> HashMap<NodeId, Node> {
     401           14 :         (1..n + 1)
     402           44 :             .map(|i| {
     403           44 :                 (NodeId(i), {
     404           44 :                     let mut node = Node::new(
     405           44 :                         NodeId(i),
     406           44 :                         format!("httphost-{i}"),
     407           44 :                         80 + i as u16,
     408           44 :                         format!("pghost-{i}"),
     409           44 :                         5432 + i as u16,
     410           44 :                     );
     411           44 :                     node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
     412           44 :                     assert!(node.is_available());
     413           44 :                     node
     414           44 :                 })
     415           44 :             })
     416           14 :             .collect()
     417           14 :     }
     418              : }
     419              : 
     420              : #[cfg(test)]
     421              : mod tests {
     422              :     use super::*;
     423              : 
     424              :     use crate::tenant_shard::IntentState;
     425              :     #[test]
     426            2 :     fn scheduler_basic() -> anyhow::Result<()> {
     427            2 :         let nodes = test_utils::make_test_nodes(2);
     428            2 : 
     429            2 :         let mut scheduler = Scheduler::new(nodes.values());
     430            2 :         let mut t1_intent = IntentState::new();
     431            2 :         let mut t2_intent = IntentState::new();
     432            2 : 
     433            2 :         let context = ScheduleContext::default();
     434              : 
     435            2 :         let scheduled = scheduler.schedule_shard(&[], &context)?;
     436            2 :         t1_intent.set_attached(&mut scheduler, Some(scheduled));
     437            2 :         let scheduled = scheduler.schedule_shard(&[], &context)?;
     438            2 :         t2_intent.set_attached(&mut scheduler, Some(scheduled));
     439            2 : 
     440            2 :         assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);
     441            2 :         assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 1);
     442              : 
     443            2 :         let scheduled = scheduler.schedule_shard(&t1_intent.all_pageservers(), &context)?;
     444            2 :         t1_intent.push_secondary(&mut scheduler, scheduled);
     445            2 : 
     446            2 :         assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);
     447            2 :         assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 2);
     448              : 
     449            2 :         t1_intent.clear(&mut scheduler);
     450            2 :         assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 0);
     451            2 :         assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 1);
     452              : 
     453            2 :         if cfg!(debug_assertions) {
     454              :             // Dropping an IntentState without clearing it causes a panic in debug mode,
     455              :             // because we have failed to properly update scheduler shard counts.
     456            2 :             let result = std::panic::catch_unwind(move || {
     457            2 :                 drop(t2_intent);
     458            2 :             });
     459            2 :             assert!(result.is_err());
     460              :         } else {
     461            0 :             t2_intent.clear(&mut scheduler);
     462            0 :             assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 0);
     463            0 :             assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 0);
     464              :         }
     465              : 
     466            2 :         Ok(())
     467            2 :     }
     468              : }
        

Generated by: LCOV version 2.1-beta