Line data Source code
1 : use crate::{node::Node, tenant_shard::TenantShard};
2 : use pageserver_api::controller_api::UtilizationScore;
3 : use serde::Serialize;
4 : use std::collections::HashMap;
5 : use utils::{http::error::ApiError, id::NodeId};
6 :
7 : /// Scenarios in which we cannot find a suitable location for a tenant shard
8 0 : #[derive(thiserror::Error, Debug)]
9 : pub enum ScheduleError {
10 : #[error("No pageservers found")]
11 : NoPageservers,
12 : #[error("No pageserver found matching constraint")]
13 : ImpossibleConstraint,
14 : }
15 :
16 : impl From<ScheduleError> for ApiError {
17 0 : fn from(value: ScheduleError) -> Self {
18 0 : ApiError::Conflict(format!("Scheduling error: {}", value))
19 0 : }
20 : }
21 :
22 : #[derive(Serialize, Eq, PartialEq)]
23 : pub enum MaySchedule {
24 : Yes(UtilizationScore),
25 : No,
26 : }
27 :
28 : #[derive(Serialize)]
29 : struct SchedulerNode {
30 : /// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
31 : shard_count: usize,
32 :
33 : /// Whether this node is currently elegible to have new shards scheduled (this is derived
34 : /// from a node's availability state and scheduling policy).
35 : may_schedule: MaySchedule,
36 : }
37 :
38 : impl PartialEq for SchedulerNode {
39 6 : fn eq(&self, other: &Self) -> bool {
40 6 : let may_schedule_matches = matches!(
41 6 : (&self.may_schedule, &other.may_schedule),
42 : (MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No)
43 : );
44 :
45 6 : may_schedule_matches && self.shard_count == other.shard_count
46 6 : }
47 : }
48 :
49 : impl Eq for SchedulerNode {}
50 :
51 : /// This type is responsible for selecting which node is used when a tenant shard needs to choose a pageserver
52 : /// on which to run.
53 : ///
54 : /// The type has no persistent state of its own: this is all populated at startup. The Serialize
55 : /// impl is only for debug dumps.
56 : #[derive(Serialize)]
57 : pub(crate) struct Scheduler {
58 : nodes: HashMap<NodeId, SchedulerNode>,
59 : }
60 :
/// Score for soft-constraint scheduling: lower scores are preferred over higher ones.
///
/// For example, an affinity score may count how many shards of the same tenant already live
/// on a node, which implicitly nudges the scheduler towards spreading those shards out.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct AffinityScore(pub(crate) usize);

impl AffinityScore {
    /// Score of a node towards which we have no anti-affinity at all. The scheduler may
    /// choose freely among nodes with this score, using other information such as overall
    /// utilization.
    pub(crate) const FREE: Self = Self(0);

    /// Makes the node one step less attractive.
    pub(crate) fn inc(&mut self) {
        let Self(score) = self;
        *score += 1;
    }
}

impl std::ops::Add for AffinityScore {
    type Output = Self;

    /// Combines two scores by summing them.
    fn add(self, rhs: Self) -> Self::Output {
        let Self(left) = self;
        let Self(right) = rhs;
        Self(left + right)
    }
}
86 :
/// Hint for whether a scheduling call is a sincere attempt to place a shard, or a
/// speculative check for where we _would_ place it (done during optimization).
///
/// `Default` is derived rather than hand-written; the `#[default]` variant is [`Self::Normal`].
#[derive(Debug, Default)]
pub(crate) enum ScheduleMode {
    /// A real scheduling decision. This is the default.
    #[default]
    Normal,
    /// A "what if" query made while looking for optimizations.
    Speculative,
}
100 :
101 : // For carrying state between multiple calls to [`TenantShard::schedule`], e.g. when calling
102 : // it for many shards in the same tenant.
103 : #[derive(Debug, Default)]
104 : pub(crate) struct ScheduleContext {
105 : /// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`]
106 : pub(crate) nodes: HashMap<NodeId, AffinityScore>,
107 :
108 : /// Specifically how many _attached_ locations are on each node
109 : pub(crate) attached_nodes: HashMap<NodeId, usize>,
110 :
111 : pub(crate) mode: ScheduleMode,
112 : }
113 :
114 : impl ScheduleContext {
115 : /// Input is a list of nodes we would like to avoid using again within this context. The more
116 : /// times a node is passed into this call, the less inclined we are to use it.
117 96 : pub(crate) fn avoid(&mut self, nodes: &[NodeId]) {
118 284 : for node_id in nodes {
119 188 : let entry = self.nodes.entry(*node_id).or_insert(AffinityScore::FREE);
120 188 : entry.inc()
121 : }
122 96 : }
123 :
124 94 : pub(crate) fn push_attached(&mut self, node_id: NodeId) {
125 94 : let entry = self.attached_nodes.entry(node_id).or_default();
126 94 : *entry += 1;
127 94 : }
128 :
129 120 : pub(crate) fn get_node_affinity(&self, node_id: NodeId) -> AffinityScore {
130 120 : self.nodes
131 120 : .get(&node_id)
132 120 : .copied()
133 120 : .unwrap_or(AffinityScore::FREE)
134 120 : }
135 :
136 120 : pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize {
137 120 : self.attached_nodes.get(&node_id).copied().unwrap_or(0)
138 120 : }
139 : }
140 :
141 : impl Scheduler {
142 14 : pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
143 14 : let mut scheduler_nodes = HashMap::new();
144 50 : for node in nodes {
145 36 : scheduler_nodes.insert(
146 36 : node.get_id(),
147 36 : SchedulerNode {
148 36 : shard_count: 0,
149 36 : may_schedule: node.may_schedule(),
150 36 : },
151 36 : );
152 36 : }
153 :
154 14 : Self {
155 14 : nodes: scheduler_nodes,
156 14 : }
157 14 : }
158 :
159 : /// For debug/support: check that our internal statistics are in sync with the state of
160 : /// the nodes & tenant shards.
161 : ///
162 : /// If anything is inconsistent, log details and return an error.
163 2 : pub(crate) fn consistency_check<'a>(
164 2 : &self,
165 2 : nodes: impl Iterator<Item = &'a Node>,
166 2 : shards: impl Iterator<Item = &'a TenantShard>,
167 2 : ) -> anyhow::Result<()> {
168 2 : let mut expect_nodes: HashMap<NodeId, SchedulerNode> = HashMap::new();
169 8 : for node in nodes {
170 6 : expect_nodes.insert(
171 6 : node.get_id(),
172 6 : SchedulerNode {
173 6 : shard_count: 0,
174 6 : may_schedule: node.may_schedule(),
175 6 : },
176 6 : );
177 6 : }
178 :
179 4 : for shard in shards {
180 2 : if let Some(node_id) = shard.intent.get_attached() {
181 2 : match expect_nodes.get_mut(node_id) {
182 2 : Some(node) => node.shard_count += 1,
183 0 : None => anyhow::bail!(
184 0 : "Tenant {} references nonexistent node {}",
185 0 : shard.tenant_shard_id,
186 0 : node_id
187 0 : ),
188 : }
189 0 : }
190 :
191 2 : for node_id in shard.intent.get_secondary() {
192 2 : match expect_nodes.get_mut(node_id) {
193 2 : Some(node) => node.shard_count += 1,
194 0 : None => anyhow::bail!(
195 0 : "Tenant {} references nonexistent node {}",
196 0 : shard.tenant_shard_id,
197 0 : node_id
198 0 : ),
199 : }
200 : }
201 : }
202 :
203 8 : for (node_id, expect_node) in &expect_nodes {
204 6 : let Some(self_node) = self.nodes.get(node_id) else {
205 0 : anyhow::bail!("Node {node_id} not found in Self")
206 : };
207 :
208 6 : if self_node != expect_node {
209 0 : tracing::error!("Inconsistency detected in scheduling state for node {node_id}");
210 0 : tracing::error!("Expected state: {}", serde_json::to_string(expect_node)?);
211 0 : tracing::error!("Self state: {}", serde_json::to_string(self_node)?);
212 :
213 0 : anyhow::bail!("Inconsistent state on {node_id}");
214 6 : }
215 : }
216 :
217 2 : if expect_nodes.len() != self.nodes.len() {
218 : // We just checked that all the expected nodes are present. If the lengths don't match,
219 : // it means that we have nodes in Self that are unexpected.
220 0 : for node_id in self.nodes.keys() {
221 0 : if !expect_nodes.contains_key(node_id) {
222 0 : anyhow::bail!("Node {node_id} found in Self but not in expected nodes");
223 0 : }
224 : }
225 2 : }
226 :
227 2 : Ok(())
228 2 : }
229 :
230 : /// Increment the reference count of a node. This reference count is used to guide scheduling
231 : /// decisions, not for memory management: it represents one tenant shard whose IntentState targets
232 : /// this node.
233 : ///
234 : /// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
235 : /// [`Self::new`] or [`Self::node_upsert`])
236 60 : pub(crate) fn node_inc_ref(&mut self, node_id: NodeId) {
237 60 : let Some(node) = self.nodes.get_mut(&node_id) else {
238 0 : tracing::error!("Scheduler missing node {node_id}");
239 0 : debug_assert!(false);
240 0 : return;
241 : };
242 :
243 60 : node.shard_count += 1;
244 60 : }
245 :
246 : /// Decrement a node's reference count. Inverse of [`Self::node_inc_ref`].
247 58 : pub(crate) fn node_dec_ref(&mut self, node_id: NodeId) {
248 58 : let Some(node) = self.nodes.get_mut(&node_id) else {
249 0 : debug_assert!(false);
250 0 : tracing::error!("Scheduler missing node {node_id}");
251 0 : return;
252 : };
253 :
254 58 : node.shard_count -= 1;
255 58 : }
256 :
257 10 : pub(crate) fn node_upsert(&mut self, node: &Node) {
258 10 : use std::collections::hash_map::Entry::*;
259 10 : match self.nodes.entry(node.get_id()) {
260 2 : Occupied(mut entry) => {
261 2 : entry.get_mut().may_schedule = node.may_schedule();
262 2 : }
263 8 : Vacant(entry) => {
264 8 : entry.insert(SchedulerNode {
265 8 : shard_count: 0,
266 8 : may_schedule: node.may_schedule(),
267 8 : });
268 8 : }
269 : }
270 10 : }
271 :
272 0 : pub(crate) fn node_remove(&mut self, node_id: NodeId) {
273 0 : if self.nodes.remove(&node_id).is_none() {
274 0 : tracing::warn!(node_id=%node_id, "Removed non-existent node from scheduler");
275 0 : }
276 0 : }
277 :
278 : /// Where we have several nodes to choose from, for example when picking a secondary location
279 : /// to promote to an attached location, this method may be used to pick the best choice based
280 : /// on the scheduler's knowledge of utilization and availability.
281 : ///
282 : /// If the input is empty, or all the nodes are not elegible for scheduling, return None: the
283 : /// caller can pick a node some other way.
284 28 : pub(crate) fn node_preferred(&self, nodes: &[NodeId]) -> Option<NodeId> {
285 28 : if nodes.is_empty() {
286 24 : return None;
287 4 : }
288 4 :
289 4 : // TODO: When the utilization score returned by the pageserver becomes meaningful,
290 4 : // schedule based on that instead of the shard count.
291 4 : let node = nodes
292 4 : .iter()
293 8 : .map(|node_id| {
294 8 : let may_schedule = self
295 8 : .nodes
296 8 : .get(node_id)
297 8 : .map(|n| n.may_schedule != MaySchedule::No)
298 8 : .unwrap_or(false);
299 8 : (*node_id, may_schedule)
300 8 : })
301 8 : .max_by_key(|(_n, may_schedule)| *may_schedule);
302 4 :
303 4 : // If even the preferred node has may_schedule==false, return None
304 4 : node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
305 28 : }
306 :
307 : /// hard_exclude: it is forbidden to use nodes in this list, typically becacuse they
308 : /// are already in use by this shard -- we use this to avoid picking the same node
309 : /// as both attached and secondary location. This is a hard constraint: if we cannot
310 : /// find any nodes that aren't in this list, then we will return a [`ScheduleError::ImpossibleConstraint`].
311 : ///
312 : /// context: we prefer to avoid using nodes identified in the context, according
313 : /// to their anti-affinity score. We use this to prefeer to avoid placing shards in
314 : /// the same tenant on the same node. This is a soft constraint: the context will never
315 : /// cause us to fail to schedule a shard.
316 60 : pub(crate) fn schedule_shard(
317 60 : &self,
318 60 : hard_exclude: &[NodeId],
319 60 : context: &ScheduleContext,
320 60 : ) -> Result<NodeId, ScheduleError> {
321 60 : if self.nodes.is_empty() {
322 0 : return Err(ScheduleError::NoPageservers);
323 60 : }
324 60 :
325 60 : let mut scores: Vec<(NodeId, AffinityScore, usize)> = self
326 60 : .nodes
327 60 : .iter()
328 188 : .filter_map(|(k, v)| {
329 188 : if hard_exclude.contains(k) || v.may_schedule == MaySchedule::No {
330 74 : None
331 : } else {
332 114 : Some((
333 114 : *k,
334 114 : context.nodes.get(k).copied().unwrap_or(AffinityScore::FREE),
335 114 : v.shard_count,
336 114 : ))
337 : }
338 188 : })
339 60 : .collect();
340 60 :
341 60 : // Sort by, in order of precedence:
342 60 : // 1st: Affinity score. We should never pick a higher-score node if a lower-score node is available
343 60 : // 2nd: Utilization. Within nodes with the same affinity, use the least loaded nodes.
344 60 : // 3rd: Node ID. This is a convenience to make selection deterministic in tests and empty systems.
345 116 : scores.sort_by_key(|i| (i.1, i.2, i.0));
346 60 :
347 60 : if scores.is_empty() {
348 : // After applying constraints, no pageservers were left.
349 0 : if !matches!(context.mode, ScheduleMode::Speculative) {
350 : // If this was not a speculative attempt, log details to understand why we couldn't
351 : // schedule: this may help an engineer understand if some nodes are marked offline
352 : // in a way that's preventing progress.
353 0 : tracing::info!(
354 0 : "Scheduling failure, while excluding {hard_exclude:?}, node states:"
355 : );
356 0 : for (node_id, node) in &self.nodes {
357 0 : tracing::info!(
358 0 : "Node {node_id}: may_schedule={} shards={}",
359 0 : node.may_schedule != MaySchedule::No,
360 : node.shard_count
361 : );
362 : }
363 0 : }
364 0 : return Err(ScheduleError::ImpossibleConstraint);
365 60 : }
366 60 :
367 60 : // Lowest score wins
368 60 : let node_id = scores.first().unwrap().0;
369 :
370 60 : if !matches!(context.mode, ScheduleMode::Speculative) {
371 60 : tracing::info!(
372 0 : "scheduler selected node {node_id} (elegible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?})",
373 0 : scores.iter().map(|i| i.0 .0).collect::<Vec<_>>()
374 : );
375 0 : }
376 :
377 : // Note that we do not update shard count here to reflect the scheduling: that
378 : // is IntentState's job when the scheduled location is used.
379 :
380 60 : Ok(node_id)
381 60 : }
382 :
383 : /// Unit test access to internal state
384 : #[cfg(test)]
385 12 : pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
386 12 : self.nodes.get(&node_id).unwrap().shard_count
387 12 : }
388 : }
389 :
#[cfg(test)]
pub(crate) mod test_utils {
    use crate::node::Node;
    use pageserver_api::controller_api::{NodeAvailability, UtilizationScore};
    use std::collections::HashMap;
    use utils::id::NodeId;

    /// Test helper: synthesize the requested number of nodes, all in active state.
    ///
    /// Node IDs start at one. Node `i` uses host names `httphost-{i}` / `pghost-{i}` and
    /// ports `80 + i` / `5432 + i`.
    pub(crate) fn make_test_nodes(n: u64) -> HashMap<NodeId, Node> {
        let mut by_id = HashMap::new();

        for i in 1..n + 1 {
            let id = NodeId(i);
            let http_port = 80 + i as u16;
            let pg_port = 5432 + i as u16;

            let mut node = Node::new(
                id,
                format!("httphost-{i}"),
                http_port,
                format!("pghost-{i}"),
                pg_port,
            );
            node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
            assert!(node.is_available());

            by_id.insert(id, node);
        }

        by_id
    }
}
419 :
#[cfg(test)]
mod tests {
    use super::*;

    use crate::tenant_shard::IntentState;

    #[test]
    fn scheduler_basic() -> anyhow::Result<()> {
        let nodes = test_utils::make_test_nodes(2);
        let (node_1, node_2) = (NodeId(1), NodeId(2));

        let mut scheduler = Scheduler::new(nodes.values());
        let mut t1_intent = IntentState::new();
        let mut t2_intent = IntentState::new();
        let context = ScheduleContext::default();

        // Attach one shard per tenant: with two empty nodes they land on different nodes.
        let first = scheduler.schedule_shard(&[], &context)?;
        t1_intent.set_attached(&mut scheduler, Some(first));
        let second = scheduler.schedule_shard(&[], &context)?;
        t2_intent.set_attached(&mut scheduler, Some(second));

        assert_eq!(scheduler.get_node_shard_count(node_1), 1);
        assert_eq!(scheduler.get_node_shard_count(node_2), 1);

        // A secondary for t1 must avoid every node t1 already uses.
        let secondary = scheduler.schedule_shard(&t1_intent.all_pageservers(), &context)?;
        t1_intent.push_secondary(&mut scheduler, secondary);

        assert_eq!(scheduler.get_node_shard_count(node_1), 1);
        assert_eq!(scheduler.get_node_shard_count(node_2), 2);

        // Clearing an intent releases every node reference it held.
        t1_intent.clear(&mut scheduler);
        assert_eq!(scheduler.get_node_shard_count(node_1), 0);
        assert_eq!(scheduler.get_node_shard_count(node_2), 1);

        if cfg!(debug_assertions) {
            // Dropping an IntentState without clearing it causes a panic in debug mode,
            // because we have failed to properly update scheduler shard counts.
            let outcome = std::panic::catch_unwind(move || drop(t2_intent));
            assert!(outcome.is_err());
        } else {
            t2_intent.clear(&mut scheduler);
            assert_eq!(scheduler.get_node_shard_count(node_1), 0);
            assert_eq!(scheduler.get_node_shard_count(node_2), 0);
        }

        Ok(())
    }
}
|