Line data Source code
1 : use pageserver_api::shard::TenantShardId;
2 : use std::collections::{BTreeMap, HashMap};
3 : use utils::{http::error::ApiError, id::NodeId};
4 :
5 : use crate::{node::Node, tenant_state::TenantState};
6 :
7 : /// Scenarios in which we cannot find a suitable location for a tenant shard
8 0 : #[derive(thiserror::Error, Debug)]
9 : pub enum ScheduleError {
10 : #[error("No pageservers found")]
11 : NoPageservers,
12 : #[error("No pageserver found matching constraint")]
13 : ImpossibleConstraint,
14 : }
15 :
16 : impl From<ScheduleError> for ApiError {
17 0 : fn from(value: ScheduleError) -> Self {
18 0 : ApiError::Conflict(format!("Scheduling error: {}", value))
19 0 : }
20 : }
21 :
22 : pub(crate) struct Scheduler {
23 : tenant_counts: HashMap<NodeId, usize>,
24 : }
25 :
26 : impl Scheduler {
27 1632 : pub(crate) fn new(
28 1632 : tenants: &BTreeMap<TenantShardId, TenantState>,
29 1632 : nodes: &HashMap<NodeId, Node>,
30 1632 : ) -> Self {
31 1632 : let mut tenant_counts = HashMap::new();
32 1632 : for node_id in nodes.keys() {
33 1433 : tenant_counts.insert(*node_id, 0);
34 1433 : }
35 :
36 1632 : for tenant in tenants.values() {
37 1424 : if let Some(ps) = tenant.intent.attached {
38 1423 : let entry = tenant_counts.entry(ps).or_insert(0);
39 1423 : *entry += 1;
40 1423 : }
41 : }
42 :
43 3065 : for (node_id, node) in nodes {
44 1433 : if !node.may_schedule() {
45 17 : tenant_counts.remove(node_id);
46 1416 : }
47 : }
48 :
49 1632 : Self { tenant_counts }
50 1632 : }
51 :
52 497 : pub(crate) fn schedule_shard(
53 497 : &mut self,
54 497 : hard_exclude: &[NodeId],
55 497 : ) -> Result<NodeId, ScheduleError> {
56 497 : if self.tenant_counts.is_empty() {
57 0 : return Err(ScheduleError::NoPageservers);
58 497 : }
59 497 :
60 497 : let mut tenant_counts: Vec<(NodeId, usize)> = self
61 497 : .tenant_counts
62 497 : .iter()
63 637 : .filter_map(|(k, v)| {
64 637 : if hard_exclude.contains(k) {
65 0 : None
66 : } else {
67 637 : Some((*k, *v))
68 : }
69 637 : })
70 497 : .collect();
71 497 :
72 497 : // Sort by tenant count. Nodes with the same tenant count are sorted by ID.
73 497 : tenant_counts.sort_by_key(|i| (i.1, i.0));
74 497 :
75 497 : if tenant_counts.is_empty() {
76 : // After applying constraints, no pageservers were left
77 0 : return Err(ScheduleError::ImpossibleConstraint);
78 497 : }
79 :
80 1134 : for (node_id, count) in &tenant_counts {
81 637 : tracing::info!("tenant_counts[{node_id}]={count}");
82 : }
83 :
84 497 : let node_id = tenant_counts.first().unwrap().0;
85 497 : tracing::info!("scheduler selected node {node_id}");
86 497 : *self.tenant_counts.get_mut(&node_id).unwrap() += 1;
87 497 : Ok(node_id)
88 497 : }
89 : }
|