use crate::{node::Node, tenant_state::TenantState};
use serde::Serialize;
use std::collections::HashMap;
use utils::{http::error::ApiError, id::NodeId};

/// Scenarios in which we cannot find a suitable location for a tenant shard
#[derive(thiserror::Error, Debug)]
pub enum ScheduleError {
    #[error("No pageservers found")]
    NoPageservers,
    #[error("No pageserver found matching constraint")]
    ImpossibleConstraint,
}

impl From<ScheduleError> for ApiError {
    fn from(value: ScheduleError) -> Self {
        ApiError::Conflict(format!("Scheduling error: {}", value))
    }
}

#[derive(Serialize, Eq, PartialEq)]
struct SchedulerNode {
    /// How many shards are currently scheduled on this node, via their [`crate::tenant_state::IntentState`].
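    /// Both attached and secondary locations contribute to this count.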
    shard_count: usize,

    /// Whether this node is currently eligible to have new shards scheduled (this is derived
    /// from a node's availability state and scheduling policy).
    may_schedule: bool,
}

/// This type is responsible for selecting which node is used when a tenant shard needs to choose a pageserver
/// on which to run.
///
/// The type has no persistent state of its own: this is all populated at startup. The Serialize
/// impl is only for debug dumps.
#[derive(Serialize)]
pub(crate) struct Scheduler {
    nodes: HashMap<NodeId, SchedulerNode>,
}

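// Illustrative usage: a minimal sketch mirroring the `scheduler_basic` unit test at the bottom
// of this file (`nodes` is assumed to be the HashMap produced by `test_utils::make_test_nodes`).
// Callers pair the scheduler with an `IntentState`, whose mutations update the per-node shard
// counts via `node_inc_ref` / `node_dec_ref`:
//
//     let mut scheduler = Scheduler::new(nodes.values());
//     let mut intent = IntentState::new();
//     let node_id = scheduler.schedule_shard(&[])?;
//     intent.set_attached(&mut scheduler, Some(node_id));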
impl Scheduler {
    pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
        let mut scheduler_nodes = HashMap::new();
        for node in nodes {
            scheduler_nodes.insert(
                node.id,
                SchedulerNode {
                    shard_count: 0,
                    may_schedule: node.may_schedule(),
                },
            );
        }

        Self {
            nodes: scheduler_nodes,
        }
    }

    /// For debug/support: check that our internal statistics are in sync with the state of
    /// the nodes & tenant shards.
    ///
    /// If anything is inconsistent, log details and return an error.
    pub(crate) fn consistency_check<'a>(
        &self,
        nodes: impl Iterator<Item = &'a Node>,
        shards: impl Iterator<Item = &'a TenantState>,
    ) -> anyhow::Result<()> {
        let mut expect_nodes: HashMap<NodeId, SchedulerNode> = HashMap::new();
        for node in nodes {
            expect_nodes.insert(
                node.id,
                SchedulerNode {
                    shard_count: 0,
                    may_schedule: node.may_schedule(),
                },
            );
        }

        for shard in shards {
            if let Some(node_id) = shard.intent.get_attached() {
                match expect_nodes.get_mut(node_id) {
                    Some(node) => node.shard_count += 1,
                    None => anyhow::bail!(
                        "Tenant {} references nonexistent node {}",
                        shard.tenant_shard_id,
                        node_id
                    ),
                }
            }

            for node_id in shard.intent.get_secondary() {
                match expect_nodes.get_mut(node_id) {
                    Some(node) => node.shard_count += 1,
                    None => anyhow::bail!(
                        "Tenant {} references nonexistent node {}",
                        shard.tenant_shard_id,
                        node_id
                    ),
                }
            }
        }

        for (node_id, expect_node) in &expect_nodes {
            let Some(self_node) = self.nodes.get(node_id) else {
                anyhow::bail!("Node {node_id} not found in Self")
            };

            if self_node != expect_node {
                tracing::error!("Inconsistency detected in scheduling state for node {node_id}");
                tracing::error!("Expected state: {}", serde_json::to_string(expect_node)?);
                tracing::error!("Self state: {}", serde_json::to_string(self_node)?);

                anyhow::bail!("Inconsistent state on {node_id}");
            }
        }

        if expect_nodes.len() != self.nodes.len() {
            // We just checked that all the expected nodes are present. If the lengths don't match,
            // it means that we have nodes in Self that are unexpected.
            for node_id in self.nodes.keys() {
                if !expect_nodes.contains_key(node_id) {
                    anyhow::bail!("Node {node_id} found in Self but not in expected nodes");
                }
            }
        }

        Ok(())
    }

    /// Increment the reference count of a node. This reference count is used to guide scheduling
    /// decisions, not for memory management: it represents one tenant shard whose IntentState targets
    /// this node.
    ///
    /// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
    /// [`Self::new`] or [`Self::node_upsert`])
    pub(crate) fn node_inc_ref(&mut self, node_id: NodeId) {
        let Some(node) = self.nodes.get_mut(&node_id) else {
            tracing::error!("Scheduler missing node {node_id}");
            debug_assert!(false);
            return;
        };

        node.shard_count += 1;
    }

    /// Decrement a node's reference count. Inverse of [`Self::node_inc_ref`].
    pub(crate) fn node_dec_ref(&mut self, node_id: NodeId) {
        let Some(node) = self.nodes.get_mut(&node_id) else {
            debug_assert!(false);
            tracing::error!("Scheduler missing node {node_id}");
            return;
        };

        node.shard_count -= 1;
    }

    pub(crate) fn node_upsert(&mut self, node: &Node) {
        use std::collections::hash_map::Entry::*;
        match self.nodes.entry(node.id) {
            Occupied(mut entry) => {
                entry.get_mut().may_schedule = node.may_schedule();
            }
            Vacant(entry) => {
                entry.insert(SchedulerNode {
                    shard_count: 0,
                    may_schedule: node.may_schedule(),
                });
            }
        }
    }

    pub(crate) fn node_remove(&mut self, node_id: NodeId) {
        if self.nodes.remove(&node_id).is_none() {
            tracing::warn!(node_id=%node_id, "Removed non-existent node from scheduler");
        }
    }

    /// Where we have several nodes to choose from, for example when picking a secondary location
    /// to promote to an attached location, this method may be used to pick the best choice based
    /// on the scheduler's knowledge of utilization and availability.
    ///
    /// If the input is empty, or none of the nodes are eligible for scheduling, return None: the
    /// caller can pick a node some other way.
    pub(crate) fn node_preferred(&self, nodes: &[NodeId]) -> Option<NodeId> {
        if nodes.is_empty() {
            return None;
        }

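        // `bool` orders `false < true`, so the `max_by_key` below prefers a node that may
        // schedule whenever one exists; among several such nodes, the last one in the input wins.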
        let node = nodes
            .iter()
            .map(|node_id| {
                let may_schedule = self
                    .nodes
                    .get(node_id)
                    .map(|n| n.may_schedule)
                    .unwrap_or(false);
                (*node_id, may_schedule)
            })
            .max_by_key(|(_n, may_schedule)| *may_schedule);

        // If even the preferred node has may_schedule==false, return None
        node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
    }

    pub(crate) fn schedule_shard(&self, hard_exclude: &[NodeId]) -> Result<NodeId, ScheduleError> {
        if self.nodes.is_empty() {
            return Err(ScheduleError::NoPageservers);
        }

        let mut tenant_counts: Vec<(NodeId, usize)> = self
            .nodes
            .iter()
            .filter_map(|(k, v)| {
                if hard_exclude.contains(k) || !v.may_schedule {
                    None
                } else {
                    Some((*k, v.shard_count))
                }
            })
            .collect();

        // Sort by tenant count. Nodes with the same tenant count are sorted by ID.
        tenant_counts.sort_by_key(|i| (i.1, i.0));
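        // Illustrative example: with shard counts {NodeId(1): 2, NodeId(2): 1, NodeId(3): 1},
        // the sorted order is [(NodeId(2), 1), (NodeId(3), 1), (NodeId(1), 2)], so NodeId(2)
        // is selected below.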

        if tenant_counts.is_empty() {
            // After applying constraints, no pageservers were left. We log some detail about
            // the state of nodes to help understand why this happened. This is not logged as an error because
            // it is legitimately possible for enough nodes to be Offline to prevent scheduling a shard.
            tracing::info!("Scheduling failure, while excluding {hard_exclude:?}, node states:");
            for (node_id, node) in &self.nodes {
                tracing::info!(
                    "Node {node_id}: may_schedule={} shards={}",
                    node.may_schedule,
                    node.shard_count
                );
            }

            return Err(ScheduleError::ImpossibleConstraint);
        }

        let node_id = tenant_counts.first().unwrap().0;
        tracing::info!(
            "scheduler selected node {node_id} (eligible nodes {:?}, exclude: {hard_exclude:?})",
            tenant_counts.iter().map(|i| i.0 .0).collect::<Vec<_>>()
        );

        // Note that we do not update shard count here to reflect the scheduling: that
        // is IntentState's job when the scheduled location is used.

        Ok(node_id)
    }
}

#[cfg(test)]
pub(crate) mod test_utils {

    use crate::node::Node;
    use pageserver_api::controller_api::{NodeAvailability, NodeSchedulingPolicy};
    use std::collections::HashMap;
    use utils::id::NodeId;

    /// Test helper: synthesize the requested number of nodes, all in active state.
    ///
    /// Node IDs start at one.
    pub(crate) fn make_test_nodes(n: u64) -> HashMap<NodeId, Node> {
        (1..n + 1)
            .map(|i| {
                (
                    NodeId(i),
                    Node {
                        id: NodeId(i),
                        availability: NodeAvailability::Active,
                        scheduling: NodeSchedulingPolicy::Active,
                        listen_http_addr: format!("httphost-{i}"),
                        listen_http_port: 80 + i as u16,
                        listen_pg_addr: format!("pghost-{i}"),
                        listen_pg_port: 5432 + i as u16,
                    },
                )
            })
            .collect()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use utils::id::NodeId;

    use crate::tenant_state::IntentState;

    #[test]
    fn scheduler_basic() -> anyhow::Result<()> {
        let nodes = test_utils::make_test_nodes(2);

        let mut scheduler = Scheduler::new(nodes.values());
        let mut t1_intent = IntentState::new();
        let mut t2_intent = IntentState::new();

        let scheduled = scheduler.schedule_shard(&[])?;
        t1_intent.set_attached(&mut scheduler, Some(scheduled));
        let scheduled = scheduler.schedule_shard(&[])?;
        t2_intent.set_attached(&mut scheduler, Some(scheduled));

        assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);
        assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 1);

        let scheduled = scheduler.schedule_shard(&t1_intent.all_pageservers())?;
        t1_intent.push_secondary(&mut scheduler, scheduled);

        assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);
        assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 2);

        t1_intent.clear(&mut scheduler);
        assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 0);
        assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 1);

        if cfg!(debug_assertions) {
            // Dropping an IntentState without clearing it causes a panic in debug mode,
            // because we have failed to properly update scheduler shard counts.
            let result = std::panic::catch_unwind(move || {
                drop(t2_intent);
            });
            assert!(result.is_err());
        } else {
            t2_intent.clear(&mut scheduler);
            assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 0);
            assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 0);
        }

        Ok(())
    }
}