LCOV - code coverage report
Current view: top level - control_plane/attachment_service/src - scheduler.rs (source / functions)
Test:         322b88762cba8ea666f63cda880cccab6936bf37.info
Test Date:    2024-02-29 11:57:12

                 Coverage    Total    Hit
   Lines:        57.9 %      209      121
   Functions:    44.4 %      36       16

            Line data    Source code
       1              : use crate::{node::Node, tenant_state::TenantState};
       2              : use serde::Serialize;
       3              : use std::collections::HashMap;
       4              : use utils::{http::error::ApiError, id::NodeId};
       5              : 
       6              : /// Scenarios in which we cannot find a suitable location for a tenant shard
       7            0 : #[derive(thiserror::Error, Debug)]
       8              : pub enum ScheduleError {
       9              :     #[error("No pageservers found")]
      10              :     NoPageservers,
      11              :     #[error("No pageserver found matching constraint")]
      12              :     ImpossibleConstraint,
      13              : }
      14              : 
      15              : impl From<ScheduleError> for ApiError {
      16            0 :     fn from(value: ScheduleError) -> Self {
      17            0 :         ApiError::Conflict(format!("Scheduling error: {}", value))
      18            0 :     }
      19              : }
      20              : 
      21            0 : #[derive(Serialize, Eq, PartialEq)]
      22              : struct SchedulerNode {
      23              :     /// How many shards are currently scheduled on this node, via their [`crate::tenant_state::IntentState`].
      24              :     shard_count: usize,
      25              : 
      26              :     /// Whether this node is currently eligible to have new shards scheduled (this is derived
      27              :     /// from a node's availability state and scheduling policy).
      28              :     may_schedule: bool,
      29              : }
      30              : 
      31              : /// This type is responsible for selecting which node is used when a tenant shard needs to choose a pageserver
      32              : /// on which to run.
      33              : ///
      34              : /// The type has no persistent state of its own: this is all populated at startup.  The Serialize
      35              : /// impl is only for debug dumps.
      36            0 : #[derive(Serialize)]
      37              : pub(crate) struct Scheduler {
      38              :     nodes: HashMap<NodeId, SchedulerNode>,
      39              : }
      40              : 
      41              : impl Scheduler {
      42            4 :     pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
      43            4 :         let mut scheduler_nodes = HashMap::new();
      44           14 :         for node in nodes {
      45           10 :             scheduler_nodes.insert(
      46           10 :                 node.id,
      47           10 :                 SchedulerNode {
      48           10 :                     shard_count: 0,
      49           10 :                     may_schedule: node.may_schedule(),
      50           10 :                 },
      51           10 :             );
      52           10 :         }
      53              : 
      54            4 :         Self {
      55            4 :             nodes: scheduler_nodes,
      56            4 :         }
      57            4 :     }
      58              : 
      59              :     /// For debug/support: check that our internal statistics are in sync with the state of
      60              :     /// the nodes & tenant shards.
      61              :     ///
      62              :     /// If anything is inconsistent, log details and return an error.
      63            0 :     pub(crate) fn consistency_check<'a>(
      64            0 :         &self,
      65            0 :         nodes: impl Iterator<Item = &'a Node>,
      66            0 :         shards: impl Iterator<Item = &'a TenantState>,
      67            0 :     ) -> anyhow::Result<()> {
      68            0 :         let mut expect_nodes: HashMap<NodeId, SchedulerNode> = HashMap::new();
      69            0 :         for node in nodes {
      70            0 :             expect_nodes.insert(
      71            0 :                 node.id,
      72            0 :                 SchedulerNode {
      73            0 :                     shard_count: 0,
      74            0 :                     may_schedule: node.may_schedule(),
      75            0 :                 },
      76            0 :             );
      77            0 :         }
      78              : 
      79            0 :         for shard in shards {
      80            0 :             if let Some(node_id) = shard.intent.get_attached() {
      81            0 :                 match expect_nodes.get_mut(node_id) {
      82            0 :                     Some(node) => node.shard_count += 1,
      83            0 :                     None => anyhow::bail!(
      84            0 :                         "Tenant {} references nonexistent node {}",
      85            0 :                         shard.tenant_shard_id,
      86            0 :                         node_id
      87            0 :                     ),
      88              :                 }
      89            0 :             }
      90              : 
      91            0 :             for node_id in shard.intent.get_secondary() {
      92            0 :                 match expect_nodes.get_mut(node_id) {
      93            0 :                     Some(node) => node.shard_count += 1,
      94            0 :                     None => anyhow::bail!(
      95            0 :                         "Tenant {} references nonexistent node {}",
      96            0 :                         shard.tenant_shard_id,
      97            0 :                         node_id
      98            0 :                     ),
      99              :                 }
     100              :             }
     101              :         }
     102              : 
     103            0 :         for (node_id, expect_node) in &expect_nodes {
     104            0 :             let Some(self_node) = self.nodes.get(node_id) else {
     105            0 :                 anyhow::bail!("Node {node_id} not found in Self")
     106              :             };
     107              : 
     108            0 :             if self_node != expect_node {
     109            0 :                 tracing::error!("Inconsistency detected in scheduling state for node {node_id}");
     110            0 :                 tracing::error!("Expected state: {}", serde_json::to_string(expect_node)?);
     111            0 :                 tracing::error!("Self state: {}", serde_json::to_string(self_node)?);
     112              : 
     113            0 :                 anyhow::bail!("Inconsistent state on {node_id}");
     114            0 :             }
     115              :         }
     116              : 
     117            0 :         if expect_nodes.len() != self.nodes.len() {
     118              :             // We just checked that all the expected nodes are present.  If the lengths don't match,
     119              :             // it means that we have nodes in Self that are unexpected.
     120            0 :             for node_id in self.nodes.keys() {
     121            0 :                 if !expect_nodes.contains_key(node_id) {
     122            0 :                     anyhow::bail!("Node {node_id} found in Self but not in expected nodes");
     123            0 :                 }
     124              :             }
     125            0 :         }
     126              : 
     127            0 :         Ok(())
     128            0 :     }
     129              : 
     130              :     /// Increment the reference count of a node.  This reference count is used to guide scheduling
     131              :     /// decisions, not for memory management: it represents one tenant shard whose IntentState targets
     132              :     /// this node.
     133              :     ///
     134              :     /// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
     135              :     /// [`Self::new`] or [`Self::node_upsert`])
     136           10 :     pub(crate) fn node_inc_ref(&mut self, node_id: NodeId) {
     137           10 :         let Some(node) = self.nodes.get_mut(&node_id) else {
     138            0 :             tracing::error!("Scheduler missing node {node_id}");
     139            0 :             debug_assert!(false);
     140            0 :             return;
     141              :         };
     142              : 
     143           10 :         node.shard_count += 1;
     144           10 :     }
     145              : 
     146              :     /// Decrement a node's reference count.  Inverse of [`Self::node_inc_ref`].
     147            8 :     pub(crate) fn node_dec_ref(&mut self, node_id: NodeId) {
     148            8 :         let Some(node) = self.nodes.get_mut(&node_id) else {
     149            0 :             debug_assert!(false);
     150            0 :             tracing::error!("Scheduler missing node {node_id}");
     151            0 :             return;
     152              :         };
     153              : 
     154            8 :         node.shard_count -= 1;
     155            8 :     }
     156              : 
     157            2 :     pub(crate) fn node_upsert(&mut self, node: &Node) {
     158            2 :         use std::collections::hash_map::Entry::*;
     159            2 :         match self.nodes.entry(node.id) {
     160            2 :             Occupied(mut entry) => {
     161            2 :                 entry.get_mut().may_schedule = node.may_schedule();
     162            2 :             }
     163            0 :             Vacant(entry) => {
     164            0 :                 entry.insert(SchedulerNode {
     165            0 :                     shard_count: 0,
     166            0 :                     may_schedule: node.may_schedule(),
     167            0 :                 });
     168            0 :             }
     169              :         }
     170            2 :     }
     171              : 
     172            0 :     pub(crate) fn node_remove(&mut self, node_id: NodeId) {
     173            0 :         if self.nodes.remove(&node_id).is_none() {
     174            0 :             tracing::warn!(node_id=%node_id, "Removed non-existent node from scheduler");
     175            0 :         }
     176            0 :     }
     177              : 
     178              :     /// Where we have several nodes to choose from, for example when picking a secondary location
     179              :     /// to promote to an attached location, this method may be used to pick the best choice based
     180              :     /// on the scheduler's knowledge of utilization and availability.
     181              :     ///
     182              :     /// If the input is empty, or all the nodes are not eligible for scheduling, return None: the
     183              :     /// caller can pick a node some other way.
     184            4 :     pub(crate) fn node_preferred(&self, nodes: &[NodeId]) -> Option<NodeId> {
     185            4 :         if nodes.is_empty() {
     186            2 :             return None;
     187            2 :         }
     188            2 : 
     189            2 :         let node = nodes
     190            2 :             .iter()
     191            4 :             .map(|node_id| {
     192            4 :                 let may_schedule = self
     193            4 :                     .nodes
     194            4 :                     .get(node_id)
     195            4 :                     .map(|n| n.may_schedule)
     196            4 :                     .unwrap_or(false);
     197            4 :                 (*node_id, may_schedule)
     198            4 :             })
     199            4 :             .max_by_key(|(_n, may_schedule)| *may_schedule);
     200            2 : 
     201            2 :         // If even the preferred node has may_schedule==false, return None
     202            2 :         node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
     203            4 :     }
     204              : 
     205           10 :     pub(crate) fn schedule_shard(&self, hard_exclude: &[NodeId]) -> Result<NodeId, ScheduleError> {
     206           10 :         if self.nodes.is_empty() {
     207            0 :             return Err(ScheduleError::NoPageservers);
     208           10 :         }
     209           10 : 
     210           10 :         let mut tenant_counts: Vec<(NodeId, usize)> = self
     211           10 :             .nodes
     212           10 :             .iter()
     213           24 :             .filter_map(|(k, v)| {
     214           24 :                 if hard_exclude.contains(k) || !v.may_schedule {
     215            4 :                     None
     216              :                 } else {
     217           20 :                     Some((*k, v.shard_count))
     218              :                 }
     219           24 :             })
     220           10 :             .collect();
     221           10 : 
     222           10 :         // Sort by tenant count.  Nodes with the same tenant count are sorted by ID.
     223           22 :         tenant_counts.sort_by_key(|i| (i.1, i.0));
     224           10 : 
     225           10 :         if tenant_counts.is_empty() {
     226              :             // After applying constraints, no pageservers were left.  We log some detail about
     227              :             // the state of nodes to help understand why this happened.  This is not logged as an error because
     228              :             // it is legitimately possible for enough nodes to be Offline to prevent scheduling a shard.
     229            0 :             tracing::info!("Scheduling failure, while excluding {hard_exclude:?}, node states:");
     230            0 :             for (node_id, node) in &self.nodes {
     231            0 :                 tracing::info!(
     232            0 :                     "Node {node_id}: may_schedule={} shards={}",
     233            0 :                     node.may_schedule,
     234            0 :                     node.shard_count
     235            0 :                 );
     236              :             }
     237              : 
     238            0 :             return Err(ScheduleError::ImpossibleConstraint);
     239           10 :         }
     240           10 : 
     241           10 :         let node_id = tenant_counts.first().unwrap().0;
     242           10 :         tracing::info!(
     243            0 :             "scheduler selected node {node_id} (eligible nodes {:?}, exclude: {hard_exclude:?})",
     244            0 :             tenant_counts.iter().map(|i| i.0 .0).collect::<Vec<_>>()
     245            0 :         );
     246              : 
     247              :         // Note that we do not update shard count here to reflect the scheduling: that
     248              :         // is IntentState's job when the scheduled location is used.
     249              : 
     250           10 :         Ok(node_id)
     251           10 :     }
     252              : }
     253              : 
     254              : #[cfg(test)]
     255              : pub(crate) mod test_utils {
     256              : 
     257              :     use crate::node::Node;
     258              :     use pageserver_api::controller_api::{NodeAvailability, NodeSchedulingPolicy};
     259              :     use std::collections::HashMap;
     260              :     use utils::id::NodeId;
     261              :     /// Test helper: synthesize the requested number of nodes, all in active state.
     262              :     ///
     263              :     /// Node IDs start at one.
     264            4 :     pub(crate) fn make_test_nodes(n: u64) -> HashMap<NodeId, Node> {
     265            4 :         (1..n + 1)
     266           10 :             .map(|i| {
     267           10 :                 (
     268           10 :                     NodeId(i),
     269           10 :                     Node {
     270           10 :                         id: NodeId(i),
     271           10 :                         availability: NodeAvailability::Active,
     272           10 :                         scheduling: NodeSchedulingPolicy::Active,
     273           10 :                         listen_http_addr: format!("httphost-{i}"),
     274           10 :                         listen_http_port: 80 + i as u16,
     275           10 :                         listen_pg_addr: format!("pghost-{i}"),
     276           10 :                         listen_pg_port: 5432 + i as u16,
     277           10 :                     },
     278           10 :                 )
     279           10 :             })
     280            4 :             .collect()
     281            4 :     }
     282              : }
     283              : 
     284              : #[cfg(test)]
     285              : mod tests {
     286              :     use super::*;
     287              :     use utils::id::NodeId;
     288              : 
     289              :     use crate::tenant_state::IntentState;
     290            2 :     #[test]
     291            2 :     fn scheduler_basic() -> anyhow::Result<()> {
     292            2 :         let nodes = test_utils::make_test_nodes(2);
     293            2 : 
     294            2 :         let mut scheduler = Scheduler::new(nodes.values());
     295            2 :         let mut t1_intent = IntentState::new();
     296            2 :         let mut t2_intent = IntentState::new();
     297              : 
     298            2 :         let scheduled = scheduler.schedule_shard(&[])?;
     299            2 :         t1_intent.set_attached(&mut scheduler, Some(scheduled));
     300            2 :         let scheduled = scheduler.schedule_shard(&[])?;
     301            2 :         t2_intent.set_attached(&mut scheduler, Some(scheduled));
     302            2 : 
     303            2 :         assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);
     304            2 :         assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 1);
     305              : 
     306            2 :         let scheduled = scheduler.schedule_shard(&t1_intent.all_pageservers())?;
     307            2 :         t1_intent.push_secondary(&mut scheduler, scheduled);
     308            2 : 
     309            2 :         assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);
     310            2 :         assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 2);
     311              : 
     312            2 :         t1_intent.clear(&mut scheduler);
     313            2 :         assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 0);
     314            2 :         assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 1);
     315              : 
     316            2 :         if cfg!(debug_assertions) {
     317              :             // Dropping an IntentState without clearing it causes a panic in debug mode,
     318              :             // because we have failed to properly update scheduler shard counts.
     319            2 :             let result = std::panic::catch_unwind(move || {
     320            2 :                 drop(t2_intent);
     321            2 :             });
     322            2 :             assert!(result.is_err());
     323              :         } else {
     324            0 :             t2_intent.clear(&mut scheduler);
     325            0 :             assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 0);
     326            0 :             assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 0);
     327              :         }
     328              : 
     329            2 :         Ok(())
     330            2 :     }
     331              : }
        

Generated by: LCOV version 2.1-beta