Line data Source code
1 : use crate::{node::Node, tenant_shard::TenantShard};
2 : use itertools::Itertools;
3 : use pageserver_api::controller_api::UtilizationScore;
4 : use serde::Serialize;
5 : use std::collections::HashMap;
6 : use utils::{http::error::ApiError, id::NodeId};
7 :
8 : /// Scenarios in which we cannot find a suitable location for a tenant shard
9 0 : #[derive(thiserror::Error, Debug)]
10 : pub enum ScheduleError {
11 : #[error("No pageservers found")]
12 : NoPageservers,
13 : #[error("No pageserver found matching constraint")]
14 : ImpossibleConstraint,
15 : }
16 :
17 : impl From<ScheduleError> for ApiError {
18 0 : fn from(value: ScheduleError) -> Self {
19 0 : ApiError::Conflict(format!("Scheduling error: {}", value))
20 0 : }
21 : }
22 :
23 : #[derive(Serialize, Eq, PartialEq)]
24 : pub enum MaySchedule {
25 : Yes(UtilizationScore),
26 : No,
27 : }
28 :
29 : #[derive(Serialize)]
30 : struct SchedulerNode {
31 : /// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
32 : shard_count: usize,
33 : /// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`].
34 : attached_shard_count: usize,
35 :
36 : /// Whether this node is currently elegible to have new shards scheduled (this is derived
37 : /// from a node's availability state and scheduling policy).
38 : may_schedule: MaySchedule,
39 : }
40 :
41 : impl PartialEq for SchedulerNode {
42 6 : fn eq(&self, other: &Self) -> bool {
43 6 : let may_schedule_matches = matches!(
44 6 : (&self.may_schedule, &other.may_schedule),
45 : (MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No)
46 : );
47 :
48 6 : may_schedule_matches
49 6 : && self.shard_count == other.shard_count
50 6 : && self.attached_shard_count == other.attached_shard_count
51 6 : }
52 : }
53 :
54 : impl Eq for SchedulerNode {}
55 :
56 : /// This type is responsible for selecting which node is used when a tenant shard needs to choose a pageserver
57 : /// on which to run.
58 : ///
59 : /// The type has no persistent state of its own: this is all populated at startup. The Serialize
60 : /// impl is only for debug dumps.
61 : #[derive(Serialize)]
62 : pub(crate) struct Scheduler {
63 : nodes: HashMap<NodeId, SchedulerNode>,
64 : }
65 :
/// Soft-constraint scheduling score: a lower score is preferred over a higher one.
///
/// For instance, an affinity score can count the shards of one tenant that already live on
/// a node, which implicitly nudges the scheduler towards spreading those shards out.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct AffinityScore(pub(crate) usize);
72 :
73 : impl AffinityScore {
74 : /// If we have no anti-affinity at all toward a node, this is its score. It means
75 : /// the scheduler has a free choice amongst nodes with this score, and may pick a node
76 : /// based on other information such as total utilization.
77 : pub(crate) const FREE: Self = Self(0);
78 :
79 156 : pub(crate) fn inc(&mut self) {
80 156 : self.0 += 1;
81 156 : }
82 : }
83 :
84 : impl std::ops::Add for AffinityScore {
85 : type Output = Self;
86 :
87 40 : fn add(self, rhs: Self) -> Self::Output {
88 40 : Self(self.0 + rhs.0)
89 40 : }
90 : }
91 :
/// Hint for whether this is a sincere attempt to schedule, or a speculative
/// check for where we _would_ schedule (done during optimization).
///
/// In [`Scheduler::schedule_shard`], speculative calls skip the informational log lines that
/// normal calls emit. The default mode is [`ScheduleMode::Normal`].
#[derive(Debug, Default)]
pub(crate) enum ScheduleMode {
    #[default]
    Normal,
    Speculative,
}
105 :
106 : // For carrying state between multiple calls to [`TenantShard::schedule`], e.g. when calling
107 : // it for many shards in the same tenant.
108 : #[derive(Debug, Default)]
109 : pub(crate) struct ScheduleContext {
110 : /// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`]
111 : pub(crate) nodes: HashMap<NodeId, AffinityScore>,
112 :
113 : /// Specifically how many _attached_ locations are on each node
114 : pub(crate) attached_nodes: HashMap<NodeId, usize>,
115 :
116 : pub(crate) mode: ScheduleMode,
117 : }
118 :
119 : impl ScheduleContext {
120 : /// Input is a list of nodes we would like to avoid using again within this context. The more
121 : /// times a node is passed into this call, the less inclined we are to use it.
122 80 : pub(crate) fn avoid(&mut self, nodes: &[NodeId]) {
123 236 : for node_id in nodes {
124 156 : let entry = self.nodes.entry(*node_id).or_insert(AffinityScore::FREE);
125 156 : entry.inc()
126 : }
127 80 : }
128 :
129 78 : pub(crate) fn push_attached(&mut self, node_id: NodeId) {
130 78 : let entry = self.attached_nodes.entry(node_id).or_default();
131 78 : *entry += 1;
132 78 : }
133 :
134 90 : pub(crate) fn get_node_affinity(&self, node_id: NodeId) -> AffinityScore {
135 90 : self.nodes
136 90 : .get(&node_id)
137 90 : .copied()
138 90 : .unwrap_or(AffinityScore::FREE)
139 90 : }
140 :
141 90 : pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize {
142 90 : self.attached_nodes.get(&node_id).copied().unwrap_or(0)
143 90 : }
144 : }
145 :
/// A change to apply to a node's reference counts through
/// [`Scheduler::update_node_ref_counts`].
pub(crate) enum RefCountUpdate {
    /// Increments only the attached shard count.
    PromoteSecondary,
    /// Increments both the total and the attached shard counts.
    Attach,
    /// Decrements both the total and the attached shard counts.
    Detach,
    /// Decrements only the attached shard count.
    DemoteAttached,
    /// Increments only the total shard count.
    AddSecondary,
    /// Decrements only the total shard count.
    RemoveSecondary,
}
154 :
155 : impl Scheduler {
156 14 : pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
157 14 : let mut scheduler_nodes = HashMap::new();
158 50 : for node in nodes {
159 36 : scheduler_nodes.insert(
160 36 : node.get_id(),
161 36 : SchedulerNode {
162 36 : shard_count: 0,
163 36 : attached_shard_count: 0,
164 36 : may_schedule: node.may_schedule(),
165 36 : },
166 36 : );
167 36 : }
168 :
169 14 : Self {
170 14 : nodes: scheduler_nodes,
171 14 : }
172 14 : }
173 :
174 : /// For debug/support: check that our internal statistics are in sync with the state of
175 : /// the nodes & tenant shards.
176 : ///
177 : /// If anything is inconsistent, log details and return an error.
178 2 : pub(crate) fn consistency_check<'a>(
179 2 : &self,
180 2 : nodes: impl Iterator<Item = &'a Node>,
181 2 : shards: impl Iterator<Item = &'a TenantShard>,
182 2 : ) -> anyhow::Result<()> {
183 2 : let mut expect_nodes: HashMap<NodeId, SchedulerNode> = HashMap::new();
184 8 : for node in nodes {
185 6 : expect_nodes.insert(
186 6 : node.get_id(),
187 6 : SchedulerNode {
188 6 : shard_count: 0,
189 6 : attached_shard_count: 0,
190 6 : may_schedule: node.may_schedule(),
191 6 : },
192 6 : );
193 6 : }
194 :
195 4 : for shard in shards {
196 2 : if let Some(node_id) = shard.intent.get_attached() {
197 2 : match expect_nodes.get_mut(node_id) {
198 2 : Some(node) => {
199 2 : node.shard_count += 1;
200 2 : node.attached_shard_count += 1;
201 2 : }
202 0 : None => anyhow::bail!(
203 0 : "Tenant {} references nonexistent node {}",
204 0 : shard.tenant_shard_id,
205 0 : node_id
206 0 : ),
207 : }
208 0 : }
209 :
210 2 : for node_id in shard.intent.get_secondary() {
211 2 : match expect_nodes.get_mut(node_id) {
212 2 : Some(node) => node.shard_count += 1,
213 0 : None => anyhow::bail!(
214 0 : "Tenant {} references nonexistent node {}",
215 0 : shard.tenant_shard_id,
216 0 : node_id
217 0 : ),
218 : }
219 : }
220 : }
221 :
222 8 : for (node_id, expect_node) in &expect_nodes {
223 6 : let Some(self_node) = self.nodes.get(node_id) else {
224 0 : anyhow::bail!("Node {node_id} not found in Self")
225 : };
226 :
227 6 : if self_node != expect_node {
228 0 : tracing::error!("Inconsistency detected in scheduling state for node {node_id}");
229 0 : tracing::error!("Expected state: {}", serde_json::to_string(expect_node)?);
230 0 : tracing::error!("Self state: {}", serde_json::to_string(self_node)?);
231 :
232 0 : anyhow::bail!("Inconsistent state on {node_id}");
233 6 : }
234 : }
235 :
236 2 : if expect_nodes.len() != self.nodes.len() {
237 : // We just checked that all the expected nodes are present. If the lengths don't match,
238 : // it means that we have nodes in Self that are unexpected.
239 0 : for node_id in self.nodes.keys() {
240 0 : if !expect_nodes.contains_key(node_id) {
241 0 : anyhow::bail!("Node {node_id} found in Self but not in expected nodes");
242 0 : }
243 : }
244 2 : }
245 :
246 2 : Ok(())
247 2 : }
248 :
249 : /// Update the reference counts of a node. These reference counts are used to guide scheduling
250 : /// decisions, not for memory management: they represent the number of tenant shard whose IntentState
251 : /// targets this node and the number of tenants shars whose IntentState is attached to this
252 : /// node.
253 : ///
254 : /// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
255 : /// [`Self::new`] or [`Self::node_upsert`])
256 138 : pub(crate) fn update_node_ref_counts(&mut self, node_id: NodeId, update: RefCountUpdate) {
257 138 : let Some(node) = self.nodes.get_mut(&node_id) else {
258 0 : debug_assert!(false);
259 0 : tracing::error!("Scheduler missing node {node_id}");
260 0 : return;
261 : };
262 :
263 138 : match update {
264 10 : RefCountUpdate::PromoteSecondary => {
265 10 : node.attached_shard_count += 1;
266 10 : }
267 26 : RefCountUpdate::Attach => {
268 26 : node.shard_count += 1;
269 26 : node.attached_shard_count += 1;
270 26 : }
271 24 : RefCountUpdate::Detach => {
272 24 : node.shard_count -= 1;
273 24 : node.attached_shard_count -= 1;
274 24 : }
275 10 : RefCountUpdate::DemoteAttached => {
276 10 : node.attached_shard_count -= 1;
277 10 : }
278 34 : RefCountUpdate::AddSecondary => {
279 34 : node.shard_count += 1;
280 34 : }
281 34 : RefCountUpdate::RemoveSecondary => {
282 34 : node.shard_count -= 1;
283 34 : }
284 : }
285 138 : }
286 :
287 : // Check if the number of shards attached to a given node is lagging below
288 : // the cluster average. If that's the case, the node should be filled.
289 0 : pub(crate) fn compute_fill_requirement(&self, node_id: NodeId) -> usize {
290 0 : let Some(node) = self.nodes.get(&node_id) else {
291 0 : debug_assert!(false);
292 0 : tracing::error!("Scheduler missing node {node_id}");
293 0 : return 0;
294 : };
295 0 : assert!(!self.nodes.is_empty());
296 0 : let expected_attached_shards_per_node = self.expected_attached_shard_count();
297 :
298 0 : for (node_id, node) in self.nodes.iter() {
299 0 : tracing::trace!(%node_id, "attached_shard_count={} shard_count={} expected={}", node.attached_shard_count, node.shard_count, expected_attached_shards_per_node);
300 : }
301 :
302 0 : if node.attached_shard_count < expected_attached_shards_per_node {
303 0 : expected_attached_shards_per_node - node.attached_shard_count
304 : } else {
305 0 : 0
306 : }
307 0 : }
308 :
309 0 : pub(crate) fn expected_attached_shard_count(&self) -> usize {
310 0 : let total_attached_shards: usize =
311 0 : self.nodes.values().map(|n| n.attached_shard_count).sum();
312 0 :
313 0 : assert!(!self.nodes.is_empty());
314 0 : total_attached_shards / self.nodes.len()
315 0 : }
316 :
317 0 : pub(crate) fn nodes_by_attached_shard_count(&self) -> Vec<(NodeId, usize)> {
318 0 : self.nodes
319 0 : .iter()
320 0 : .map(|(node_id, stats)| (*node_id, stats.attached_shard_count))
321 0 : .sorted_by(|lhs, rhs| Ord::cmp(&lhs.1, &rhs.1).reverse())
322 0 : .collect()
323 0 : }
324 :
325 10 : pub(crate) fn node_upsert(&mut self, node: &Node) {
326 10 : use std::collections::hash_map::Entry::*;
327 10 : match self.nodes.entry(node.get_id()) {
328 2 : Occupied(mut entry) => {
329 2 : entry.get_mut().may_schedule = node.may_schedule();
330 2 : }
331 8 : Vacant(entry) => {
332 8 : entry.insert(SchedulerNode {
333 8 : shard_count: 0,
334 8 : attached_shard_count: 0,
335 8 : may_schedule: node.may_schedule(),
336 8 : });
337 8 : }
338 : }
339 10 : }
340 :
341 0 : pub(crate) fn node_remove(&mut self, node_id: NodeId) {
342 0 : if self.nodes.remove(&node_id).is_none() {
343 0 : tracing::warn!(node_id=%node_id, "Removed non-existent node from scheduler");
344 0 : }
345 0 : }
346 :
347 : /// Where we have several nodes to choose from, for example when picking a secondary location
348 : /// to promote to an attached location, this method may be used to pick the best choice based
349 : /// on the scheduler's knowledge of utilization and availability.
350 : ///
351 : /// If the input is empty, or all the nodes are not elegible for scheduling, return None: the
352 : /// caller can pick a node some other way.
353 28 : pub(crate) fn node_preferred(&self, nodes: &[NodeId]) -> Option<NodeId> {
354 28 : if nodes.is_empty() {
355 24 : return None;
356 4 : }
357 4 :
358 4 : // TODO: When the utilization score returned by the pageserver becomes meaningful,
359 4 : // schedule based on that instead of the shard count.
360 4 : let node = nodes
361 4 : .iter()
362 8 : .map(|node_id| {
363 8 : let may_schedule = self
364 8 : .nodes
365 8 : .get(node_id)
366 8 : .map(|n| n.may_schedule != MaySchedule::No)
367 8 : .unwrap_or(false);
368 8 : (*node_id, may_schedule)
369 8 : })
370 8 : .max_by_key(|(_n, may_schedule)| *may_schedule);
371 4 :
372 4 : // If even the preferred node has may_schedule==false, return None
373 4 : node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
374 28 : }
375 :
376 : /// hard_exclude: it is forbidden to use nodes in this list, typically becacuse they
377 : /// are already in use by this shard -- we use this to avoid picking the same node
378 : /// as both attached and secondary location. This is a hard constraint: if we cannot
379 : /// find any nodes that aren't in this list, then we will return a [`ScheduleError::ImpossibleConstraint`].
380 : ///
381 : /// context: we prefer to avoid using nodes identified in the context, according
382 : /// to their anti-affinity score. We use this to prefeer to avoid placing shards in
383 : /// the same tenant on the same node. This is a soft constraint: the context will never
384 : /// cause us to fail to schedule a shard.
385 54 : pub(crate) fn schedule_shard(
386 54 : &self,
387 54 : hard_exclude: &[NodeId],
388 54 : context: &ScheduleContext,
389 54 : ) -> Result<NodeId, ScheduleError> {
390 54 : if self.nodes.is_empty() {
391 0 : return Err(ScheduleError::NoPageservers);
392 54 : }
393 54 :
394 54 : let mut scores: Vec<(NodeId, AffinityScore, usize, usize)> = self
395 54 : .nodes
396 54 : .iter()
397 164 : .filter_map(|(k, v)| {
398 164 : if hard_exclude.contains(k) || v.may_schedule == MaySchedule::No {
399 62 : None
400 : } else {
401 102 : Some((
402 102 : *k,
403 102 : context.nodes.get(k).copied().unwrap_or(AffinityScore::FREE),
404 102 : v.shard_count,
405 102 : v.attached_shard_count,
406 102 : ))
407 : }
408 164 : })
409 54 : .collect();
410 54 :
411 54 : // Sort by, in order of precedence:
412 54 : // 1st: Affinity score. We should never pick a higher-score node if a lower-score node is available
413 54 : // 2nd: Attached shard count. Within nodes with the same affinity, we always pick the node with
414 54 : // the least number of attached shards.
415 54 : // 3rd: Total shard count. Within nodes with the same affinity and attached shard count, use nodes
416 54 : // with the lower total shard count.
417 54 : // 4th: Node ID. This is a convenience to make selection deterministic in tests and empty systems.
418 98 : scores.sort_by_key(|i| (i.1, i.3, i.2, i.0));
419 54 :
420 54 : if scores.is_empty() {
421 : // After applying constraints, no pageservers were left.
422 0 : if !matches!(context.mode, ScheduleMode::Speculative) {
423 : // If this was not a speculative attempt, log details to understand why we couldn't
424 : // schedule: this may help an engineer understand if some nodes are marked offline
425 : // in a way that's preventing progress.
426 0 : tracing::info!(
427 0 : "Scheduling failure, while excluding {hard_exclude:?}, node states:"
428 : );
429 0 : for (node_id, node) in &self.nodes {
430 0 : tracing::info!(
431 0 : "Node {node_id}: may_schedule={} shards={}",
432 0 : node.may_schedule != MaySchedule::No,
433 : node.shard_count
434 : );
435 : }
436 0 : }
437 0 : return Err(ScheduleError::ImpossibleConstraint);
438 54 : }
439 54 :
440 54 : // Lowest score wins
441 54 : let node_id = scores.first().unwrap().0;
442 :
443 54 : if !matches!(context.mode, ScheduleMode::Speculative) {
444 54 : tracing::info!(
445 0 : "scheduler selected node {node_id} (elegible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?})",
446 0 : scores.iter().map(|i| i.0 .0).collect::<Vec<_>>()
447 : );
448 0 : }
449 :
450 : // Note that we do not update shard count here to reflect the scheduling: that
451 : // is IntentState's job when the scheduled location is used.
452 :
453 54 : Ok(node_id)
454 54 : }
455 :
456 : /// Unit test access to internal state
457 : #[cfg(test)]
458 24 : pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
459 24 : self.nodes.get(&node_id).unwrap().shard_count
460 24 : }
461 :
462 : #[cfg(test)]
463 24 : pub(crate) fn get_node_attached_shard_count(&self, node_id: NodeId) -> usize {
464 24 : self.nodes.get(&node_id).unwrap().attached_shard_count
465 24 : }
466 : }
467 :
#[cfg(test)]
pub(crate) mod test_utils {

    use crate::node::Node;
    use pageserver_api::controller_api::{NodeAvailability, UtilizationScore};
    use std::collections::HashMap;
    use utils::id::NodeId;

    /// Test helper: build `n` nodes, every one of them marked active.
    ///
    /// Node IDs start at one. Node `i` is constructed with `httphost-{i}` / port `80 + i` and
    /// `pghost-{i}` / port `5432 + i`.
    pub(crate) fn make_test_nodes(n: u64) -> HashMap<NodeId, Node> {
        let mut test_nodes = HashMap::new();
        for i in 1..n + 1 {
            let id = NodeId(i);
            let mut node = Node::new(
                id,
                format!("httphost-{i}"),
                80 + i as u16,
                format!("pghost-{i}"),
                5432 + i as u16,
            );
            node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
            assert!(node.is_available());
            test_nodes.insert(id, node);
        }
        test_nodes
    }
}
497 :
#[cfg(test)]
mod tests {
    use super::*;

    use crate::tenant_shard::IntentState;
    /// Schedules two tenants' attached locations plus one secondary location, and checks that
    /// the scheduler's per-node shard counts follow every `IntentState` change.
    #[test]
    fn scheduler_basic() -> anyhow::Result<()> {
        // Two active nodes, with IDs 1 and 2.
        let nodes = test_utils::make_test_nodes(2);

        let mut scheduler = Scheduler::new(nodes.values());
        let mut t1_intent = IntentState::new();
        let mut t2_intent = IntentState::new();

        let context = ScheduleContext::default();

        // With equal scores the lowest node ID wins, so t1 lands on node 1; node 1 then has an
        // attached shard, so t2 is steered to node 2.
        let scheduled = scheduler.schedule_shard(&[], &context)?;
        t1_intent.set_attached(&mut scheduler, Some(scheduled));
        let scheduled = scheduler.schedule_shard(&[], &context)?;
        t2_intent.set_attached(&mut scheduler, Some(scheduled));

        assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);

        assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);

        // Excluding t1's own pageservers places its secondary on node 2: node 2's total count
        // rises while its attached count does not.
        let scheduled = scheduler.schedule_shard(&t1_intent.all_pageservers(), &context)?;
        t1_intent.push_secondary(&mut scheduler, scheduled);

        assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);

        assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);

        // Clearing t1 releases both of its locations; only t2's attachment remains.
        t1_intent.clear(&mut scheduler);
        assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
        assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);

        let total_attached = scheduler.get_node_attached_shard_count(NodeId(1))
            + scheduler.get_node_attached_shard_count(NodeId(2));
        assert_eq!(total_attached, 1);

        if cfg!(debug_assertions) {
            // Dropping an IntentState without clearing it causes a panic in debug mode,
            // because we have failed to properly update scheduler shard counts.
            let result = std::panic::catch_unwind(move || {
                drop(t2_intent);
            });
            assert!(result.is_err());
        } else {
            // Release builds: clear t2 properly and check that every count returns to zero.
            t2_intent.clear(&mut scheduler);

            assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
            assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 0);

            assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 0);
            assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 0);
        }

        Ok(())
    }
}
|