Line data Source code
1 : use pageserver_api::shard::TenantShardId;
2 : use std::collections::{BTreeMap, HashMap};
3 : use utils::{http::error::ApiError, id::NodeId};
4 :
5 : use crate::{node::Node, tenant_state::TenantState};
6 :
7 : /// Scenarios in which we cannot find a suitable location for a tenant shard
8 0 : #[derive(thiserror::Error, Debug)]
9 : pub enum ScheduleError {
10 : #[error("No pageservers found")]
11 : NoPageservers,
12 : #[error("No pageserver found matching constraint")]
13 : ImpossibleConstraint,
14 : }
15 :
16 : impl From<ScheduleError> for ApiError {
17 0 : fn from(value: ScheduleError) -> Self {
18 0 : ApiError::Conflict(format!("Scheduling error: {}", value))
19 0 : }
20 : }
21 :
22 : pub(crate) struct Scheduler {
23 : tenant_counts: HashMap<NodeId, usize>,
24 : }
25 :
26 : impl Scheduler {
27 1627 : pub(crate) fn new(
28 1627 : tenants: &BTreeMap<TenantShardId, TenantState>,
29 1627 : nodes: &HashMap<NodeId, Node>,
30 1627 : ) -> Self {
31 1627 : let mut tenant_counts = HashMap::new();
32 1627 : for node_id in nodes.keys() {
33 1429 : tenant_counts.insert(*node_id, 0);
34 1429 : }
35 :
36 1627 : for tenant in tenants.values() {
37 1420 : if let Some(ps) = tenant.intent.attached {
38 1419 : let entry = tenant_counts.entry(ps).or_insert(0);
39 1419 : *entry += 1;
40 1419 : }
41 : }
42 :
43 3056 : for (node_id, node) in nodes {
44 1429 : if !node.may_schedule() {
45 17 : tenant_counts.remove(node_id);
46 1412 : }
47 : }
48 :
49 1627 : Self { tenant_counts }
50 1627 : }
51 :
52 495 : pub(crate) fn schedule_shard(
53 495 : &mut self,
54 495 : hard_exclude: &[NodeId],
55 495 : ) -> Result<NodeId, ScheduleError> {
56 495 : if self.tenant_counts.is_empty() {
57 0 : return Err(ScheduleError::NoPageservers);
58 495 : }
59 495 :
60 495 : let mut tenant_counts: Vec<(NodeId, usize)> = self
61 495 : .tenant_counts
62 495 : .iter()
63 635 : .filter_map(|(k, v)| {
64 635 : if hard_exclude.contains(k) {
65 0 : None
66 : } else {
67 635 : Some((*k, *v))
68 : }
69 635 : })
70 495 : .collect();
71 495 :
72 495 : // Sort by tenant count. Nodes with the same tenant count are sorted by ID.
73 495 : tenant_counts.sort_by_key(|i| (i.1, i.0));
74 495 :
75 495 : if tenant_counts.is_empty() {
76 : // After applying constraints, no pageservers were left
77 0 : return Err(ScheduleError::ImpossibleConstraint);
78 495 : }
79 :
80 1130 : for (node_id, count) in &tenant_counts {
81 635 : tracing::info!("tenant_counts[{node_id}]={count}");
82 : }
83 :
84 495 : let node_id = tenant_counts.first().unwrap().0;
85 495 : tracing::info!("scheduler selected node {node_id}");
86 495 : *self.tenant_counts.get_mut(&node_id).unwrap() += 1;
87 495 : Ok(node_id)
88 495 : }
89 : }
|