Line data Source code
1 : use crate::{node::Node, tenant_shard::TenantShard};
2 : use itertools::Itertools;
3 : use pageserver_api::models::PageserverUtilization;
4 : use serde::Serialize;
5 : use std::{collections::HashMap, fmt::Debug};
6 : use utils::{http::error::ApiError, id::NodeId};
7 :
8 : /// Scenarios in which we cannot find a suitable location for a tenant shard
9 0 : #[derive(thiserror::Error, Debug)]
10 : pub enum ScheduleError {
11 : #[error("No pageservers found")]
12 : NoPageservers,
13 : #[error("No pageserver found matching constraint")]
14 : ImpossibleConstraint,
15 : }
16 :
17 : impl From<ScheduleError> for ApiError {
18 0 : fn from(value: ScheduleError) -> Self {
19 0 : ApiError::Conflict(format!("Scheduling error: {}", value))
20 0 : }
21 : }
22 :
23 : #[derive(Serialize)]
24 : pub enum MaySchedule {
25 : Yes(PageserverUtilization),
26 : No,
27 : }
28 :
29 : #[derive(Serialize)]
30 : pub(crate) struct SchedulerNode {
31 : /// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
32 : shard_count: usize,
33 : /// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`].
34 : attached_shard_count: usize,
35 :
36 : /// Whether this node is currently elegible to have new shards scheduled (this is derived
37 : /// from a node's availability state and scheduling policy).
38 : may_schedule: MaySchedule,
39 : }
40 :
41 : pub(crate) trait NodeSchedulingScore: Debug + Ord + Copy + Sized {
42 : fn generate(
43 : node_id: &NodeId,
44 : node: &mut SchedulerNode,
45 : context: &ScheduleContext,
46 : ) -> Option<Self>;
47 : fn is_overloaded(&self) -> bool;
48 : fn node_id(&self) -> NodeId;
49 : }
50 :
51 : pub(crate) trait ShardTag {
52 : type Score: NodeSchedulingScore;
53 : }
54 :
55 : pub(crate) struct AttachedShardTag {}
56 : impl ShardTag for AttachedShardTag {
57 : type Score = NodeAttachmentSchedulingScore;
58 : }
59 :
60 : pub(crate) struct SecondaryShardTag {}
61 : impl ShardTag for SecondaryShardTag {
62 : type Score = NodeSecondarySchedulingScore;
63 : }
64 :
65 : /// Scheduling score of a given node for shard attachments.
66 : /// Lower scores indicate more suitable nodes.
67 : /// Ordering is given by member declaration order (top to bottom).
68 : #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
69 : pub(crate) struct NodeAttachmentSchedulingScore {
70 : /// The number of shards belonging to the tenant currently being
71 : /// scheduled that are attached to this node.
72 : affinity_score: AffinityScore,
73 : /// Size of [`ScheduleContext::attached_nodes`] for the current node.
74 : /// This normally tracks the number of attached shards belonging to the
75 : /// tenant being scheduled that are already on this node.
76 : attached_shards_in_context: usize,
77 : /// Utilisation score that combines shard count and disk utilisation
78 : utilization_score: u64,
79 : /// Total number of shards attached to this node. When nodes have identical utilisation, this
80 : /// acts as an anti-affinity between attached shards.
81 : total_attached_shard_count: usize,
82 : /// Convenience to make selection deterministic in tests and empty systems
83 : node_id: NodeId,
84 : }
85 :
86 : impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
87 68 : fn generate(
88 68 : node_id: &NodeId,
89 68 : node: &mut SchedulerNode,
90 68 : context: &ScheduleContext,
91 68 : ) -> Option<Self> {
92 68 : let utilization = match &mut node.may_schedule {
93 68 : MaySchedule::Yes(u) => u,
94 : MaySchedule::No => {
95 0 : return None;
96 : }
97 : };
98 :
99 68 : Some(Self {
100 68 : affinity_score: context
101 68 : .nodes
102 68 : .get(node_id)
103 68 : .copied()
104 68 : .unwrap_or(AffinityScore::FREE),
105 68 : attached_shards_in_context: context.attached_nodes.get(node_id).copied().unwrap_or(0),
106 68 : utilization_score: utilization.cached_score(),
107 68 : total_attached_shard_count: node.attached_shard_count,
108 68 : node_id: *node_id,
109 68 : })
110 68 : }
111 :
112 68 : fn is_overloaded(&self) -> bool {
113 68 : PageserverUtilization::is_overloaded(self.utilization_score)
114 68 : }
115 :
116 28 : fn node_id(&self) -> NodeId {
117 28 : self.node_id
118 28 : }
119 : }
120 :
121 : /// Scheduling score of a given node for shard secondaries.
122 : /// Lower scores indicate more suitable nodes.
123 : /// Ordering is given by member declaration order (top to bottom).
124 : #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
125 : pub(crate) struct NodeSecondarySchedulingScore {
126 : /// The number of shards belonging to the tenant currently being
127 : /// scheduled that are attached to this node.
128 : affinity_score: AffinityScore,
129 : /// Utilisation score that combines shard count and disk utilisation
130 : utilization_score: u64,
131 : /// Total number of shards attached to this node. When nodes have identical utilisation, this
132 : /// acts as an anti-affinity between attached shards.
133 : total_attached_shard_count: usize,
134 : /// Convenience to make selection deterministic in tests and empty systems
135 : node_id: NodeId,
136 : }
137 :
138 : impl NodeSchedulingScore for NodeSecondarySchedulingScore {
139 40 : fn generate(
140 40 : node_id: &NodeId,
141 40 : node: &mut SchedulerNode,
142 40 : context: &ScheduleContext,
143 40 : ) -> Option<Self> {
144 40 : let utilization = match &mut node.may_schedule {
145 40 : MaySchedule::Yes(u) => u,
146 : MaySchedule::No => {
147 0 : return None;
148 : }
149 : };
150 :
151 40 : Some(Self {
152 40 : affinity_score: context
153 40 : .nodes
154 40 : .get(node_id)
155 40 : .copied()
156 40 : .unwrap_or(AffinityScore::FREE),
157 40 : utilization_score: utilization.cached_score(),
158 40 : total_attached_shard_count: node.attached_shard_count,
159 40 : node_id: *node_id,
160 40 : })
161 40 : }
162 :
163 40 : fn is_overloaded(&self) -> bool {
164 40 : PageserverUtilization::is_overloaded(self.utilization_score)
165 40 : }
166 :
167 26 : fn node_id(&self) -> NodeId {
168 26 : self.node_id
169 26 : }
170 : }
171 :
172 : impl PartialEq for SchedulerNode {
173 3 : fn eq(&self, other: &Self) -> bool {
174 3 : let may_schedule_matches = matches!(
175 3 : (&self.may_schedule, &other.may_schedule),
176 : (MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No)
177 : );
178 :
179 3 : may_schedule_matches
180 3 : && self.shard_count == other.shard_count
181 3 : && self.attached_shard_count == other.attached_shard_count
182 3 : }
183 : }
184 :
185 : impl Eq for SchedulerNode {}
186 :
187 : /// This type is responsible for selecting which node is used when a tenant shard needs to choose a pageserver
188 : /// on which to run.
189 : ///
190 : /// The type has no persistent state of its own: this is all populated at startup. The Serialize
191 : /// impl is only for debug dumps.
192 : #[derive(Serialize)]
193 : pub(crate) struct Scheduler {
194 : nodes: HashMap<NodeId, SchedulerNode>,
195 : }
196 :
/// Soft-constraint scheduling score: lower scores are preferred to higher ones.
///
/// For example, the score may be based on how many shards of the same tenant are
/// already on a node, so that shards are implicitly spread across nodes.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct AffinityScore(pub(crate) usize);
203 :
204 : impl AffinityScore {
205 : /// If we have no anti-affinity at all toward a node, this is its score. It means
206 : /// the scheduler has a free choice amongst nodes with this score, and may pick a node
207 : /// based on other information such as total utilization.
208 : pub(crate) const FREE: Self = Self(0);
209 :
210 112 : pub(crate) fn inc(&mut self) {
211 112 : self.0 += 1;
212 112 : }
213 : }
214 :
215 : impl std::ops::Add for AffinityScore {
216 : type Output = Self;
217 :
218 24 : fn add(self, rhs: Self) -> Self::Output {
219 24 : Self(self.0 + rhs.0)
220 24 : }
221 : }
222 :
/// Tells the scheduler whether a call is a sincere attempt to schedule, or a
/// speculative check for where we _would_ schedule (done during optimization).
///
/// The default mode is [`ScheduleMode::Normal`].
#[derive(Debug, Default)]
pub(crate) enum ScheduleMode {
    /// A real scheduling attempt.
    #[default]
    Normal,
    /// A "what if" query; the scheduler skips its informational logging in this mode.
    Speculative,
}
236 :
237 : // For carrying state between multiple calls to [`TenantShard::schedule`], e.g. when calling
238 : // it for many shards in the same tenant.
239 : #[derive(Debug, Default)]
240 : pub(crate) struct ScheduleContext {
241 : /// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`]
242 : pub(crate) nodes: HashMap<NodeId, AffinityScore>,
243 :
244 : /// Specifically how many _attached_ locations are on each node
245 : pub(crate) attached_nodes: HashMap<NodeId, usize>,
246 :
247 : pub(crate) mode: ScheduleMode,
248 : }
249 :
250 : impl ScheduleContext {
251 : /// Input is a list of nodes we would like to avoid using again within this context. The more
252 : /// times a node is passed into this call, the less inclined we are to use it.
253 57 : pub(crate) fn avoid(&mut self, nodes: &[NodeId]) {
254 169 : for node_id in nodes {
255 112 : let entry = self.nodes.entry(*node_id).or_insert(AffinityScore::FREE);
256 112 : entry.inc()
257 : }
258 57 : }
259 :
260 55 : pub(crate) fn push_attached(&mut self, node_id: NodeId) {
261 55 : let entry = self.attached_nodes.entry(node_id).or_default();
262 55 : *entry += 1;
263 55 : }
264 :
265 69 : pub(crate) fn get_node_affinity(&self, node_id: NodeId) -> AffinityScore {
266 69 : self.nodes
267 69 : .get(&node_id)
268 69 : .copied()
269 69 : .unwrap_or(AffinityScore::FREE)
270 69 : }
271 :
272 69 : pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize {
273 69 : self.attached_nodes.get(&node_id).copied().unwrap_or(0)
274 69 : }
275 : }
276 :
/// The kinds of reference count change accepted by
/// [`Scheduler::update_node_ref_counts`].
pub(crate) enum RefCountUpdate {
    /// Increments the attached shard count only.
    PromoteSecondary,
    /// Increments both the shard count and the attached shard count.
    Attach,
    /// Decrements both the shard count and the attached shard count.
    Detach,
    /// Decrements the attached shard count only.
    DemoteAttached,
    /// Increments the shard count only.
    AddSecondary,
    /// Decrements the shard count only.
    RemoveSecondary,
}
285 :
286 : impl Scheduler {
287 9 : pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
288 9 : let mut scheduler_nodes = HashMap::new();
289 30 : for node in nodes {
290 21 : scheduler_nodes.insert(
291 21 : node.get_id(),
292 21 : SchedulerNode {
293 21 : shard_count: 0,
294 21 : attached_shard_count: 0,
295 21 : may_schedule: node.may_schedule(),
296 21 : },
297 21 : );
298 21 : }
299 :
300 9 : Self {
301 9 : nodes: scheduler_nodes,
302 9 : }
303 9 : }
304 :
305 : /// For debug/support: check that our internal statistics are in sync with the state of
306 : /// the nodes & tenant shards.
307 : ///
308 : /// If anything is inconsistent, log details and return an error.
309 1 : pub(crate) fn consistency_check<'a>(
310 1 : &self,
311 1 : nodes: impl Iterator<Item = &'a Node>,
312 1 : shards: impl Iterator<Item = &'a TenantShard>,
313 1 : ) -> anyhow::Result<()> {
314 1 : let mut expect_nodes: HashMap<NodeId, SchedulerNode> = HashMap::new();
315 4 : for node in nodes {
316 3 : expect_nodes.insert(
317 3 : node.get_id(),
318 3 : SchedulerNode {
319 3 : shard_count: 0,
320 3 : attached_shard_count: 0,
321 3 : may_schedule: node.may_schedule(),
322 3 : },
323 3 : );
324 3 : }
325 :
326 2 : for shard in shards {
327 1 : if let Some(node_id) = shard.intent.get_attached() {
328 1 : match expect_nodes.get_mut(node_id) {
329 1 : Some(node) => {
330 1 : node.shard_count += 1;
331 1 : node.attached_shard_count += 1;
332 1 : }
333 0 : None => anyhow::bail!(
334 0 : "Tenant {} references nonexistent node {}",
335 0 : shard.tenant_shard_id,
336 0 : node_id
337 0 : ),
338 : }
339 0 : }
340 :
341 1 : for node_id in shard.intent.get_secondary() {
342 1 : match expect_nodes.get_mut(node_id) {
343 1 : Some(node) => node.shard_count += 1,
344 0 : None => anyhow::bail!(
345 0 : "Tenant {} references nonexistent node {}",
346 0 : shard.tenant_shard_id,
347 0 : node_id
348 0 : ),
349 : }
350 : }
351 : }
352 :
353 4 : for (node_id, expect_node) in &expect_nodes {
354 3 : let Some(self_node) = self.nodes.get(node_id) else {
355 0 : anyhow::bail!("Node {node_id} not found in Self")
356 : };
357 :
358 3 : if self_node != expect_node {
359 0 : tracing::error!("Inconsistency detected in scheduling state for node {node_id}");
360 0 : tracing::error!("Expected state: {}", serde_json::to_string(expect_node)?);
361 0 : tracing::error!("Self state: {}", serde_json::to_string(self_node)?);
362 :
363 0 : anyhow::bail!("Inconsistent state on {node_id}");
364 3 : }
365 : }
366 :
367 1 : if expect_nodes.len() != self.nodes.len() {
368 : // We just checked that all the expected nodes are present. If the lengths don't match,
369 : // it means that we have nodes in Self that are unexpected.
370 0 : for node_id in self.nodes.keys() {
371 0 : if !expect_nodes.contains_key(node_id) {
372 0 : anyhow::bail!("Node {node_id} found in Self but not in expected nodes");
373 0 : }
374 : }
375 1 : }
376 :
377 1 : Ok(())
378 1 : }
379 :
380 : /// Update the reference counts of a node. These reference counts are used to guide scheduling
381 : /// decisions, not for memory management: they represent the number of tenant shard whose IntentState
382 : /// targets this node and the number of tenants shars whose IntentState is attached to this
383 : /// node.
384 : ///
385 : /// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
386 : /// [`Self::new`] or [`Self::node_upsert`])
387 123 : pub(crate) fn update_node_ref_counts(&mut self, node_id: NodeId, update: RefCountUpdate) {
388 123 : let Some(node) = self.nodes.get_mut(&node_id) else {
389 0 : debug_assert!(false);
390 0 : tracing::error!("Scheduler missing node {node_id}");
391 0 : return;
392 : };
393 :
394 123 : match update {
395 5 : RefCountUpdate::PromoteSecondary => {
396 5 : node.attached_shard_count += 1;
397 5 : }
398 32 : RefCountUpdate::Attach => {
399 32 : node.shard_count += 1;
400 32 : node.attached_shard_count += 1;
401 32 : }
402 31 : RefCountUpdate::Detach => {
403 31 : node.shard_count -= 1;
404 31 : node.attached_shard_count -= 1;
405 31 : }
406 5 : RefCountUpdate::DemoteAttached => {
407 5 : node.attached_shard_count -= 1;
408 5 : }
409 25 : RefCountUpdate::AddSecondary => {
410 25 : node.shard_count += 1;
411 25 : }
412 25 : RefCountUpdate::RemoveSecondary => {
413 25 : node.shard_count -= 1;
414 25 : }
415 : }
416 :
417 : // Maybe update PageserverUtilization
418 123 : match update {
419 : RefCountUpdate::AddSecondary | RefCountUpdate::Attach => {
420 : // Referencing the node: if this takes our shard_count above the utilzation structure's
421 : // shard count, then artifically bump it: this ensures that the scheduler immediately
422 : // recognizes that this node has more work on it, without waiting for the next heartbeat
423 : // to update the utilization.
424 57 : if let MaySchedule::Yes(utilization) = &mut node.may_schedule {
425 57 : utilization.adjust_shard_count_max(node.shard_count as u32);
426 57 : }
427 : }
428 : RefCountUpdate::PromoteSecondary
429 : | RefCountUpdate::Detach
430 : | RefCountUpdate::RemoveSecondary
431 66 : | RefCountUpdate::DemoteAttached => {
432 66 : // De-referencing the node: leave the utilization's shard_count at a stale higher
433 66 : // value until some future heartbeat after we have physically removed this shard
434 66 : // from the node: this prevents the scheduler over-optimistically trying to schedule
435 66 : // more work onto the node before earlier detaches are done.
436 66 : }
437 : }
438 123 : }
439 :
440 : // Check if the number of shards attached to a given node is lagging below
441 : // the cluster average. If that's the case, the node should be filled.
442 0 : pub(crate) fn compute_fill_requirement(&self, node_id: NodeId) -> usize {
443 0 : let Some(node) = self.nodes.get(&node_id) else {
444 0 : debug_assert!(false);
445 0 : tracing::error!("Scheduler missing node {node_id}");
446 0 : return 0;
447 : };
448 0 : assert!(!self.nodes.is_empty());
449 0 : let expected_attached_shards_per_node = self.expected_attached_shard_count();
450 :
451 0 : for (node_id, node) in self.nodes.iter() {
452 0 : tracing::trace!(%node_id, "attached_shard_count={} shard_count={} expected={}", node.attached_shard_count, node.shard_count, expected_attached_shards_per_node);
453 : }
454 :
455 0 : if node.attached_shard_count < expected_attached_shards_per_node {
456 0 : expected_attached_shards_per_node - node.attached_shard_count
457 : } else {
458 0 : 0
459 : }
460 0 : }
461 :
462 0 : pub(crate) fn expected_attached_shard_count(&self) -> usize {
463 0 : let total_attached_shards: usize =
464 0 : self.nodes.values().map(|n| n.attached_shard_count).sum();
465 0 :
466 0 : assert!(!self.nodes.is_empty());
467 0 : total_attached_shards / self.nodes.len()
468 0 : }
469 :
470 0 : pub(crate) fn nodes_by_attached_shard_count(&self) -> Vec<(NodeId, usize)> {
471 0 : self.nodes
472 0 : .iter()
473 0 : .map(|(node_id, stats)| (*node_id, stats.attached_shard_count))
474 0 : .sorted_by(|lhs, rhs| Ord::cmp(&lhs.1, &rhs.1).reverse())
475 0 : .collect()
476 0 : }
477 :
478 9 : pub(crate) fn node_upsert(&mut self, node: &Node) {
479 : use std::collections::hash_map::Entry::*;
480 9 : match self.nodes.entry(node.get_id()) {
481 3 : Occupied(mut entry) => {
482 3 : // Updates to MaySchedule are how we receive updated PageserverUtilization: adjust these values
483 3 : // to account for any shards scheduled on the controller but not yet visible to the pageserver.
484 3 : let mut may_schedule = node.may_schedule();
485 3 : match &mut may_schedule {
486 2 : MaySchedule::Yes(utilization) => {
487 2 : utilization.adjust_shard_count_max(entry.get().shard_count as u32);
488 2 : }
489 1 : MaySchedule::No => { // Nothing to tweak
490 1 : }
491 : }
492 :
493 3 : entry.get_mut().may_schedule = may_schedule;
494 : }
495 6 : Vacant(entry) => {
496 6 : entry.insert(SchedulerNode {
497 6 : shard_count: 0,
498 6 : attached_shard_count: 0,
499 6 : may_schedule: node.may_schedule(),
500 6 : });
501 6 : }
502 : }
503 9 : }
504 :
505 0 : pub(crate) fn node_remove(&mut self, node_id: NodeId) {
506 0 : if self.nodes.remove(&node_id).is_none() {
507 0 : tracing::warn!(node_id=%node_id, "Removed non-existent node from scheduler");
508 0 : }
509 0 : }
510 :
511 : /// Where we have several nodes to choose from, for example when picking a secondary location
512 : /// to promote to an attached location, this method may be used to pick the best choice based
513 : /// on the scheduler's knowledge of utilization and availability.
514 : ///
515 : /// If the input is empty, or all the nodes are not elegible for scheduling, return None: the
516 : /// caller can pick a node some other way.
517 30 : pub(crate) fn node_preferred(&self, nodes: &[NodeId]) -> Option<NodeId> {
518 30 : if nodes.is_empty() {
519 28 : return None;
520 2 : }
521 2 :
522 2 : // TODO: When the utilization score returned by the pageserver becomes meaningful,
523 2 : // schedule based on that instead of the shard count.
524 2 : let node = nodes
525 2 : .iter()
526 4 : .map(|node_id| {
527 4 : let may_schedule = self
528 4 : .nodes
529 4 : .get(node_id)
530 4 : .map(|n| !matches!(n.may_schedule, MaySchedule::No))
531 4 : .unwrap_or(false);
532 4 : (*node_id, may_schedule)
533 4 : })
534 4 : .max_by_key(|(_n, may_schedule)| *may_schedule);
535 2 :
536 2 : // If even the preferred node has may_schedule==false, return None
537 2 : node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
538 30 : }
539 :
540 : /// Compute a schedulling score for each node that the scheduler knows of
541 : /// minus a set of hard excluded nodes.
542 62 : fn compute_node_scores<Score>(
543 62 : &mut self,
544 62 : hard_exclude: &[NodeId],
545 62 : context: &ScheduleContext,
546 62 : ) -> Vec<Score>
547 62 : where
548 62 : Score: NodeSchedulingScore,
549 62 : {
550 62 : self.nodes
551 62 : .iter_mut()
552 163 : .filter_map(|(k, v)| {
553 163 : if hard_exclude.contains(k) {
554 55 : None
555 : } else {
556 108 : Score::generate(k, v, context)
557 : }
558 163 : })
559 62 : .collect()
560 62 : }
561 :
562 : /// hard_exclude: it is forbidden to use nodes in this list, typically becacuse they
563 : /// are already in use by this shard -- we use this to avoid picking the same node
564 : /// as both attached and secondary location. This is a hard constraint: if we cannot
565 : /// find any nodes that aren't in this list, then we will return a [`ScheduleError::ImpossibleConstraint`].
566 : ///
567 : /// context: we prefer to avoid using nodes identified in the context, according
568 : /// to their anti-affinity score. We use this to prefeer to avoid placing shards in
569 : /// the same tenant on the same node. This is a soft constraint: the context will never
570 : /// cause us to fail to schedule a shard.
571 62 : pub(crate) fn schedule_shard<Tag: ShardTag>(
572 62 : &mut self,
573 62 : hard_exclude: &[NodeId],
574 62 : context: &ScheduleContext,
575 62 : ) -> Result<NodeId, ScheduleError> {
576 62 : if self.nodes.is_empty() {
577 0 : return Err(ScheduleError::NoPageservers);
578 62 : }
579 62 :
580 62 : let mut scores = self.compute_node_scores::<Tag::Score>(hard_exclude, context);
581 62 :
582 62 : // Exclude nodes whose utilization is critically high, if there are alternatives available. This will
583 62 : // cause us to violate affinity rules if it is necessary to avoid critically overloading nodes: for example
584 62 : // we may place shards in the same tenant together on the same pageserver if all other pageservers are
585 62 : // overloaded.
586 62 : let non_overloaded_scores = scores
587 62 : .iter()
588 108 : .filter(|i| !i.is_overloaded())
589 62 : .copied()
590 62 : .collect::<Vec<_>>();
591 62 : if !non_overloaded_scores.is_empty() {
592 54 : scores = non_overloaded_scores;
593 54 : }
594 :
595 : // Sort the nodes by score. The one with the lowest scores will be the preferred node.
596 : // Refer to [`NodeAttachmentSchedulingScore`] for attached locations and
597 : // [`NodeSecondarySchedulingScore`] for secondary locations to understand how the nodes
598 : // are ranked.
599 62 : scores.sort();
600 62 :
601 62 : if scores.is_empty() {
602 : // After applying constraints, no pageservers were left.
603 8 : if !matches!(context.mode, ScheduleMode::Speculative) {
604 : // If this was not a speculative attempt, log details to understand why we couldn't
605 : // schedule: this may help an engineer understand if some nodes are marked offline
606 : // in a way that's preventing progress.
607 8 : tracing::info!(
608 0 : "Scheduling failure, while excluding {hard_exclude:?}, node states:"
609 : );
610 24 : for (node_id, node) in &self.nodes {
611 16 : tracing::info!(
612 0 : "Node {node_id}: may_schedule={} shards={}",
613 0 : !matches!(node.may_schedule, MaySchedule::No),
614 : node.shard_count
615 : );
616 : }
617 0 : }
618 8 : return Err(ScheduleError::ImpossibleConstraint);
619 54 : }
620 54 :
621 54 : // Lowest score wins
622 54 : let node_id = scores.first().unwrap().node_id();
623 :
624 54 : if !matches!(context.mode, ScheduleMode::Speculative) {
625 54 : tracing::info!(
626 0 : "scheduler selected node {node_id} (elegible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?})",
627 0 : scores.iter().map(|i| i.node_id().0).collect::<Vec<_>>()
628 : );
629 0 : }
630 :
631 : // Note that we do not update shard count here to reflect the scheduling: that
632 : // is IntentState's job when the scheduled location is used.
633 :
634 54 : Ok(node_id)
635 62 : }
636 :
/// Unit test access to internal state: the shard count of `node_id`.
/// Panics if the node is unknown.
#[cfg(test)]
pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
    let node = self.nodes.get(&node_id).unwrap();
    node.shard_count
}
642 :
/// Unit test access to internal state: the attached shard count of `node_id`.
/// Panics if the node is unknown.
#[cfg(test)]
pub(crate) fn get_node_attached_shard_count(&self, node_id: NodeId) -> usize {
    let node = self.nodes.get(&node_id).unwrap();
    node.attached_shard_count
}
647 : }
648 :
#[cfg(test)]
pub(crate) mod test_utils {

    use crate::node::Node;
    use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization};
    use std::collections::HashMap;
    use utils::id::NodeId;

    /// Test helper: synthesize the requested number of nodes, all in active state.
    ///
    /// Node IDs start at one. Host names and ports are derived from the node ID.
    pub(crate) fn make_test_nodes(n: u64) -> HashMap<NodeId, Node> {
        let mut nodes = HashMap::new();

        for i in 1..n + 1 {
            let id = NodeId(i);
            let mut node = Node::new(
                id,
                format!("httphost-{i}"),
                80 + i as u16,
                format!("pghost-{i}"),
                5432 + i as u16,
                "test-az".to_string(),
            );
            node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0)));
            assert!(node.is_available());

            nodes.insert(id, node);
        }

        nodes
    }
}
679 :
680 : #[cfg(test)]
681 : mod tests {
682 : use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization};
683 :
684 : use super::*;
685 :
686 : use crate::tenant_shard::IntentState;
/// Schedule attached and secondary locations for two intents onto two nodes and
/// check that the scheduler's per-node reference counts follow along.
#[test]
fn scheduler_basic() -> anyhow::Result<()> {
    let nodes = test_utils::make_test_nodes(2);

    let mut scheduler = Scheduler::new(nodes.values());
    let mut t1_intent = IntentState::new();
    let mut t2_intent = IntentState::new();

    let context = ScheduleContext::default();

    // Two attached locations end up spread across the two nodes.
    let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &context)?;
    t1_intent.set_attached(&mut scheduler, Some(scheduled));
    let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &context)?;
    t2_intent.set_attached(&mut scheduler, Some(scheduled));

    assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
    assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);

    assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
    assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);

    // This location is stored as a secondary, so schedule it with the secondary tag.
    // The node already used by t1 is hard-excluded, which leaves only the other node.
    let scheduled =
        scheduler.schedule_shard::<SecondaryShardTag>(&t1_intent.all_pageservers(), &context)?;
    t1_intent.push_secondary(&mut scheduler, scheduled);

    assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
    assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);

    assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
    assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);

    t1_intent.clear(&mut scheduler);
    assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
    assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);

    let total_attached = scheduler.get_node_attached_shard_count(NodeId(1))
        + scheduler.get_node_attached_shard_count(NodeId(2));
    assert_eq!(total_attached, 1);

    if cfg!(debug_assertions) {
        // Dropping an IntentState without clearing it causes a panic in debug mode,
        // because we have failed to properly update scheduler shard counts.
        let result = std::panic::catch_unwind(move || {
            drop(t2_intent);
        });
        assert!(result.is_err());
    } else {
        t2_intent.clear(&mut scheduler);

        assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 0);

        assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 0);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 0);
    }

    Ok(())
}
745 :
#[test]
/// Test the PageserverUtilization's contribution to scheduling algorithm
fn scheduler_utilization() {
    let mut nodes = test_utils::make_test_nodes(3);
    let mut scheduler = Scheduler::new(nodes.values());

    // Need to keep these alive because they contribute to shard counts via RAII
    let mut scheduled_intents = Vec::new();

    let empty_context = ScheduleContext::default();

    /// Schedule one attached location, record its intent, and check which node was picked.
    fn assert_scheduler_chooses(
        expect_node: NodeId,
        scheduled_intents: &mut Vec<IntentState>,
        scheduler: &mut Scheduler,
        context: &ScheduleContext,
    ) {
        let scheduled = scheduler
            .schedule_shard::<AttachedShardTag>(&[], context)
            .unwrap();
        let mut intent = IntentState::new();
        intent.set_attached(scheduler, Some(scheduled));
        scheduled_intents.push(intent);
        assert_eq!(scheduled, expect_node);
    }

    // Independent schedule calls onto empty nodes should round-robin, because each node's
    // utilization's shard count is updated inline. The order is deterministic because when
    // all other factors are equal, we order by node ID.
    for expect in [1, 2, 3] {
        assert_scheduler_chooses(
            NodeId(expect),
            &mut scheduled_intents,
            &mut scheduler,
            &empty_context,
        );
    }

    // Manually setting utilization higher should cause schedule calls to round-robin the other
    // nodes which have equal utilization.
    let node1 = nodes.get_mut(&NodeId(1)).unwrap();
    node1.set_availability(NodeAvailability::Active(test_utilization::simple(
        10,
        1024 * 1024 * 1024,
    )));
    scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());

    for expect in [2, 3, 2, 3] {
        assert_scheduler_chooses(
            NodeId(expect),
            &mut scheduled_intents,
            &mut scheduler,
            &empty_context,
        );
    }

    // The scheduler should prefer nodes with lower affinity score,
    // even if they have higher utilization (as long as they aren't utilized at >100%)
    let mut context_prefer_node1 = ScheduleContext::default();
    context_prefer_node1.avoid(&[NodeId(2), NodeId(3)]);
    for expect in [1, 1] {
        assert_scheduler_chooses(
            NodeId(expect),
            &mut scheduled_intents,
            &mut scheduler,
            &context_prefer_node1,
        );
    }

    // If a node is over-utilized, it will not be used even if affinity scores prefer it
    let node1 = nodes.get_mut(&NodeId(1)).unwrap();
    node1.set_availability(NodeAvailability::Active(test_utilization::simple(
        20000,
        1024 * 1024 * 1024,
    )));
    scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
    for expect in [2, 3] {
        assert_scheduler_chooses(
            NodeId(expect),
            &mut scheduled_intents,
            &mut scheduler,
            &context_prefer_node1,
        );
    }

    // Release every intent so the scheduler's reference counts are balanced again.
    for mut intent in scheduled_intents {
        intent.clear(&mut scheduler);
    }
}
873 : }
|