Line data Source code
1 : use pageserver_api::shard::TenantShardId;
2 : use std::collections::{BTreeMap, HashMap};
3 : use utils::{http::error::ApiError, id::NodeId};
4 :
5 : use crate::{node::Node, tenant_state::TenantState};
6 :
7 : /// Scenarios in which we cannot find a suitable location for a tenant shard
8 0 : #[derive(thiserror::Error, Debug)]
9 : pub enum ScheduleError {
10 : #[error("No pageservers found")]
11 : NoPageservers,
12 : #[error("No pageserver found matching constraint")]
13 : ImpossibleConstraint,
14 : }
15 :
16 : impl From<ScheduleError> for ApiError {
17 0 : fn from(value: ScheduleError) -> Self {
18 0 : ApiError::Conflict(format!("Scheduling error: {}", value))
19 0 : }
20 : }
21 :
22 : pub(crate) struct Scheduler {
23 : tenant_counts: HashMap<NodeId, usize>,
24 : }
25 :
26 : impl Scheduler {
27 1626 : pub(crate) fn new(
28 1626 : tenants: &BTreeMap<TenantShardId, TenantState>,
29 1626 : nodes: &HashMap<NodeId, Node>,
30 1626 : ) -> Self {
31 1626 : let mut tenant_counts = HashMap::new();
32 1626 : for node_id in nodes.keys() {
33 1410 : tenant_counts.insert(*node_id, 0);
34 1410 : }
35 :
36 1626 : for tenant in tenants.values() {
37 1422 : if let Some(ps) = tenant.intent.attached {
38 1421 : let entry = tenant_counts.entry(ps).or_insert(0);
39 1421 : *entry += 1;
40 1421 : }
41 : }
42 :
43 3036 : for (node_id, node) in nodes {
44 1410 : if !node.may_schedule() {
45 23 : tenant_counts.remove(node_id);
46 1387 : }
47 : }
48 :
49 1626 : Self { tenant_counts }
50 1626 : }
51 :
52 489 : pub(crate) fn schedule_shard(
53 489 : &mut self,
54 489 : hard_exclude: &[NodeId],
55 489 : ) -> Result<NodeId, ScheduleError> {
56 489 : if self.tenant_counts.is_empty() {
57 0 : return Err(ScheduleError::NoPageservers);
58 489 : }
59 489 :
60 489 : let mut tenant_counts: Vec<(NodeId, usize)> = self
61 489 : .tenant_counts
62 489 : .iter()
63 597 : .filter_map(|(k, v)| {
64 597 : if hard_exclude.contains(k) {
65 0 : None
66 : } else {
67 597 : Some((*k, *v))
68 : }
69 597 : })
70 489 : .collect();
71 489 :
72 489 : // Sort by tenant count. Nodes with the same tenant count are sorted by ID.
73 489 : tenant_counts.sort_by_key(|i| (i.1, i.0));
74 489 :
75 489 : if tenant_counts.is_empty() {
76 : // After applying constraints, no pageservers were left
77 0 : return Err(ScheduleError::ImpossibleConstraint);
78 489 : }
79 :
80 1086 : for (node_id, count) in &tenant_counts {
81 597 : tracing::info!("tenant_counts[{node_id}]={count}");
82 : }
83 :
84 489 : let node_id = tenant_counts.first().unwrap().0;
85 489 : tracing::info!("scheduler selected node {node_id}");
86 489 : *self.tenant_counts.get_mut(&node_id).unwrap() += 1;
87 489 : Ok(node_id)
88 489 : }
89 : }
|