LCOV - code coverage report
Current view: top level - storage_controller/src - scheduler.rs (source / functions)
Test:         fc67f8dc6087a0b4f4f0bcd74f6e1dc25fab8cf3.info
Test Date:    2024-09-24 13:57:57
Coverage:     Lines: 85.6 % (447 of 522)   Functions: 72.7 % (40 of 55)

            Line data    Source code
       1              : use crate::{node::Node, tenant_shard::TenantShard};
       2              : use itertools::Itertools;
       3              : use pageserver_api::models::PageserverUtilization;
       4              : use serde::Serialize;
       5              : use std::{collections::HashMap, fmt::Debug};
       6              : use utils::{http::error::ApiError, id::NodeId};
       7              : 
       8              : /// Scenarios in which we cannot find a suitable location for a tenant shard
       9            0 : #[derive(thiserror::Error, Debug)]
      10              : pub enum ScheduleError {
      11              :     #[error("No pageservers found")]
      12              :     NoPageservers,
      13              :     #[error("No pageserver found matching constraint")]
      14              :     ImpossibleConstraint,
      15              : }
      16              : 
      17              : impl From<ScheduleError> for ApiError {
      18            0 :     fn from(value: ScheduleError) -> Self {
      19            0 :         ApiError::Conflict(format!("Scheduling error: {}", value))
      20            0 :     }
      21              : }
      22              : 
      23              : #[derive(Serialize)]
      24              : pub enum MaySchedule {
      25              :     Yes(PageserverUtilization),
      26              :     No,
      27              : }
      28              : 
      29              : #[derive(Serialize)]
      30              : pub(crate) struct SchedulerNode {
      31              :     /// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
      32              :     shard_count: usize,
      33              :     /// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`].
      34              :     attached_shard_count: usize,
      35              : 
       36              :     /// Whether this node is currently eligible to have new shards scheduled (this is derived
      37              :     /// from a node's availability state and scheduling policy).
      38              :     may_schedule: MaySchedule,
      39              : }
      40              : 
      41              : pub(crate) trait NodeSchedulingScore: Debug + Ord + Copy + Sized {
      42              :     fn generate(
      43              :         node_id: &NodeId,
      44              :         node: &mut SchedulerNode,
      45              :         context: &ScheduleContext,
      46              :     ) -> Option<Self>;
      47              :     fn is_overloaded(&self) -> bool;
      48              :     fn node_id(&self) -> NodeId;
      49              : }
      50              : 
      51              : pub(crate) trait ShardTag {
      52              :     type Score: NodeSchedulingScore;
      53              : }
      54              : 
      55              : pub(crate) struct AttachedShardTag {}
      56              : impl ShardTag for AttachedShardTag {
      57              :     type Score = NodeAttachmentSchedulingScore;
      58              : }
      59              : 
      60              : pub(crate) struct SecondaryShardTag {}
      61              : impl ShardTag for SecondaryShardTag {
      62              :     type Score = NodeSecondarySchedulingScore;
      63              : }
      64              : 
      65              : /// Scheduling score of a given node for shard attachments.
      66              : /// Lower scores indicate more suitable nodes.
      67              : /// Ordering is given by member declaration order (top to bottom).
      68              : #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
      69              : pub(crate) struct NodeAttachmentSchedulingScore {
      70              :     /// The number of shards belonging to the tenant currently being
      71              :     /// scheduled that are attached to this node.
      72              :     affinity_score: AffinityScore,
       73              :     /// This node's entry in [`ScheduleContext::attached_nodes`].
      74              :     /// This normally tracks the number of attached shards belonging to the
      75              :     /// tenant being scheduled that are already on this node.
      76              :     attached_shards_in_context: usize,
       77              :     /// Utilization score that combines shard count and disk utilization
       78              :     utilization_score: u64,
       79              :     /// Total number of shards attached to this node. When nodes have identical utilization, this
      80              :     /// acts as an anti-affinity between attached shards.
      81              :     total_attached_shard_count: usize,
      82              :     /// Convenience to make selection deterministic in tests and empty systems
      83              :     node_id: NodeId,
      84              : }
      85              : 
      86              : impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
      87           68 :     fn generate(
      88           68 :         node_id: &NodeId,
      89           68 :         node: &mut SchedulerNode,
      90           68 :         context: &ScheduleContext,
      91           68 :     ) -> Option<Self> {
      92           68 :         let utilization = match &mut node.may_schedule {
      93           68 :             MaySchedule::Yes(u) => u,
      94              :             MaySchedule::No => {
      95            0 :                 return None;
      96              :             }
      97              :         };
      98              : 
      99           68 :         Some(Self {
     100           68 :             affinity_score: context
     101           68 :                 .nodes
     102           68 :                 .get(node_id)
     103           68 :                 .copied()
     104           68 :                 .unwrap_or(AffinityScore::FREE),
     105           68 :             attached_shards_in_context: context.attached_nodes.get(node_id).copied().unwrap_or(0),
     106           68 :             utilization_score: utilization.cached_score(),
     107           68 :             total_attached_shard_count: node.attached_shard_count,
     108           68 :             node_id: *node_id,
     109           68 :         })
     110           68 :     }
     111              : 
     112           68 :     fn is_overloaded(&self) -> bool {
     113           68 :         PageserverUtilization::is_overloaded(self.utilization_score)
     114           68 :     }
     115              : 
     116           28 :     fn node_id(&self) -> NodeId {
     117           28 :         self.node_id
     118           28 :     }
     119              : }
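
// Illustrative sketch, not part of the original source: because `Ord` is derived for
// `NodeAttachmentSchedulingScore`, scores compare field by field in declaration order,
// so a lower affinity_score beats any utilization advantage.  The field values below
// are invented purely for this example.
#[cfg(test)]
mod attachment_score_ordering_example {
    use super::*;

    #[test]
    fn affinity_dominates_utilization() {
        let free_affinity_but_busy = NodeAttachmentSchedulingScore {
            affinity_score: AffinityScore::FREE,
            attached_shards_in_context: 0,
            utilization_score: 1_000,
            total_attached_shard_count: 10,
            node_id: NodeId(1),
        };
        let colocated_but_idle = NodeAttachmentSchedulingScore {
            affinity_score: AffinityScore(1),
            attached_shards_in_context: 1,
            utilization_score: 0,
            total_attached_shard_count: 0,
            node_id: NodeId(2),
        };
        // Lower scores are preferred, so the node without an affinity penalty sorts
        // first despite its higher utilization.
        assert!(free_affinity_but_busy < colocated_but_idle);
    }
}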
     120              : 
     121              : /// Scheduling score of a given node for shard secondaries.
     122              : /// Lower scores indicate more suitable nodes.
     123              : /// Ordering is given by member declaration order (top to bottom).
     124              : #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
     125              : pub(crate) struct NodeSecondarySchedulingScore {
     126              :     /// The number of shards belonging to the tenant currently being
     127              :     /// scheduled that are attached to this node.
     128              :     affinity_score: AffinityScore,
      129              :     /// Utilization score that combines shard count and disk utilization
      130              :     utilization_score: u64,
      131              :     /// Total number of shards attached to this node. When nodes have identical utilization, this
     132              :     /// acts as an anti-affinity between attached shards.
     133              :     total_attached_shard_count: usize,
     134              :     /// Convenience to make selection deterministic in tests and empty systems
     135              :     node_id: NodeId,
     136              : }
     137              : 
     138              : impl NodeSchedulingScore for NodeSecondarySchedulingScore {
     139           40 :     fn generate(
     140           40 :         node_id: &NodeId,
     141           40 :         node: &mut SchedulerNode,
     142           40 :         context: &ScheduleContext,
     143           40 :     ) -> Option<Self> {
     144           40 :         let utilization = match &mut node.may_schedule {
     145           40 :             MaySchedule::Yes(u) => u,
     146              :             MaySchedule::No => {
     147            0 :                 return None;
     148              :             }
     149              :         };
     150              : 
     151           40 :         Some(Self {
     152           40 :             affinity_score: context
     153           40 :                 .nodes
     154           40 :                 .get(node_id)
     155           40 :                 .copied()
     156           40 :                 .unwrap_or(AffinityScore::FREE),
     157           40 :             utilization_score: utilization.cached_score(),
     158           40 :             total_attached_shard_count: node.attached_shard_count,
     159           40 :             node_id: *node_id,
     160           40 :         })
     161           40 :     }
     162              : 
     163           40 :     fn is_overloaded(&self) -> bool {
     164           40 :         PageserverUtilization::is_overloaded(self.utilization_score)
     165           40 :     }
     166              : 
     167           26 :     fn node_id(&self) -> NodeId {
     168           26 :         self.node_id
     169           26 :     }
     170              : }
     171              : 
     172              : impl PartialEq for SchedulerNode {
     173            3 :     fn eq(&self, other: &Self) -> bool {
     174            3 :         let may_schedule_matches = matches!(
     175            3 :             (&self.may_schedule, &other.may_schedule),
     176              :             (MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No)
     177              :         );
     178              : 
     179            3 :         may_schedule_matches
     180            3 :             && self.shard_count == other.shard_count
     181            3 :             && self.attached_shard_count == other.attached_shard_count
     182            3 :     }
     183              : }
     184              : 
     185              : impl Eq for SchedulerNode {}
     186              : 
     187              : /// This type is responsible for selecting which node is used when a tenant shard needs to choose a pageserver
     188              : /// on which to run.
     189              : ///
     190              : /// The type has no persistent state of its own: this is all populated at startup.  The Serialize
     191              : /// impl is only for debug dumps.
     192              : #[derive(Serialize)]
     193              : pub(crate) struct Scheduler {
     194              :     nodes: HashMap<NodeId, SchedulerNode>,
     195              : }
     196              : 
     197              : /// Score for soft constraint scheduling: lower scores are preferred to higher scores.
     198              : ///
     199              : /// For example, we may set an affinity score based on the number of shards from the same
     200              : /// tenant already on a node, to implicitly prefer to balance out shards.
     201              : #[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
     202              : pub(crate) struct AffinityScore(pub(crate) usize);
     203              : 
     204              : impl AffinityScore {
     205              :     /// If we have no anti-affinity at all toward a node, this is its score.  It means
     206              :     /// the scheduler has a free choice amongst nodes with this score, and may pick a node
     207              :     /// based on other information such as total utilization.
     208              :     pub(crate) const FREE: Self = Self(0);
     209              : 
     210          112 :     pub(crate) fn inc(&mut self) {
     211          112 :         self.0 += 1;
     212          112 :     }
     213              : }
     214              : 
     215              : impl std::ops::Add for AffinityScore {
     216              :     type Output = Self;
     217              : 
     218           24 :     fn add(self, rhs: Self) -> Self::Output {
     219           24 :         Self(self.0 + rhs.0)
     220           24 :     }
     221              : }
     222              : 
     223              : /// Hint for whether this is a sincere attempt to schedule, or a speculative
     224              : /// check for where we _would_ schedule (done during optimization)
     225              : #[derive(Debug)]
     226              : pub(crate) enum ScheduleMode {
     227              :     Normal,
     228              :     Speculative,
     229              : }
     230              : 
     231              : impl Default for ScheduleMode {
     232           20 :     fn default() -> Self {
     233           20 :         Self::Normal
     234           20 :     }
     235              : }
     236              : 
     237              : // For carrying state between multiple calls to [`TenantShard::schedule`], e.g. when calling
     238              : // it for many shards in the same tenant.
     239              : #[derive(Debug, Default)]
     240              : pub(crate) struct ScheduleContext {
     241              :     /// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`]
     242              :     pub(crate) nodes: HashMap<NodeId, AffinityScore>,
     243              : 
     244              :     /// Specifically how many _attached_ locations are on each node
     245              :     pub(crate) attached_nodes: HashMap<NodeId, usize>,
     246              : 
     247              :     pub(crate) mode: ScheduleMode,
     248              : }
     249              : 
     250              : impl ScheduleContext {
     251              :     /// Input is a list of nodes we would like to avoid using again within this context.  The more
     252              :     /// times a node is passed into this call, the less inclined we are to use it.
     253           57 :     pub(crate) fn avoid(&mut self, nodes: &[NodeId]) {
     254          169 :         for node_id in nodes {
     255          112 :             let entry = self.nodes.entry(*node_id).or_insert(AffinityScore::FREE);
     256          112 :             entry.inc()
     257              :         }
     258           57 :     }
     259              : 
     260           55 :     pub(crate) fn push_attached(&mut self, node_id: NodeId) {
     261           55 :         let entry = self.attached_nodes.entry(node_id).or_default();
     262           55 :         *entry += 1;
     263           55 :     }
     264              : 
     265           69 :     pub(crate) fn get_node_affinity(&self, node_id: NodeId) -> AffinityScore {
     266           69 :         self.nodes
     267           69 :             .get(&node_id)
     268           69 :             .copied()
     269           69 :             .unwrap_or(AffinityScore::FREE)
     270           69 :     }
     271              : 
     272           69 :     pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize {
     273           69 :         self.attached_nodes.get(&node_id).copied().unwrap_or(0)
     274           69 :     }
     275              : }
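
// Illustrative sketch, not part of the original source: each call to `avoid()` bumps a
// node's AffinityScore within the context, making the scheduler progressively more
// reluctant to reuse that node for further shards of the same tenant.
#[cfg(test)]
mod schedule_context_affinity_example {
    use super::*;

    #[test]
    fn avoid_accumulates_affinity() {
        let mut context = ScheduleContext::default();

        // Pretend two shards were already placed on node 1 and one on node 2.
        context.avoid(&[NodeId(1)]);
        context.avoid(&[NodeId(1), NodeId(2)]);

        assert_eq!(context.get_node_affinity(NodeId(1)), AffinityScore(2));
        assert_eq!(context.get_node_affinity(NodeId(2)), AffinityScore(1));
        // A node never passed to avoid() keeps the FREE score.
        assert_eq!(context.get_node_affinity(NodeId(3)), AffinityScore::FREE);
    }
}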
     276              : 
     277              : pub(crate) enum RefCountUpdate {
     278              :     PromoteSecondary,
     279              :     Attach,
     280              :     Detach,
     281              :     DemoteAttached,
     282              :     AddSecondary,
     283              :     RemoveSecondary,
     284              : }
     285              : 
     286              : impl Scheduler {
     287            9 :     pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
     288            9 :         let mut scheduler_nodes = HashMap::new();
     289           30 :         for node in nodes {
     290           21 :             scheduler_nodes.insert(
     291           21 :                 node.get_id(),
     292           21 :                 SchedulerNode {
     293           21 :                     shard_count: 0,
     294           21 :                     attached_shard_count: 0,
     295           21 :                     may_schedule: node.may_schedule(),
     296           21 :                 },
     297           21 :             );
     298           21 :         }
     299              : 
     300            9 :         Self {
     301            9 :             nodes: scheduler_nodes,
     302            9 :         }
     303            9 :     }
     304              : 
     305              :     /// For debug/support: check that our internal statistics are in sync with the state of
     306              :     /// the nodes & tenant shards.
     307              :     ///
     308              :     /// If anything is inconsistent, log details and return an error.
     309            1 :     pub(crate) fn consistency_check<'a>(
     310            1 :         &self,
     311            1 :         nodes: impl Iterator<Item = &'a Node>,
     312            1 :         shards: impl Iterator<Item = &'a TenantShard>,
     313            1 :     ) -> anyhow::Result<()> {
     314            1 :         let mut expect_nodes: HashMap<NodeId, SchedulerNode> = HashMap::new();
     315            4 :         for node in nodes {
     316            3 :             expect_nodes.insert(
     317            3 :                 node.get_id(),
     318            3 :                 SchedulerNode {
     319            3 :                     shard_count: 0,
     320            3 :                     attached_shard_count: 0,
     321            3 :                     may_schedule: node.may_schedule(),
     322            3 :                 },
     323            3 :             );
     324            3 :         }
     325              : 
     326            2 :         for shard in shards {
     327            1 :             if let Some(node_id) = shard.intent.get_attached() {
     328            1 :                 match expect_nodes.get_mut(node_id) {
     329            1 :                     Some(node) => {
     330            1 :                         node.shard_count += 1;
     331            1 :                         node.attached_shard_count += 1;
     332            1 :                     }
     333            0 :                     None => anyhow::bail!(
     334            0 :                         "Tenant {} references nonexistent node {}",
     335            0 :                         shard.tenant_shard_id,
     336            0 :                         node_id
     337            0 :                     ),
     338              :                 }
     339            0 :             }
     340              : 
     341            1 :             for node_id in shard.intent.get_secondary() {
     342            1 :                 match expect_nodes.get_mut(node_id) {
     343            1 :                     Some(node) => node.shard_count += 1,
     344            0 :                     None => anyhow::bail!(
     345            0 :                         "Tenant {} references nonexistent node {}",
     346            0 :                         shard.tenant_shard_id,
     347            0 :                         node_id
     348            0 :                     ),
     349              :                 }
     350              :             }
     351              :         }
     352              : 
     353            4 :         for (node_id, expect_node) in &expect_nodes {
     354            3 :             let Some(self_node) = self.nodes.get(node_id) else {
     355            0 :                 anyhow::bail!("Node {node_id} not found in Self")
     356              :             };
     357              : 
     358            3 :             if self_node != expect_node {
     359            0 :                 tracing::error!("Inconsistency detected in scheduling state for node {node_id}");
     360            0 :                 tracing::error!("Expected state: {}", serde_json::to_string(expect_node)?);
     361            0 :                 tracing::error!("Self state: {}", serde_json::to_string(self_node)?);
     362              : 
     363            0 :                 anyhow::bail!("Inconsistent state on {node_id}");
     364            3 :             }
     365              :         }
     366              : 
     367            1 :         if expect_nodes.len() != self.nodes.len() {
     368              :             // We just checked that all the expected nodes are present.  If the lengths don't match,
     369              :             // it means that we have nodes in Self that are unexpected.
     370            0 :             for node_id in self.nodes.keys() {
     371            0 :                 if !expect_nodes.contains_key(node_id) {
     372            0 :                     anyhow::bail!("Node {node_id} found in Self but not in expected nodes");
     373            0 :                 }
     374              :             }
     375            1 :         }
     376              : 
     377            1 :         Ok(())
     378            1 :     }
     379              : 
     380              :     /// Update the reference counts of a node. These reference counts are used to guide scheduling
      381              :     /// decisions, not for memory management: they represent the number of tenant shards whose IntentState
      382              :     /// targets this node and the number of tenant shards whose IntentState is attached to this
     383              :     /// node.
     384              :     ///
     385              :     /// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
     386              :     /// [`Self::new`] or [`Self::node_upsert`])
     387          123 :     pub(crate) fn update_node_ref_counts(&mut self, node_id: NodeId, update: RefCountUpdate) {
     388          123 :         let Some(node) = self.nodes.get_mut(&node_id) else {
     389            0 :             debug_assert!(false);
     390            0 :             tracing::error!("Scheduler missing node {node_id}");
     391            0 :             return;
     392              :         };
     393              : 
     394          123 :         match update {
     395            5 :             RefCountUpdate::PromoteSecondary => {
     396            5 :                 node.attached_shard_count += 1;
     397            5 :             }
     398           32 :             RefCountUpdate::Attach => {
     399           32 :                 node.shard_count += 1;
     400           32 :                 node.attached_shard_count += 1;
     401           32 :             }
     402           31 :             RefCountUpdate::Detach => {
     403           31 :                 node.shard_count -= 1;
     404           31 :                 node.attached_shard_count -= 1;
     405           31 :             }
     406            5 :             RefCountUpdate::DemoteAttached => {
     407            5 :                 node.attached_shard_count -= 1;
     408            5 :             }
     409           25 :             RefCountUpdate::AddSecondary => {
     410           25 :                 node.shard_count += 1;
     411           25 :             }
     412           25 :             RefCountUpdate::RemoveSecondary => {
     413           25 :                 node.shard_count -= 1;
     414           25 :             }
     415              :         }
     416              : 
     417              :         // Maybe update PageserverUtilization
     418          123 :         match update {
     419              :             RefCountUpdate::AddSecondary | RefCountUpdate::Attach => {
      420              :                 // Referencing the node: if this takes our shard_count above the utilization structure's
      421              :                 // shard count, then artificially bump it: this ensures that the scheduler immediately
     422              :                 // recognizes that this node has more work on it, without waiting for the next heartbeat
     423              :                 // to update the utilization.
     424           57 :                 if let MaySchedule::Yes(utilization) = &mut node.may_schedule {
     425           57 :                     utilization.adjust_shard_count_max(node.shard_count as u32);
     426           57 :                 }
     427              :             }
     428              :             RefCountUpdate::PromoteSecondary
     429              :             | RefCountUpdate::Detach
     430              :             | RefCountUpdate::RemoveSecondary
     431           66 :             | RefCountUpdate::DemoteAttached => {
     432           66 :                 // De-referencing the node: leave the utilization's shard_count at a stale higher
     433           66 :                 // value until some future heartbeat after we have physically removed this shard
     434           66 :                 // from the node: this prevents the scheduler over-optimistically trying to schedule
     435           66 :                 // more work onto the node before earlier detaches are done.
     436           66 :             }
     437              :         }
     438          123 :     }
     439              : 
     440              :     // Check if the number of shards attached to a given node is lagging below
     441              :     // the cluster average. If that's the case, the node should be filled.
     442            0 :     pub(crate) fn compute_fill_requirement(&self, node_id: NodeId) -> usize {
     443            0 :         let Some(node) = self.nodes.get(&node_id) else {
     444            0 :             debug_assert!(false);
     445            0 :             tracing::error!("Scheduler missing node {node_id}");
     446            0 :             return 0;
     447              :         };
     448            0 :         assert!(!self.nodes.is_empty());
     449            0 :         let expected_attached_shards_per_node = self.expected_attached_shard_count();
     450              : 
     451            0 :         for (node_id, node) in self.nodes.iter() {
     452            0 :             tracing::trace!(%node_id, "attached_shard_count={} shard_count={} expected={}", node.attached_shard_count, node.shard_count, expected_attached_shards_per_node);
     453              :         }
     454              : 
     455            0 :         if node.attached_shard_count < expected_attached_shards_per_node {
     456            0 :             expected_attached_shards_per_node - node.attached_shard_count
     457              :         } else {
     458            0 :             0
     459              :         }
     460            0 :     }
     461              : 
     462            0 :     pub(crate) fn expected_attached_shard_count(&self) -> usize {
     463            0 :         let total_attached_shards: usize =
     464            0 :             self.nodes.values().map(|n| n.attached_shard_count).sum();
     465            0 : 
     466            0 :         assert!(!self.nodes.is_empty());
     467            0 :         total_attached_shards / self.nodes.len()
     468            0 :     }
     469              : 
     470            0 :     pub(crate) fn nodes_by_attached_shard_count(&self) -> Vec<(NodeId, usize)> {
     471            0 :         self.nodes
     472            0 :             .iter()
     473            0 :             .map(|(node_id, stats)| (*node_id, stats.attached_shard_count))
     474            0 :             .sorted_by(|lhs, rhs| Ord::cmp(&lhs.1, &rhs.1).reverse())
     475            0 :             .collect()
     476            0 :     }
     477              : 
     478            9 :     pub(crate) fn node_upsert(&mut self, node: &Node) {
     479              :         use std::collections::hash_map::Entry::*;
     480            9 :         match self.nodes.entry(node.get_id()) {
     481            3 :             Occupied(mut entry) => {
     482            3 :                 // Updates to MaySchedule are how we receive updated PageserverUtilization: adjust these values
     483            3 :                 // to account for any shards scheduled on the controller but not yet visible to the pageserver.
     484            3 :                 let mut may_schedule = node.may_schedule();
     485            3 :                 match &mut may_schedule {
     486            2 :                     MaySchedule::Yes(utilization) => {
     487            2 :                         utilization.adjust_shard_count_max(entry.get().shard_count as u32);
     488            2 :                     }
     489            1 :                     MaySchedule::No => { // Nothing to tweak
     490            1 :                     }
     491              :                 }
     492              : 
     493            3 :                 entry.get_mut().may_schedule = may_schedule;
     494              :             }
     495            6 :             Vacant(entry) => {
     496            6 :                 entry.insert(SchedulerNode {
     497            6 :                     shard_count: 0,
     498            6 :                     attached_shard_count: 0,
     499            6 :                     may_schedule: node.may_schedule(),
     500            6 :                 });
     501            6 :             }
     502              :         }
     503            9 :     }
     504              : 
     505            0 :     pub(crate) fn node_remove(&mut self, node_id: NodeId) {
     506            0 :         if self.nodes.remove(&node_id).is_none() {
     507            0 :             tracing::warn!(node_id=%node_id, "Removed non-existent node from scheduler");
     508            0 :         }
     509            0 :     }
     510              : 
     511              :     /// Where we have several nodes to choose from, for example when picking a secondary location
     512              :     /// to promote to an attached location, this method may be used to pick the best choice based
     513              :     /// on the scheduler's knowledge of utilization and availability.
     514              :     ///
      515              :     /// If the input is empty, or none of the nodes are eligible for scheduling, return None: the
     516              :     /// caller can pick a node some other way.
     517           30 :     pub(crate) fn node_preferred(&self, nodes: &[NodeId]) -> Option<NodeId> {
     518           30 :         if nodes.is_empty() {
     519           28 :             return None;
     520            2 :         }
     521            2 : 
     522            2 :         // TODO: When the utilization score returned by the pageserver becomes meaningful,
     523            2 :         // schedule based on that instead of the shard count.
     524            2 :         let node = nodes
     525            2 :             .iter()
     526            4 :             .map(|node_id| {
     527            4 :                 let may_schedule = self
     528            4 :                     .nodes
     529            4 :                     .get(node_id)
     530            4 :                     .map(|n| !matches!(n.may_schedule, MaySchedule::No))
     531            4 :                     .unwrap_or(false);
     532            4 :                 (*node_id, may_schedule)
     533            4 :             })
     534            4 :             .max_by_key(|(_n, may_schedule)| *may_schedule);
     535            2 : 
     536            2 :         // If even the preferred node has may_schedule==false, return None
     537            2 :         node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
     538           30 :     }
     539              : 
      540              :     /// Compute a scheduling score for each node that the scheduler knows of,
      541              :     /// minus a set of hard-excluded nodes.
     542           62 :     fn compute_node_scores<Score>(
     543           62 :         &mut self,
     544           62 :         hard_exclude: &[NodeId],
     545           62 :         context: &ScheduleContext,
     546           62 :     ) -> Vec<Score>
     547           62 :     where
     548           62 :         Score: NodeSchedulingScore,
     549           62 :     {
     550           62 :         self.nodes
     551           62 :             .iter_mut()
     552          163 :             .filter_map(|(k, v)| {
     553          163 :                 if hard_exclude.contains(k) {
     554           55 :                     None
     555              :                 } else {
     556          108 :                     Score::generate(k, v, context)
     557              :                 }
     558          163 :             })
     559           62 :             .collect()
     560           62 :     }
     561              : 
      562              :     /// hard_exclude: it is forbidden to use nodes in this list, typically because they
     563              :     /// are already in use by this shard -- we use this to avoid picking the same node
     564              :     /// as both attached and secondary location.  This is a hard constraint: if we cannot
     565              :     /// find any nodes that aren't in this list, then we will return a [`ScheduleError::ImpossibleConstraint`].
     566              :     ///
     567              :     /// context: we prefer to avoid using nodes identified in the context, according
      568              :     /// to their anti-affinity score.  We use this to prefer not to place shards belonging to
     569              :     /// the same tenant on the same node.  This is a soft constraint: the context will never
     570              :     /// cause us to fail to schedule a shard.
     571           62 :     pub(crate) fn schedule_shard<Tag: ShardTag>(
     572           62 :         &mut self,
     573           62 :         hard_exclude: &[NodeId],
     574           62 :         context: &ScheduleContext,
     575           62 :     ) -> Result<NodeId, ScheduleError> {
     576           62 :         if self.nodes.is_empty() {
     577            0 :             return Err(ScheduleError::NoPageservers);
     578           62 :         }
     579           62 : 
     580           62 :         let mut scores = self.compute_node_scores::<Tag::Score>(hard_exclude, context);
     581           62 : 
     582           62 :         // Exclude nodes whose utilization is critically high, if there are alternatives available.  This will
     583           62 :         // cause us to violate affinity rules if it is necessary to avoid critically overloading nodes: for example
     584           62 :         // we may place shards in the same tenant together on the same pageserver if all other pageservers are
     585           62 :         // overloaded.
     586           62 :         let non_overloaded_scores = scores
     587           62 :             .iter()
     588          108 :             .filter(|i| !i.is_overloaded())
     589           62 :             .copied()
     590           62 :             .collect::<Vec<_>>();
     591           62 :         if !non_overloaded_scores.is_empty() {
     592           54 :             scores = non_overloaded_scores;
     593           54 :         }
     594              : 
     595              :         // Sort the nodes by score. The one with the lowest scores will be the preferred node.
     596              :         // Refer to [`NodeAttachmentSchedulingScore`] for attached locations and
     597              :         // [`NodeSecondarySchedulingScore`] for secondary locations to understand how the nodes
     598              :         // are ranked.
     599           62 :         scores.sort();
     600           62 : 
     601           62 :         if scores.is_empty() {
     602              :             // After applying constraints, no pageservers were left.
     603            8 :             if !matches!(context.mode, ScheduleMode::Speculative) {
     604              :                 // If this was not a speculative attempt, log details to understand why we couldn't
     605              :                 // schedule: this may help an engineer understand if some nodes are marked offline
     606              :                 // in a way that's preventing progress.
     607            8 :                 tracing::info!(
     608            0 :                     "Scheduling failure, while excluding {hard_exclude:?}, node states:"
     609              :                 );
     610           24 :                 for (node_id, node) in &self.nodes {
     611           16 :                     tracing::info!(
     612            0 :                         "Node {node_id}: may_schedule={} shards={}",
     613            0 :                         !matches!(node.may_schedule, MaySchedule::No),
     614              :                         node.shard_count
     615              :                     );
     616              :                 }
     617            0 :             }
     618            8 :             return Err(ScheduleError::ImpossibleConstraint);
     619           54 :         }
     620           54 : 
     621           54 :         // Lowest score wins
     622           54 :         let node_id = scores.first().unwrap().node_id();
     623              : 
     624           54 :         if !matches!(context.mode, ScheduleMode::Speculative) {
     625           54 :             tracing::info!(
      626            0 :             "scheduler selected node {node_id} (eligible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?})",
     627            0 :             scores.iter().map(|i| i.node_id().0).collect::<Vec<_>>()
     628              :         );
     629            0 :         }
     630              : 
     631              :         // Note that we do not update shard count here to reflect the scheduling: that
     632              :         // is IntentState's job when the scheduled location is used.
     633              : 
     634           54 :         Ok(node_id)
     635           62 :     }
     636              : 
     637              :     /// Unit test access to internal state
     638              :     #[cfg(test)]
     639           12 :     pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
     640           12 :         self.nodes.get(&node_id).unwrap().shard_count
     641           12 :     }
     642              : 
     643              :     #[cfg(test)]
     644           12 :     pub(crate) fn get_node_attached_shard_count(&self, node_id: NodeId) -> usize {
     645           12 :         self.nodes.get(&node_id).unwrap().attached_shard_count
     646           12 :     }
     647              : }
     648              : 
     649              : #[cfg(test)]
     650              : pub(crate) mod test_utils {
     651              : 
     652              :     use crate::node::Node;
     653              :     use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization};
     654              :     use std::collections::HashMap;
     655              :     use utils::id::NodeId;
     656              :     /// Test helper: synthesize the requested number of nodes, all in active state.
     657              :     ///
     658              :     /// Node IDs start at one.
     659            9 :     pub(crate) fn make_test_nodes(n: u64) -> HashMap<NodeId, Node> {
     660            9 :         (1..n + 1)
     661           27 :             .map(|i| {
     662           27 :                 (NodeId(i), {
     663           27 :                     let mut node = Node::new(
     664           27 :                         NodeId(i),
     665           27 :                         format!("httphost-{i}"),
     666           27 :                         80 + i as u16,
     667           27 :                         format!("pghost-{i}"),
     668           27 :                         5432 + i as u16,
     669           27 :                         "test-az".to_string(),
     670           27 :                     );
     671           27 :                     node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0)));
     672           27 :                     assert!(node.is_available());
     673           27 :                     node
     674           27 :                 })
     675           27 :             })
     676            9 :             .collect()
     677            9 :     }
     678              : }
     679              : 
     680              : #[cfg(test)]
     681              : mod tests {
     682              :     use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization};
     683              : 
     684              :     use super::*;
     685              : 
     686              :     use crate::tenant_shard::IntentState;
     687              :     #[test]
     688            1 :     fn scheduler_basic() -> anyhow::Result<()> {
     689            1 :         let nodes = test_utils::make_test_nodes(2);
     690            1 : 
     691            1 :         let mut scheduler = Scheduler::new(nodes.values());
     692            1 :         let mut t1_intent = IntentState::new();
     693            1 :         let mut t2_intent = IntentState::new();
     694            1 : 
     695            1 :         let context = ScheduleContext::default();
     696              : 
     697            1 :         let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &context)?;
     698            1 :         t1_intent.set_attached(&mut scheduler, Some(scheduled));
     699            1 :         let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &context)?;
     700            1 :         t2_intent.set_attached(&mut scheduler, Some(scheduled));
     701            1 : 
     702            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
     703            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
     704              : 
     705            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
     706            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
     707              : 
     708            1 :         let scheduled =
     709            1 :             scheduler.schedule_shard::<AttachedShardTag>(&t1_intent.all_pageservers(), &context)?;
     710            1 :         t1_intent.push_secondary(&mut scheduler, scheduled);
     711            1 : 
     712            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
     713            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
     714              : 
     715            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
     716            1 :         assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
     717              : 
     718            1 :         t1_intent.clear(&mut scheduler);
     719            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
     720            1 :         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
     721              : 
     722            1 :         let total_attached = scheduler.get_node_attached_shard_count(NodeId(1))
     723            1 :             + scheduler.get_node_attached_shard_count(NodeId(2));
     724            1 :         assert_eq!(total_attached, 1);
     725              : 
     726            1 :         if cfg!(debug_assertions) {
     727              :             // Dropping an IntentState without clearing it causes a panic in debug mode,
     728              :             // because we have failed to properly update scheduler shard counts.
     729            1 :             let result = std::panic::catch_unwind(move || {
     730            1 :                 drop(t2_intent);
     731            1 :             });
     732            1 :             assert!(result.is_err());
     733              :         } else {
     734            0 :             t2_intent.clear(&mut scheduler);
     735            0 : 
     736            0 :             assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
     737            0 :             assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 0);
     738              : 
     739            0 :             assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 0);
     740            0 :             assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 0);
     741              :         }
     742              : 
     743            1 :         Ok(())
     744            1 :     }
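
    // Illustrative sketch, not part of the original test suite: the two per-node
    // reference counts diverge when an attached location is demoted to a secondary.
    // The location stays on the node (shard_count unchanged) while
    // attached_shard_count drops.  Reference counts are normally driven by
    // IntentState; here they are poked directly to keep the example short.
    #[test]
    fn scheduler_ref_count_demote_sketch() {
        let nodes = test_utils::make_test_nodes(1);
        let mut scheduler = Scheduler::new(nodes.values());

        scheduler.update_node_ref_counts(NodeId(1), RefCountUpdate::Attach);
        assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);

        scheduler.update_node_ref_counts(NodeId(1), RefCountUpdate::DemoteAttached);
        assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 0);

        // Removing the remaining secondary location releases the last reference.
        scheduler.update_node_ref_counts(NodeId(1), RefCountUpdate::RemoveSecondary);
        assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
    }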
     745              : 
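    // Illustrative sketch, not part of the original test suite: scheduling an attached
    // location and then a secondary for the same shard.  The shard's existing
    // pageservers are passed as a hard exclusion, so the secondary can never land on
    // the node that is already attached, while the ScheduleContext only softly steers
    // the choice.
    #[test]
    fn scheduler_secondary_sketch() -> anyhow::Result<()> {
        let nodes = test_utils::make_test_nodes(2);
        let mut scheduler = Scheduler::new(nodes.values());
        let mut intent = IntentState::new();
        let mut context = ScheduleContext::default();

        let attached = scheduler.schedule_shard::<AttachedShardTag>(&[], &context)?;
        intent.set_attached(&mut scheduler, Some(attached));
        context.avoid(&[attached]);
        context.push_attached(attached);

        let secondary = scheduler
            .schedule_shard::<SecondaryShardTag>(&intent.all_pageservers(), &context)?;
        assert_ne!(secondary, attached);
        intent.push_secondary(&mut scheduler, secondary);

        // IntentState must be emptied before drop (see scheduler_basic above).
        intent.clear(&mut scheduler);
        Ok(())
    }
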
     746              :     #[test]
     747              :     /// Test the PageserverUtilization's contribution to scheduling algorithm
     748            1 :     fn scheduler_utilization() {
     749            1 :         let mut nodes = test_utils::make_test_nodes(3);
     750            1 :         let mut scheduler = Scheduler::new(nodes.values());
     751            1 : 
     752            1 :         // Need to keep these alive because they contribute to shard counts via RAII
     753            1 :         let mut scheduled_intents = Vec::new();
     754            1 : 
     755            1 :         let empty_context = ScheduleContext::default();
     756              : 
     757           11 :         fn assert_scheduler_chooses(
     758           11 :             expect_node: NodeId,
     759           11 :             scheduled_intents: &mut Vec<IntentState>,
     760           11 :             scheduler: &mut Scheduler,
     761           11 :             context: &ScheduleContext,
     762           11 :         ) {
     763           11 :             let scheduled = scheduler
     764           11 :                 .schedule_shard::<AttachedShardTag>(&[], context)
     765           11 :                 .unwrap();
     766           11 :             let mut intent = IntentState::new();
     767           11 :             intent.set_attached(scheduler, Some(scheduled));
     768           11 :             scheduled_intents.push(intent);
     769           11 :             assert_eq!(scheduled, expect_node);
     770           11 :         }
     771              : 
     772              :         // Independent schedule calls onto empty nodes should round-robin, because each node's
      773              :         // utilization's shard count is updated inline.  The order is deterministic because when all other factors are
     774              :         // equal, we order by node ID.
     775            1 :         assert_scheduler_chooses(
     776            1 :             NodeId(1),
     777            1 :             &mut scheduled_intents,
     778            1 :             &mut scheduler,
     779            1 :             &empty_context,
     780            1 :         );
     781            1 :         assert_scheduler_chooses(
     782            1 :             NodeId(2),
     783            1 :             &mut scheduled_intents,
     784            1 :             &mut scheduler,
     785            1 :             &empty_context,
     786            1 :         );
     787            1 :         assert_scheduler_chooses(
     788            1 :             NodeId(3),
     789            1 :             &mut scheduled_intents,
     790            1 :             &mut scheduler,
     791            1 :             &empty_context,
     792            1 :         );
     793            1 : 
     794            1 :         // Manually setting utilization higher should cause schedule calls to round-robin the other nodes
     795            1 :         // which have equal utilization.
     796            1 :         nodes
     797            1 :             .get_mut(&NodeId(1))
     798            1 :             .unwrap()
     799            1 :             .set_availability(NodeAvailability::Active(test_utilization::simple(
     800            1 :                 10,
     801            1 :                 1024 * 1024 * 1024,
     802            1 :             )));
     803            1 :         scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
     804            1 : 
     805            1 :         assert_scheduler_chooses(
     806            1 :             NodeId(2),
     807            1 :             &mut scheduled_intents,
     808            1 :             &mut scheduler,
     809            1 :             &empty_context,
     810            1 :         );
     811            1 :         assert_scheduler_chooses(
     812            1 :             NodeId(3),
     813            1 :             &mut scheduled_intents,
     814            1 :             &mut scheduler,
     815            1 :             &empty_context,
     816            1 :         );
     817            1 :         assert_scheduler_chooses(
     818            1 :             NodeId(2),
     819            1 :             &mut scheduled_intents,
     820            1 :             &mut scheduler,
     821            1 :             &empty_context,
     822            1 :         );
     823            1 :         assert_scheduler_chooses(
     824            1 :             NodeId(3),
     825            1 :             &mut scheduled_intents,
     826            1 :             &mut scheduler,
     827            1 :             &empty_context,
     828            1 :         );
     829            1 : 
     830            1 :         // The scheduler should prefer nodes with lower affinity score,
     831            1 :         // even if they have higher utilization (as long as they aren't utilized at >100%)
     832            1 :         let mut context_prefer_node1 = ScheduleContext::default();
     833            1 :         context_prefer_node1.avoid(&[NodeId(2), NodeId(3)]);
     834            1 :         assert_scheduler_chooses(
     835            1 :             NodeId(1),
     836            1 :             &mut scheduled_intents,
     837            1 :             &mut scheduler,
     838            1 :             &context_prefer_node1,
     839            1 :         );
     840            1 :         assert_scheduler_chooses(
     841            1 :             NodeId(1),
     842            1 :             &mut scheduled_intents,
     843            1 :             &mut scheduler,
     844            1 :             &context_prefer_node1,
     845            1 :         );
     846            1 : 
     847            1 :         // If a node is over-utilized, it will not be used even if affinity scores prefer it
     848            1 :         nodes
     849            1 :             .get_mut(&NodeId(1))
     850            1 :             .unwrap()
     851            1 :             .set_availability(NodeAvailability::Active(test_utilization::simple(
     852            1 :                 20000,
     853            1 :                 1024 * 1024 * 1024,
     854            1 :             )));
     855            1 :         scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
     856            1 :         assert_scheduler_chooses(
     857            1 :             NodeId(2),
     858            1 :             &mut scheduled_intents,
     859            1 :             &mut scheduler,
     860            1 :             &context_prefer_node1,
     861            1 :         );
     862            1 :         assert_scheduler_chooses(
     863            1 :             NodeId(3),
     864            1 :             &mut scheduled_intents,
     865            1 :             &mut scheduler,
     866            1 :             &context_prefer_node1,
     867            1 :         );
     868              : 
     869           12 :         for mut intent in scheduled_intents {
     870           11 :             intent.clear(&mut scheduler);
     871           11 :         }
     872            1 :     }
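
    // Illustrative sketch, not part of the original test suite: a worked example of
    // compute_fill_requirement.  With 1 + 1 + 4 = 6 attached shards across 3 nodes the
    // per-node expectation is 6 / 3 = 2 (integer division), so each lightly loaded node
    // needs one more attachment and the heavily loaded node needs none.  Reference
    // counts are poked directly rather than via IntentState, purely for brevity.
    #[test]
    fn scheduler_fill_requirement_sketch() {
        let nodes = test_utils::make_test_nodes(3);
        let mut scheduler = Scheduler::new(nodes.values());

        scheduler.update_node_ref_counts(NodeId(1), RefCountUpdate::Attach);
        scheduler.update_node_ref_counts(NodeId(2), RefCountUpdate::Attach);
        for _ in 0..4 {
            scheduler.update_node_ref_counts(NodeId(3), RefCountUpdate::Attach);
        }

        assert_eq!(scheduler.expected_attached_shard_count(), 2);
        assert_eq!(scheduler.compute_fill_requirement(NodeId(1)), 1);
        assert_eq!(scheduler.compute_fill_requirement(NodeId(2)), 1);
        assert_eq!(scheduler.compute_fill_requirement(NodeId(3)), 0);
    }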
     873              : }
        

Generated by: LCOV version 2.1-beta