Line data Source code
1 : use crate::{node::Node, tenant_shard::TenantShard};
2 : use itertools::Itertools;
3 : use pageserver_api::{controller_api::AvailabilityZone, models::PageserverUtilization};
4 : use serde::Serialize;
5 : use std::{collections::HashMap, fmt::Debug};
6 : use utils::{http::error::ApiError, id::NodeId};
7 :
8 : /// Scenarios in which we cannot find a suitable location for a tenant shard
9 0 : #[derive(thiserror::Error, Debug)]
10 : pub enum ScheduleError {
11 : #[error("No pageservers found")]
12 : NoPageservers,
13 : #[error("No pageserver found matching constraint")]
14 : ImpossibleConstraint,
15 : }
16 :
17 : impl From<ScheduleError> for ApiError {
18 0 : fn from(value: ScheduleError) -> Self {
19 0 : ApiError::Conflict(format!("Scheduling error: {}", value))
20 0 : }
21 : }
22 :
/// Whether new shards may be placed on a node.
#[derive(Serialize)]
pub enum MaySchedule {
    /// Scheduling is allowed. Carries the node's utilization, which the scheduler
    /// consults (and adjusts) when scoring the node.
    Yes(PageserverUtilization),
    /// The node must not be handed any new shards.
    No,
}
28 :
/// Per-node bookkeeping kept by the [`Scheduler`].
#[derive(Serialize)]
pub(crate) struct SchedulerNode {
    /// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
    shard_count: usize,
    /// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`].
    attached_shard_count: usize,
    /// Availability zone id in which the node resides
    az: AvailabilityZone,

    /// Whether this node is currently eligible to have new shards scheduled (this is derived
    /// from a node's availability state and scheduling policy).
    may_schedule: MaySchedule,
}
42 :
/// A comparable score describing how suitable a node is for a given scheduling decision.
///
/// Scores are sorted by the scheduler and the lowest one wins.
pub(crate) trait NodeSchedulingScore: Debug + Ord + Copy + Sized {
    /// Build the score of `node` for a shard with the given preferred AZ and scheduling
    /// context. The implementations in this file return `None` when the node's
    /// `may_schedule` is [`MaySchedule::No`].
    fn generate(
        node_id: &NodeId,
        node: &mut SchedulerNode,
        preferred_az: &Option<AvailabilityZone>,
        context: &ScheduleContext,
    ) -> Option<Self>;
    /// Whether the scored node is considered overloaded.
    fn is_overloaded(&self) -> bool;
    /// Id of the node this score was generated for.
    fn node_id(&self) -> NodeId;
}
53 :
/// Type-level selector for the kind of location being scheduled: it picks the
/// [`NodeSchedulingScore`] implementation used by [`Scheduler::schedule_shard`].
pub(crate) trait ShardTag {
    /// Score type used to rank candidate nodes.
    type Score: NodeSchedulingScore;
}

/// Tag selecting [`NodeAttachmentSchedulingScore`] (attached locations).
pub(crate) struct AttachedShardTag {}
impl ShardTag for AttachedShardTag {
    type Score = NodeAttachmentSchedulingScore;
}

/// Tag selecting [`NodeSecondarySchedulingScore`] (secondary locations).
pub(crate) struct SecondaryShardTag {}
impl ShardTag for SecondaryShardTag {
    type Score = NodeSecondarySchedulingScore;
}
67 :
68 : #[derive(PartialEq, Eq, Debug, Clone, Copy)]
69 : enum AzMatch {
70 : Yes,
71 : No,
72 : Unknown,
73 : }
74 :
75 : impl AzMatch {
76 87626 : fn new(node_az: &AvailabilityZone, shard_preferred_az: Option<&AvailabilityZone>) -> Self {
77 87518 : match shard_preferred_az {
78 87518 : Some(preferred_az) if preferred_az == node_az => Self::Yes,
79 50008 : Some(_preferred_az) => Self::No,
80 108 : None => Self::Unknown,
81 : }
82 87626 : }
83 : }
84 :
85 : #[derive(PartialEq, Eq, Debug, Clone, Copy)]
86 : struct AttachmentAzMatch(AzMatch);
87 :
88 : impl Ord for AttachmentAzMatch {
89 46827 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
90 46827 : // Lower scores indicate a more suitable node.
91 46827 : // Note that we prefer a node for which we don't have
92 46827 : // info to a node which we are certain doesn't match the
93 46827 : // preferred AZ of the shard.
94 93654 : let az_match_score = |az_match: &AzMatch| match az_match {
95 46786 : AzMatch::Yes => 0,
96 86 : AzMatch::Unknown => 1,
97 46782 : AzMatch::No => 2,
98 93654 : };
99 :
100 46827 : az_match_score(&self.0).cmp(&az_match_score(&other.0))
101 46827 : }
102 : }
103 :
104 : impl PartialOrd for AttachmentAzMatch {
105 46827 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
106 46827 : Some(self.cmp(other))
107 46827 : }
108 : }
109 :
110 : #[derive(PartialEq, Eq, Debug, Clone, Copy)]
111 : struct SecondaryAzMatch(AzMatch);
112 :
113 : impl Ord for SecondaryAzMatch {
114 33096 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
115 33096 : // Lower scores indicate a more suitable node.
116 33096 : // For secondary locations we wish to avoid the preferred AZ
117 33096 : // of the shard.
118 66192 : let az_match_score = |az_match: &AzMatch| match az_match {
119 45583 : AzMatch::No => 0,
120 28 : AzMatch::Unknown => 1,
121 20581 : AzMatch::Yes => 2,
122 66192 : };
123 :
124 33096 : az_match_score(&self.0).cmp(&az_match_score(&other.0))
125 33096 : }
126 : }
127 :
128 : impl PartialOrd for SecondaryAzMatch {
129 33096 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
130 33096 : Some(self.cmp(other))
131 33096 : }
132 : }
133 :
134 : /// Scheduling score of a given node for shard attachments.
135 : /// Lower scores indicate more suitable nodes.
136 : /// Ordering is given by member declaration order (top to bottom).
137 : #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
138 : pub(crate) struct NodeAttachmentSchedulingScore {
139 : /// The number of shards belonging to the tenant currently being
140 : /// scheduled that are attached to this node.
141 : affinity_score: AffinityScore,
142 : /// Flag indicating whether this node matches the preferred AZ
143 : /// of the shard. For equal affinity scores, nodes in the matching AZ
144 : /// are considered first.
145 : az_match: AttachmentAzMatch,
146 : /// Size of [`ScheduleContext::attached_nodes`] for the current node.
147 : /// This normally tracks the number of attached shards belonging to the
148 : /// tenant being scheduled that are already on this node.
149 : attached_shards_in_context: usize,
150 : /// Utilisation score that combines shard count and disk utilisation
151 : utilization_score: u64,
152 : /// Total number of shards attached to this node. When nodes have identical utilisation, this
153 : /// acts as an anti-affinity between attached shards.
154 : total_attached_shard_count: usize,
155 : /// Convenience to make selection deterministic in tests and empty systems
156 : node_id: NodeId,
157 : }
158 :
159 : impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
160 50077 : fn generate(
161 50077 : node_id: &NodeId,
162 50077 : node: &mut SchedulerNode,
163 50077 : preferred_az: &Option<AvailabilityZone>,
164 50077 : context: &ScheduleContext,
165 50077 : ) -> Option<Self> {
166 50077 : let utilization = match &mut node.may_schedule {
167 50077 : MaySchedule::Yes(u) => u,
168 : MaySchedule::No => {
169 0 : return None;
170 : }
171 : };
172 :
173 50077 : Some(Self {
174 50077 : affinity_score: context
175 50077 : .nodes
176 50077 : .get(node_id)
177 50077 : .copied()
178 50077 : .unwrap_or(AffinityScore::FREE),
179 50077 : az_match: AttachmentAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())),
180 50077 : attached_shards_in_context: context.attached_nodes.get(node_id).copied().unwrap_or(0),
181 50077 : utilization_score: utilization.cached_score(),
182 50077 : total_attached_shard_count: node.attached_shard_count,
183 50077 : node_id: *node_id,
184 50077 : })
185 50077 : }
186 :
187 50077 : fn is_overloaded(&self) -> bool {
188 50077 : PageserverUtilization::is_overloaded(self.utilization_score)
189 50077 : }
190 :
191 12531 : fn node_id(&self) -> NodeId {
192 12531 : self.node_id
193 12531 : }
194 : }
195 :
196 : /// Scheduling score of a given node for shard secondaries.
197 : /// Lower scores indicate more suitable nodes.
198 : /// Ordering is given by member declaration order (top to bottom).
199 : #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
200 : pub(crate) struct NodeSecondarySchedulingScore {
201 : /// Flag indicating whether this node matches the preferred AZ
202 : /// of the shard. For secondary locations we wish to avoid nodes in.
203 : /// the preferred AZ of the shard, since that's where the attached location
204 : /// should be scheduled and having the secondary in the same AZ is bad for HA.
205 : az_match: SecondaryAzMatch,
206 : /// The number of shards belonging to the tenant currently being
207 : /// scheduled that are attached to this node.
208 : affinity_score: AffinityScore,
209 : /// Utilisation score that combines shard count and disk utilisation
210 : utilization_score: u64,
211 : /// Total number of shards attached to this node. When nodes have identical utilisation, this
212 : /// acts as an anti-affinity between attached shards.
213 : total_attached_shard_count: usize,
214 : /// Convenience to make selection deterministic in tests and empty systems
215 : node_id: NodeId,
216 : }
217 :
218 : impl NodeSchedulingScore for NodeSecondarySchedulingScore {
219 37549 : fn generate(
220 37549 : node_id: &NodeId,
221 37549 : node: &mut SchedulerNode,
222 37549 : preferred_az: &Option<AvailabilityZone>,
223 37549 : context: &ScheduleContext,
224 37549 : ) -> Option<Self> {
225 37549 : let utilization = match &mut node.may_schedule {
226 37549 : MaySchedule::Yes(u) => u,
227 : MaySchedule::No => {
228 0 : return None;
229 : }
230 : };
231 :
232 37549 : Some(Self {
233 37549 : az_match: SecondaryAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())),
234 37549 : affinity_score: context
235 37549 : .nodes
236 37549 : .get(node_id)
237 37549 : .copied()
238 37549 : .unwrap_or(AffinityScore::FREE),
239 37549 : utilization_score: utilization.cached_score(),
240 37549 : total_attached_shard_count: node.attached_shard_count,
241 37549 : node_id: *node_id,
242 37549 : })
243 37549 : }
244 :
245 37549 : fn is_overloaded(&self) -> bool {
246 37549 : PageserverUtilization::is_overloaded(self.utilization_score)
247 37549 : }
248 :
249 12529 : fn node_id(&self) -> NodeId {
250 12529 : self.node_id
251 12529 : }
252 : }
253 :
254 : impl PartialEq for SchedulerNode {
255 3 : fn eq(&self, other: &Self) -> bool {
256 3 : let may_schedule_matches = matches!(
257 3 : (&self.may_schedule, &other.may_schedule),
258 : (MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No)
259 : );
260 :
261 3 : may_schedule_matches
262 3 : && self.shard_count == other.shard_count
263 3 : && self.attached_shard_count == other.attached_shard_count
264 3 : && self.az == other.az
265 3 : }
266 : }
267 :
268 : impl Eq for SchedulerNode {}
269 :
/// This type is responsible for selecting which node is used when a tenant shard needs to choose a pageserver
/// on which to run.
///
/// The type has no persistent state of its own: this is all populated at startup. The Serialize
/// impl is only for debug dumps.
#[derive(Serialize)]
pub(crate) struct Scheduler {
    /// Scheduling bookkeeping for every node known to the scheduler, keyed by node id.
    nodes: HashMap<NodeId, SchedulerNode>,
}
279 :
/// Soft-constraint score: the lower the score, the more a node is preferred.
///
/// A typical use is counting how many shards of the same tenant already live on a
/// node, so that further shards of that tenant are nudged towards other nodes.
#[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub(crate) struct AffinityScore(pub(crate) usize);

impl AffinityScore {
    /// Score of a node towards which there is no anti-affinity whatsoever. The scheduler
    /// is free to choose among such nodes using other information, e.g. total utilization.
    pub(crate) const FREE: Self = Self(0);

    /// Bump the score by one.
    pub(crate) fn inc(&mut self) {
        let Self(count) = self;
        *count += 1;
    }
}

impl std::ops::Add for AffinityScore {
    type Output = Self;

    /// Sum of the two wrapped counts.
    fn add(self, rhs: Self) -> Self::Output {
        let (Self(left), Self(right)) = (self, rhs);
        Self(left + right)
    }
}
305 :
/// Hint for whether this is a sincere attempt to schedule, or a speculative
/// check for where we _would_ schedule (done during optimization).
///
/// The default is [`ScheduleMode::Normal`].
#[derive(Debug, Default)]
pub(crate) enum ScheduleMode {
    /// A real scheduling attempt: outcomes are logged.
    #[default]
    Normal,
    /// A "what if" query: the scheduler skips its informational logging.
    Speculative,
}
319 :
320 : // For carrying state between multiple calls to [`TenantShard::schedule`], e.g. when calling
321 : // it for many shards in the same tenant.
322 : #[derive(Debug, Default)]
323 : pub(crate) struct ScheduleContext {
324 : /// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`]
325 : pub(crate) nodes: HashMap<NodeId, AffinityScore>,
326 :
327 : /// Specifically how many _attached_ locations are on each node
328 : pub(crate) attached_nodes: HashMap<NodeId, usize>,
329 :
330 : pub(crate) mode: ScheduleMode,
331 : }
332 :
333 : impl ScheduleContext {
334 : /// Input is a list of nodes we would like to avoid using again within this context. The more
335 : /// times a node is passed into this call, the less inclined we are to use it.
336 12563 : pub(crate) fn avoid(&mut self, nodes: &[NodeId]) {
337 37681 : for node_id in nodes {
338 25118 : let entry = self.nodes.entry(*node_id).or_insert(AffinityScore::FREE);
339 25118 : entry.inc()
340 : }
341 12563 : }
342 :
343 12555 : pub(crate) fn push_attached(&mut self, node_id: NodeId) {
344 12555 : let entry = self.attached_nodes.entry(node_id).or_default();
345 12555 : *entry += 1;
346 12555 : }
347 :
348 69 : pub(crate) fn get_node_affinity(&self, node_id: NodeId) -> AffinityScore {
349 69 : self.nodes
350 69 : .get(&node_id)
351 69 : .copied()
352 69 : .unwrap_or(AffinityScore::FREE)
353 69 : }
354 :
355 69 : pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize {
356 69 : self.attached_nodes.get(&node_id).copied().unwrap_or(0)
357 69 : }
358 : }
359 :
/// Kind of change applied to a node's counters by [`Scheduler::update_node_ref_counts`].
pub(crate) enum RefCountUpdate {
    /// Increments the attached shard count only.
    PromoteSecondary,
    /// Increments both the shard count and the attached shard count.
    Attach,
    /// Decrements both the shard count and the attached shard count.
    Detach,
    /// Decrements the attached shard count only.
    DemoteAttached,
    /// Increments the shard count only.
    AddSecondary,
    /// Decrements the shard count only.
    RemoveSecondary,
}
368 :
369 : impl Scheduler {
370 60 : pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
371 60 : let mut scheduler_nodes = HashMap::new();
372 84 : for node in nodes {
373 24 : scheduler_nodes.insert(
374 24 : node.get_id(),
375 24 : SchedulerNode {
376 24 : shard_count: 0,
377 24 : attached_shard_count: 0,
378 24 : may_schedule: node.may_schedule(),
379 24 : az: node.get_availability_zone_id().clone(),
380 24 : },
381 24 : );
382 24 : }
383 :
384 60 : Self {
385 60 : nodes: scheduler_nodes,
386 60 : }
387 60 : }
388 :
389 : /// For debug/support: check that our internal statistics are in sync with the state of
390 : /// the nodes & tenant shards.
391 : ///
392 : /// If anything is inconsistent, log details and return an error.
393 1 : pub(crate) fn consistency_check<'a>(
394 1 : &self,
395 1 : nodes: impl Iterator<Item = &'a Node>,
396 1 : shards: impl Iterator<Item = &'a TenantShard>,
397 1 : ) -> anyhow::Result<()> {
398 1 : let mut expect_nodes: HashMap<NodeId, SchedulerNode> = HashMap::new();
399 4 : for node in nodes {
400 3 : expect_nodes.insert(
401 3 : node.get_id(),
402 3 : SchedulerNode {
403 3 : shard_count: 0,
404 3 : attached_shard_count: 0,
405 3 : may_schedule: node.may_schedule(),
406 3 : az: node.get_availability_zone_id().clone(),
407 3 : },
408 3 : );
409 3 : }
410 :
411 2 : for shard in shards {
412 1 : if let Some(node_id) = shard.intent.get_attached() {
413 1 : match expect_nodes.get_mut(node_id) {
414 1 : Some(node) => {
415 1 : node.shard_count += 1;
416 1 : node.attached_shard_count += 1;
417 1 : }
418 0 : None => anyhow::bail!(
419 0 : "Tenant {} references nonexistent node {}",
420 0 : shard.tenant_shard_id,
421 0 : node_id
422 0 : ),
423 : }
424 0 : }
425 :
426 1 : for node_id in shard.intent.get_secondary() {
427 1 : match expect_nodes.get_mut(node_id) {
428 1 : Some(node) => node.shard_count += 1,
429 0 : None => anyhow::bail!(
430 0 : "Tenant {} references nonexistent node {}",
431 0 : shard.tenant_shard_id,
432 0 : node_id
433 0 : ),
434 : }
435 : }
436 : }
437 :
438 4 : for (node_id, expect_node) in &expect_nodes {
439 3 : let Some(self_node) = self.nodes.get(node_id) else {
440 0 : anyhow::bail!("Node {node_id} not found in Self")
441 : };
442 :
443 3 : if self_node != expect_node {
444 0 : tracing::error!("Inconsistency detected in scheduling state for node {node_id}");
445 0 : tracing::error!("Expected state: {}", serde_json::to_string(expect_node)?);
446 0 : tracing::error!("Self state: {}", serde_json::to_string(self_node)?);
447 :
448 0 : anyhow::bail!("Inconsistent state on {node_id}");
449 3 : }
450 : }
451 :
452 1 : if expect_nodes.len() != self.nodes.len() {
453 : // We just checked that all the expected nodes are present. If the lengths don't match,
454 : // it means that we have nodes in Self that are unexpected.
455 0 : for node_id in self.nodes.keys() {
456 0 : if !expect_nodes.contains_key(node_id) {
457 0 : anyhow::bail!("Node {node_id} found in Self but not in expected nodes");
458 0 : }
459 : }
460 1 : }
461 :
462 1 : Ok(())
463 1 : }
464 :
465 : /// Update the reference counts of a node. These reference counts are used to guide scheduling
466 : /// decisions, not for memory management: they represent the number of tenant shard whose IntentState
467 : /// targets this node and the number of tenants shars whose IntentState is attached to this
468 : /// node.
469 : ///
470 : /// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
471 : /// [`Self::new`] or [`Self::node_upsert`])
472 50135 : pub(crate) fn update_node_ref_counts(&mut self, node_id: NodeId, update: RefCountUpdate) {
473 50135 : let Some(node) = self.nodes.get_mut(&node_id) else {
474 0 : debug_assert!(false);
475 0 : tracing::error!("Scheduler missing node {node_id}");
476 0 : return;
477 : };
478 :
479 50135 : match update {
480 5 : RefCountUpdate::PromoteSecondary => {
481 5 : node.attached_shard_count += 1;
482 5 : }
483 12538 : RefCountUpdate::Attach => {
484 12538 : node.shard_count += 1;
485 12538 : node.attached_shard_count += 1;
486 12538 : }
487 12537 : RefCountUpdate::Detach => {
488 12537 : node.shard_count -= 1;
489 12537 : node.attached_shard_count -= 1;
490 12537 : }
491 5 : RefCountUpdate::DemoteAttached => {
492 5 : node.attached_shard_count -= 1;
493 5 : }
494 12525 : RefCountUpdate::AddSecondary => {
495 12525 : node.shard_count += 1;
496 12525 : }
497 12525 : RefCountUpdate::RemoveSecondary => {
498 12525 : node.shard_count -= 1;
499 12525 : }
500 : }
501 :
502 : // Maybe update PageserverUtilization
503 50135 : match update {
504 : RefCountUpdate::AddSecondary | RefCountUpdate::Attach => {
505 : // Referencing the node: if this takes our shard_count above the utilzation structure's
506 : // shard count, then artifically bump it: this ensures that the scheduler immediately
507 : // recognizes that this node has more work on it, without waiting for the next heartbeat
508 : // to update the utilization.
509 25063 : if let MaySchedule::Yes(utilization) = &mut node.may_schedule {
510 25063 : utilization.adjust_shard_count_max(node.shard_count as u32);
511 25063 : }
512 : }
513 : RefCountUpdate::PromoteSecondary
514 : | RefCountUpdate::Detach
515 : | RefCountUpdate::RemoveSecondary
516 25072 : | RefCountUpdate::DemoteAttached => {
517 25072 : // De-referencing the node: leave the utilization's shard_count at a stale higher
518 25072 : // value until some future heartbeat after we have physically removed this shard
519 25072 : // from the node: this prevents the scheduler over-optimistically trying to schedule
520 25072 : // more work onto the node before earlier detaches are done.
521 25072 : }
522 : }
523 50135 : }
524 :
525 : // Check if the number of shards attached to a given node is lagging below
526 : // the cluster average. If that's the case, the node should be filled.
527 0 : pub(crate) fn compute_fill_requirement(&self, node_id: NodeId) -> usize {
528 0 : let Some(node) = self.nodes.get(&node_id) else {
529 0 : debug_assert!(false);
530 0 : tracing::error!("Scheduler missing node {node_id}");
531 0 : return 0;
532 : };
533 0 : assert!(!self.nodes.is_empty());
534 0 : let expected_attached_shards_per_node = self.expected_attached_shard_count();
535 :
536 0 : for (node_id, node) in self.nodes.iter() {
537 0 : tracing::trace!(%node_id, "attached_shard_count={} shard_count={} expected={}", node.attached_shard_count, node.shard_count, expected_attached_shards_per_node);
538 : }
539 :
540 0 : if node.attached_shard_count < expected_attached_shards_per_node {
541 0 : expected_attached_shards_per_node - node.attached_shard_count
542 : } else {
543 0 : 0
544 : }
545 0 : }
546 :
547 0 : pub(crate) fn expected_attached_shard_count(&self) -> usize {
548 0 : let total_attached_shards: usize =
549 0 : self.nodes.values().map(|n| n.attached_shard_count).sum();
550 0 :
551 0 : assert!(!self.nodes.is_empty());
552 0 : total_attached_shards / self.nodes.len()
553 0 : }
554 :
555 0 : pub(crate) fn nodes_by_attached_shard_count(&self) -> Vec<(NodeId, usize)> {
556 0 : self.nodes
557 0 : .iter()
558 0 : .map(|(node_id, stats)| (*node_id, stats.attached_shard_count))
559 0 : .sorted_by(|lhs, rhs| Ord::cmp(&lhs.1, &rhs.1).reverse())
560 0 : .collect()
561 0 : }
562 :
563 209 : pub(crate) fn node_upsert(&mut self, node: &Node) {
564 : use std::collections::hash_map::Entry::*;
565 209 : match self.nodes.entry(node.get_id()) {
566 3 : Occupied(mut entry) => {
567 3 : // Updates to MaySchedule are how we receive updated PageserverUtilization: adjust these values
568 3 : // to account for any shards scheduled on the controller but not yet visible to the pageserver.
569 3 : let mut may_schedule = node.may_schedule();
570 3 : match &mut may_schedule {
571 2 : MaySchedule::Yes(utilization) => {
572 2 : utilization.adjust_shard_count_max(entry.get().shard_count as u32);
573 2 : }
574 1 : MaySchedule::No => { // Nothing to tweak
575 1 : }
576 : }
577 :
578 3 : entry.get_mut().may_schedule = may_schedule;
579 : }
580 206 : Vacant(entry) => {
581 206 : entry.insert(SchedulerNode {
582 206 : shard_count: 0,
583 206 : attached_shard_count: 0,
584 206 : may_schedule: node.may_schedule(),
585 206 : az: node.get_availability_zone_id().clone(),
586 206 : });
587 206 : }
588 : }
589 209 : }
590 :
591 0 : pub(crate) fn node_remove(&mut self, node_id: NodeId) {
592 0 : if self.nodes.remove(&node_id).is_none() {
593 0 : tracing::warn!(node_id=%node_id, "Removed non-existent node from scheduler");
594 0 : }
595 0 : }
596 :
597 : /// Where we have several nodes to choose from, for example when picking a secondary location
598 : /// to promote to an attached location, this method may be used to pick the best choice based
599 : /// on the scheduler's knowledge of utilization and availability.
600 : ///
601 : /// If the input is empty, or all the nodes are not elegible for scheduling, return None: the
602 : /// caller can pick a node some other way.
603 25030 : pub(crate) fn node_preferred(&self, nodes: &[NodeId]) -> Option<NodeId> {
604 25030 : if nodes.is_empty() {
605 25028 : return None;
606 2 : }
607 2 :
608 2 : // TODO: When the utilization score returned by the pageserver becomes meaningful,
609 2 : // schedule based on that instead of the shard count.
610 2 : let node = nodes
611 2 : .iter()
612 4 : .map(|node_id| {
613 4 : let may_schedule = self
614 4 : .nodes
615 4 : .get(node_id)
616 4 : .map(|n| !matches!(n.may_schedule, MaySchedule::No))
617 4 : .unwrap_or(false);
618 4 : (*node_id, may_schedule)
619 4 : })
620 4 : .max_by_key(|(_n, may_schedule)| *may_schedule);
621 2 :
622 2 : // If even the preferred node has may_schedule==false, return None
623 2 : node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
624 25030 : }
625 :
626 : /// Compute a schedulling score for each node that the scheduler knows of
627 : /// minus a set of hard excluded nodes.
628 25068 : fn compute_node_scores<Score>(
629 25068 : &mut self,
630 25068 : hard_exclude: &[NodeId],
631 25068 : preferred_az: &Option<AvailabilityZone>,
632 25068 : context: &ScheduleContext,
633 25068 : ) -> Vec<Score>
634 25068 : where
635 25068 : Score: NodeSchedulingScore,
636 25068 : {
637 25068 : self.nodes
638 25068 : .iter_mut()
639 100181 : .filter_map(|(k, v)| {
640 100181 : if hard_exclude.contains(k) {
641 12555 : None
642 : } else {
643 87626 : Score::generate(k, v, preferred_az, context)
644 : }
645 100181 : })
646 25068 : .collect()
647 25068 : }
648 :
649 : /// hard_exclude: it is forbidden to use nodes in this list, typically becacuse they
650 : /// are already in use by this shard -- we use this to avoid picking the same node
651 : /// as both attached and secondary location. This is a hard constraint: if we cannot
652 : /// find any nodes that aren't in this list, then we will return a [`ScheduleError::ImpossibleConstraint`].
653 : ///
654 : /// context: we prefer to avoid using nodes identified in the context, according
655 : /// to their anti-affinity score. We use this to prefeer to avoid placing shards in
656 : /// the same tenant on the same node. This is a soft constraint: the context will never
657 : /// cause us to fail to schedule a shard.
658 25068 : pub(crate) fn schedule_shard<Tag: ShardTag>(
659 25068 : &mut self,
660 25068 : hard_exclude: &[NodeId],
661 25068 : preferred_az: &Option<AvailabilityZone>,
662 25068 : context: &ScheduleContext,
663 25068 : ) -> Result<NodeId, ScheduleError> {
664 25068 : if self.nodes.is_empty() {
665 0 : return Err(ScheduleError::NoPageservers);
666 25068 : }
667 25068 :
668 25068 : let mut scores =
669 25068 : self.compute_node_scores::<Tag::Score>(hard_exclude, preferred_az, context);
670 25068 :
671 25068 : // Exclude nodes whose utilization is critically high, if there are alternatives available. This will
672 25068 : // cause us to violate affinity rules if it is necessary to avoid critically overloading nodes: for example
673 25068 : // we may place shards in the same tenant together on the same pageserver if all other pageservers are
674 25068 : // overloaded.
675 25068 : let non_overloaded_scores = scores
676 25068 : .iter()
677 87626 : .filter(|i| !i.is_overloaded())
678 25068 : .copied()
679 25068 : .collect::<Vec<_>>();
680 25068 : if !non_overloaded_scores.is_empty() {
681 25060 : scores = non_overloaded_scores;
682 25060 : }
683 :
684 : // Sort the nodes by score. The one with the lowest scores will be the preferred node.
685 : // Refer to [`NodeAttachmentSchedulingScore`] for attached locations and
686 : // [`NodeSecondarySchedulingScore`] for secondary locations to understand how the nodes
687 : // are ranked.
688 25068 : scores.sort();
689 25068 :
690 25068 : if scores.is_empty() {
691 : // After applying constraints, no pageservers were left.
692 8 : if !matches!(context.mode, ScheduleMode::Speculative) {
693 : // If this was not a speculative attempt, log details to understand why we couldn't
694 : // schedule: this may help an engineer understand if some nodes are marked offline
695 : // in a way that's preventing progress.
696 8 : tracing::info!(
697 0 : "Scheduling failure, while excluding {hard_exclude:?}, node states:"
698 : );
699 24 : for (node_id, node) in &self.nodes {
700 16 : tracing::info!(
701 0 : "Node {node_id}: may_schedule={} shards={}",
702 0 : !matches!(node.may_schedule, MaySchedule::No),
703 : node.shard_count
704 : );
705 : }
706 0 : }
707 8 : return Err(ScheduleError::ImpossibleConstraint);
708 25060 : }
709 25060 :
710 25060 : // Lowest score wins
711 25060 : let node_id = scores.first().unwrap().node_id();
712 :
713 25060 : if !matches!(context.mode, ScheduleMode::Speculative) {
714 25060 : tracing::info!(
715 0 : "scheduler selected node {node_id} (elegible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?})",
716 0 : scores.iter().map(|i| i.node_id().0).collect::<Vec<_>>()
717 : );
718 0 : }
719 :
720 : // Note that we do not update shard count here to reflect the scheduling: that
721 : // is IntentState's job when the scheduled location is used.
722 :
723 25060 : Ok(node_id)
724 25068 : }
725 :
726 : /// Selects any available node. This is suitable for performing background work (e.g. S3
727 : /// deletions).
728 0 : pub(crate) fn any_available_node(&mut self) -> Result<NodeId, ScheduleError> {
729 0 : self.schedule_shard::<AttachedShardTag>(&[], &None, &ScheduleContext::default())
730 0 : }
731 :
/// Unit test access to internal state: total shard count of `node_id`.
/// Panics if the node is unknown.
#[cfg(test)]
pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
    let node = self.nodes.get(&node_id).unwrap();
    node.shard_count
}
737 :
/// Unit test access to internal state: attached shard count of `node_id`.
/// Panics if the node is unknown.
#[cfg(test)]
pub(crate) fn get_node_attached_shard_count(&self, node_id: NodeId) -> usize {
    let node = self.nodes.get(&node_id).unwrap();
    node.attached_shard_count
}
742 : }
743 :
#[cfg(test)]
pub(crate) mod test_utils {

    use crate::node::Node;
    use pageserver_api::{
        controller_api::{AvailabilityZone, NodeAvailability},
        models::utilization::test_utilization,
    };
    use std::collections::HashMap;
    use utils::id::NodeId;

    /// Test helper: synthesize the requested number of nodes, all in active state.
    ///
    /// Node IDs start at one.
    ///
    /// The `azs` argument specifies the list of availability zones which will be assigned
    /// to nodes in round-robin fashion. If empty, a default AZ is assigned.
    pub(crate) fn make_test_nodes(n: u64, azs: &[AvailabilityZone]) -> HashMap<NodeId, Node> {
        let mut az_iter = azs.iter().cycle();
        let mut nodes = HashMap::new();

        for i in 1..n + 1 {
            let id = NodeId(i);
            // Cycling over an empty slice yields nothing: fall back to the default AZ.
            let az = az_iter
                .next()
                .cloned()
                .unwrap_or_else(|| AvailabilityZone("test-az".to_string()));

            let mut node = Node::new(
                id,
                format!("httphost-{i}"),
                80 + i as u16,
                format!("pghost-{i}"),
                5432 + i as u16,
                az,
            );
            node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0)));
            assert!(node.is_available());

            nodes.insert(id, node);
        }

        nodes
    }
}
786 :
787 : #[cfg(test)]
788 : mod tests {
789 : use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization};
790 :
791 : use super::*;
792 :
793 : use crate::tenant_shard::IntentState;
#[test]
/// Exercise the scheduler's shard counters through attach, add-secondary and clear.
fn scheduler_basic() -> anyhow::Result<()> {
    /// `(shard_count, attached_shard_count)` of a node.
    fn counts(scheduler: &Scheduler, node_id: NodeId) -> (usize, usize) {
        (
            scheduler.get_node_shard_count(node_id),
            scheduler.get_node_attached_shard_count(node_id),
        )
    }

    let nodes = test_utils::make_test_nodes(2, &[]);
    let (node_1, node_2) = (NodeId(1), NodeId(2));

    let mut scheduler = Scheduler::new(nodes.values());
    let mut t1_intent = IntentState::new();
    let mut t2_intent = IntentState::new();

    let context = ScheduleContext::default();

    // Attach one shard per intent: they must land on different nodes.
    for intent in [&mut t1_intent, &mut t2_intent] {
        let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &None, &context)?;
        intent.set_attached(&mut scheduler, Some(scheduled));
    }

    assert_eq!(counts(&scheduler, node_1), (1, 1));
    assert_eq!(counts(&scheduler, node_2), (1, 1));

    // Give the first intent a secondary, excluding the node it already uses.
    let scheduled = scheduler.schedule_shard::<AttachedShardTag>(
        &t1_intent.all_pageservers(),
        &None,
        &context,
    )?;
    t1_intent.push_secondary(&mut scheduler, scheduled);

    assert_eq!(counts(&scheduler, node_1), (1, 1));
    assert_eq!(counts(&scheduler, node_2), (2, 1));

    // Clearing the first intent releases both of its locations.
    t1_intent.clear(&mut scheduler);
    assert_eq!(scheduler.get_node_shard_count(node_1), 0);
    assert_eq!(scheduler.get_node_shard_count(node_2), 1);

    let total_attached = scheduler.get_node_attached_shard_count(node_1)
        + scheduler.get_node_attached_shard_count(node_2);
    assert_eq!(total_attached, 1);

    if cfg!(debug_assertions) {
        // Dropping an IntentState without clearing it causes a panic in debug mode,
        // because we have failed to properly update scheduler shard counts.
        let result = std::panic::catch_unwind(move || {
            drop(t2_intent);
        });
        assert!(result.is_err());
    } else {
        t2_intent.clear(&mut scheduler);

        assert_eq!(counts(&scheduler, node_1), (0, 0));
        assert_eq!(counts(&scheduler, node_2), (0, 0));
    }

    Ok(())
}
855 :
#[test]
/// Exercises how the `PageserverUtilization` reported for a node feeds into the
/// scheduler's choice of node for attached locations.
fn scheduler_utilization() {
    /// Schedules one attached location using `context`, records the resulting intent in
    /// `scheduled_intents`, and then checks that the scheduler picked `expect_node`.
    fn assert_scheduler_chooses(
        expect_node: NodeId,
        scheduled_intents: &mut Vec<IntentState>,
        scheduler: &mut Scheduler,
        context: &ScheduleContext,
    ) {
        let chosen = scheduler
            .schedule_shard::<AttachedShardTag>(&[], &None, context)
            .unwrap();

        let mut intent = IntentState::new();
        intent.set_attached(scheduler, Some(chosen));
        // The intent is recorded before the assertion runs, same as the check order below.
        scheduled_intents.push(intent);

        assert_eq!(chosen, expect_node);
    }

    let node1_id = NodeId(1);
    let one_gib = 1024 * 1024 * 1024;

    let mut nodes = test_utils::make_test_nodes(3, &[]);
    let mut scheduler = Scheduler::new(nodes.values());
    let empty_context = ScheduleContext::default();

    // Intents must stay alive for the duration of the test: they contribute to the
    // scheduler's shard counts and are cleared explicitly at the end.
    let mut scheduled_intents = Vec::new();

    // Independent schedule calls onto empty nodes should round-robin, because each node's
    // utilization's shard count is updated inline. The order is deterministic because,
    // when all other factors are equal, we order by node ID.
    for expected in [NodeId(1), NodeId(2), NodeId(3)] {
        assert_scheduler_chooses(
            expected,
            &mut scheduled_intents,
            &mut scheduler,
            &empty_context,
        );
    }

    // Manually raising node 1's utilization should make subsequent schedule calls
    // round-robin across the other nodes, which have equal utilization.
    let raised = test_utilization::simple(10, one_gib);
    nodes
        .get_mut(&node1_id)
        .unwrap()
        .set_availability(NodeAvailability::Active(raised));
    scheduler.node_upsert(nodes.get(&node1_id).unwrap());

    for expected in [NodeId(2), NodeId(3), NodeId(2), NodeId(3)] {
        assert_scheduler_chooses(
            expected,
            &mut scheduled_intents,
            &mut scheduler,
            &empty_context,
        );
    }

    // The scheduler should prefer nodes with a lower affinity score, even if they have
    // higher utilization (as long as they aren't utilized at >100%).
    let mut context_prefer_node1 = ScheduleContext::default();
    context_prefer_node1.avoid(&[NodeId(2), NodeId(3)]);
    for _ in 0..2 {
        assert_scheduler_chooses(
            node1_id,
            &mut scheduled_intents,
            &mut scheduler,
            &context_prefer_node1,
        );
    }

    // Once a node is over-utilized it will not be used, even if affinity scores prefer it.
    let over_utilized = test_utilization::simple(20000, one_gib);
    nodes
        .get_mut(&node1_id)
        .unwrap()
        .set_availability(NodeAvailability::Active(over_utilized));
    scheduler.node_upsert(nodes.get(&node1_id).unwrap());

    for expected in [NodeId(2), NodeId(3)] {
        assert_scheduler_chooses(
            expected,
            &mut scheduled_intents,
            &mut scheduler,
            &context_prefer_node1,
        );
    }

    // Release every intent so the scheduler's shard counts are unwound before they drop.
    scheduled_intents.into_iter().for_each(|mut intent| {
        intent.clear(&mut scheduler);
    });
}
983 :
#[test]
/// A small scenario demonstrating AZ-aware scheduling and how the preferred AZ
/// interacts with affinity scores.
fn az_scheduling() {
    /// Schedules one location for `Tag` with the given `preferred_az`, records the
    /// resulting intent, checks that `expect_node` was picked, and finally marks the
    /// picked node as one to avoid in `context` for subsequent calls.
    fn assert_scheduler_chooses<Tag: ShardTag>(
        expect_node: NodeId,
        preferred_az: Option<AvailabilityZone>,
        scheduled_intents: &mut Vec<IntentState>,
        scheduler: &mut Scheduler,
        context: &mut ScheduleContext,
    ) {
        let chosen = scheduler
            .schedule_shard::<Tag>(&[], &preferred_az, context)
            .unwrap();

        let mut intent = IntentState::new();
        intent.set_attached(scheduler, Some(chosen));
        scheduled_intents.push(intent);

        assert_eq!(chosen, expect_node);

        context.avoid(&[chosen]);
    }

    let az_a_tag = AvailabilityZone("az-a".to_string());
    let az_b_tag = AvailabilityZone("az-b".to_string());

    let nodes = test_utils::make_test_nodes(3, &[az_a_tag.clone(), az_b_tag.clone()]);
    let mut scheduler = Scheduler::new(nodes.values());
    let mut context = ScheduleContext::default();

    // Intents must stay alive for the duration of the test: they contribute to the
    // scheduler's shard counts and are cleared explicitly at the end.
    let mut scheduled_intents = Vec::new();

    // Attached locations, all preferring "az-a".
    let attached_expectations = [
        // First pick with an empty context.
        NodeId(1),
        // Nodes 2 and 3 both have an affinity score of 0, but node 3 is in "az-a",
        // so we prefer that.
        NodeId(3),
        // Node 2 is not in "az-a", but it has the lowest affinity so we prefer that.
        NodeId(2),
    ];
    for expected in attached_expectations {
        assert_scheduler_chooses::<AttachedShardTag>(
            expected,
            Some(az_a_tag.clone()),
            &mut scheduled_intents,
            &mut scheduler,
            &mut context,
        );
    }

    // Secondary locations: the given AZ is the one to steer away from.
    let secondary_expectations = [
        // Avoid nodes in "az-a" for the secondary location.
        (NodeId(2), &az_a_tag),
        // Avoid nodes in "az-b" for the secondary location.
        // Nodes 1 and 3 are identically loaded, so prefer the lowest node id.
        (NodeId(1), &az_b_tag),
        // Avoid nodes in "az-b" for the secondary location.
        // Node 3 has a lower affinity score than node 1, so prefer that.
        (NodeId(3), &az_b_tag),
    ];
    for (expected, az) in secondary_expectations {
        assert_scheduler_chooses::<SecondaryShardTag>(
            expected,
            Some(az.clone()),
            &mut scheduled_intents,
            &mut scheduler,
            &mut context,
        );
    }

    // Release every intent so the scheduler's shard counts are unwound before they drop.
    scheduled_intents.into_iter().for_each(|mut intent| {
        intent.clear(&mut scheduler);
    });
}
1077 : }
|