Line data Source code
1 : use crate::{node::Node, tenant_shard::TenantShard};
2 : use itertools::Itertools;
3 : use pageserver_api::{controller_api::AvailabilityZone, models::PageserverUtilization};
4 : use serde::Serialize;
5 : use std::{collections::HashMap, fmt::Debug};
6 : use utils::{http::error::ApiError, id::NodeId};
7 :
8 : /// Scenarios in which we cannot find a suitable location for a tenant shard
9 : #[derive(thiserror::Error, Debug)]
10 : pub enum ScheduleError {
11 : #[error("No pageservers found")]
12 : NoPageservers,
13 : #[error("No pageserver found matching constraint")]
14 : ImpossibleConstraint,
15 : }
16 :
17 : impl From<ScheduleError> for ApiError {
18 0 : fn from(value: ScheduleError) -> Self {
19 0 : ApiError::Conflict(format!("Scheduling error: {}", value))
20 0 : }
21 : }
22 :
/// Whether a node may currently receive newly scheduled shards.
///
/// When it may, the node's latest known [`PageserverUtilization`] is carried along so that
/// scheduling scores can be computed from it.
#[derive(Serialize)]
pub enum MaySchedule {
    Yes(PageserverUtilization),
    No,
}

/// The scheduler's view of a single pageserver node.
///
/// Field declaration order determines the key order of the derived `Serialize` output
/// (used when dumping state for debugging).
#[derive(Serialize)]
pub(crate) struct SchedulerNode {
    /// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
    shard_count: usize,
    /// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`].
    attached_shard_count: usize,
    /// Availability zone id in which the node resides
    az: AvailabilityZone,

    /// Whether this node is currently eligible to have new shards scheduled (this is derived
    /// from a node's availability state and scheduling policy).
    may_schedule: MaySchedule,
}
42 :
/// A per-node score used to rank scheduling candidates: after sorting by `Ord`, the
/// smallest score is the preferred node.
pub(crate) trait NodeSchedulingScore: Debug + Ord + Copy + Sized {
    /// Build a score for `node`, or return `None` if the node may not be scheduled on.
    ///
    /// `node` is borrowed mutably because the implementations in this module call
    /// `cached_score()` on the node's utilization through this borrow.
    fn generate(
        node_id: &NodeId,
        node: &mut SchedulerNode,
        preferred_az: &Option<AvailabilityZone>,
        context: &ScheduleContext,
    ) -> Option<Self>;
    /// Whether the node's utilization is high enough that it should be skipped when
    /// other candidates are available.
    fn is_overloaded(&self) -> bool;
    /// The node this score was computed for.
    fn node_id(&self) -> NodeId;
}

/// Type-level selector for which [`NodeSchedulingScore`] implementation
/// [`Scheduler::schedule_shard`] uses.
pub(crate) trait ShardTag {
    type Score: NodeSchedulingScore;
}

/// Selects [`NodeAttachmentSchedulingScore`]: scheduling an attached location.
pub(crate) struct AttachedShardTag {}
impl ShardTag for AttachedShardTag {
    type Score = NodeAttachmentSchedulingScore;
}

/// Selects [`NodeSecondarySchedulingScore`]: scheduling a secondary location.
pub(crate) struct SecondaryShardTag {}
impl ShardTag for SecondaryShardTag {
    type Score = NodeSecondarySchedulingScore;
}
67 :
68 : #[derive(PartialEq, Eq, Debug, Clone, Copy)]
69 : enum AzMatch {
70 : Yes,
71 : No,
72 : Unknown,
73 : }
74 :
75 : impl AzMatch {
76 87656 : fn new(node_az: &AvailabilityZone, shard_preferred_az: Option<&AvailabilityZone>) -> Self {
77 87518 : match shard_preferred_az {
78 87518 : Some(preferred_az) if preferred_az == node_az => Self::Yes,
79 50008 : Some(_preferred_az) => Self::No,
80 138 : None => Self::Unknown,
81 : }
82 87656 : }
83 : }
84 :
85 : #[derive(PartialEq, Eq, Debug, Clone, Copy)]
86 : struct AttachmentAzMatch(AzMatch);
87 :
88 : impl Ord for AttachmentAzMatch {
89 46817 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
90 46817 : // Lower scores indicate a more suitable node.
91 46817 : // Note that we prefer a node for which we don't have
92 46817 : // info to a node which we are certain doesn't match the
93 46817 : // preferred AZ of the shard.
94 93634 : let az_match_score = |az_match: &AzMatch| match az_match {
95 46770 : AzMatch::Yes => 0,
96 98 : AzMatch::Unknown => 1,
97 46766 : AzMatch::No => 2,
98 93634 : };
99 :
100 46817 : az_match_score(&self.0).cmp(&az_match_score(&other.0))
101 46817 : }
102 : }
103 :
104 : impl PartialOrd for AttachmentAzMatch {
105 46817 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
106 46817 : Some(self.cmp(other))
107 46817 : }
108 : }
109 :
110 : #[derive(PartialEq, Eq, Debug, Clone, Copy)]
111 : struct SecondaryAzMatch(AzMatch);
112 :
113 : impl Ord for SecondaryAzMatch {
114 33355 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
115 33355 : // Lower scores indicate a more suitable node.
116 33355 : // For secondary locations we wish to avoid the preferred AZ
117 33355 : // of the shard.
118 66710 : let az_match_score = |az_match: &AzMatch| match az_match {
119 45836 : AzMatch::No => 0,
120 40 : AzMatch::Unknown => 1,
121 20834 : AzMatch::Yes => 2,
122 66710 : };
123 :
124 33355 : az_match_score(&self.0).cmp(&az_match_score(&other.0))
125 33355 : }
126 : }
127 :
128 : impl PartialOrd for SecondaryAzMatch {
129 33355 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
130 33355 : Some(self.cmp(other))
131 33355 : }
132 : }
133 :
/// Scheduling score of a given node for shard attachments.
/// Lower scores indicate more suitable nodes.
/// Ordering is given by member declaration order (top to bottom), via the derived `Ord`:
/// reordering the fields changes scheduling decisions.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub(crate) struct NodeAttachmentSchedulingScore {
    /// Anti-affinity toward this node, read from [`ScheduleContext::nodes`] (a missing entry
    /// counts as [`AffinityScore::FREE`]). It grows each time the node is passed to
    /// [`ScheduleContext::avoid`], e.g. for nodes already used by the tenant being scheduled.
    affinity_score: AffinityScore,
    /// Flag indicating whether this node matches the preferred AZ
    /// of the shard. For equal affinity scores, nodes in the matching AZ
    /// are considered first.
    az_match: AttachmentAzMatch,
    /// Size of [`ScheduleContext::attached_nodes`] for the current node.
    /// This normally tracks the number of attached shards belonging to the
    /// tenant being scheduled that are already on this node.
    attached_shards_in_context: usize,
    /// Utilisation score that combines shard count and disk utilisation
    utilization_score: u64,
    /// Total number of shards attached to this node. When nodes have identical utilisation, this
    /// acts as an anti-affinity between attached shards.
    total_attached_shard_count: usize,
    /// Convenience to make selection deterministic in tests and empty systems
    node_id: NodeId,
}
158 :
159 : impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
160 50095 : fn generate(
161 50095 : node_id: &NodeId,
162 50095 : node: &mut SchedulerNode,
163 50095 : preferred_az: &Option<AvailabilityZone>,
164 50095 : context: &ScheduleContext,
165 50095 : ) -> Option<Self> {
166 50095 : let utilization = match &mut node.may_schedule {
167 50095 : MaySchedule::Yes(u) => u,
168 : MaySchedule::No => {
169 0 : return None;
170 : }
171 : };
172 :
173 50095 : Some(Self {
174 50095 : affinity_score: context
175 50095 : .nodes
176 50095 : .get(node_id)
177 50095 : .copied()
178 50095 : .unwrap_or(AffinityScore::FREE),
179 50095 : az_match: AttachmentAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())),
180 50095 : attached_shards_in_context: context.attached_nodes.get(node_id).copied().unwrap_or(0),
181 50095 : utilization_score: utilization.cached_score(),
182 50095 : total_attached_shard_count: node.attached_shard_count,
183 50095 : node_id: *node_id,
184 50095 : })
185 50095 : }
186 :
187 50095 : fn is_overloaded(&self) -> bool {
188 50095 : PageserverUtilization::is_overloaded(self.utilization_score)
189 50095 : }
190 :
191 12537 : fn node_id(&self) -> NodeId {
192 12537 : self.node_id
193 12537 : }
194 : }
195 :
/// Scheduling score of a given node for shard secondaries.
/// Lower scores indicate more suitable nodes.
/// Ordering is given by member declaration order (top to bottom), via the derived `Ord`:
/// reordering the fields changes scheduling decisions.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub(crate) struct NodeSecondarySchedulingScore {
    /// Flag indicating whether this node matches the preferred AZ
    /// of the shard. For secondary locations we wish to avoid nodes in
    /// the preferred AZ of the shard, since that's where the attached location
    /// should be scheduled and having the secondary in the same AZ is bad for HA.
    az_match: SecondaryAzMatch,
    /// Anti-affinity toward this node, read from [`ScheduleContext::nodes`] (a missing entry
    /// counts as [`AffinityScore::FREE`]). It grows each time the node is passed to
    /// [`ScheduleContext::avoid`], e.g. for nodes already used by the tenant being scheduled.
    affinity_score: AffinityScore,
    /// Utilisation score that combines shard count and disk utilisation
    utilization_score: u64,
    /// Total number of shards attached to this node. When nodes have identical utilisation, this
    /// acts as an anti-affinity between attached shards.
    total_attached_shard_count: usize,
    /// Convenience to make selection deterministic in tests and empty systems
    node_id: NodeId,
}
217 :
218 : impl NodeSchedulingScore for NodeSecondarySchedulingScore {
219 37561 : fn generate(
220 37561 : node_id: &NodeId,
221 37561 : node: &mut SchedulerNode,
222 37561 : preferred_az: &Option<AvailabilityZone>,
223 37561 : context: &ScheduleContext,
224 37561 : ) -> Option<Self> {
225 37561 : let utilization = match &mut node.may_schedule {
226 37561 : MaySchedule::Yes(u) => u,
227 : MaySchedule::No => {
228 0 : return None;
229 : }
230 : };
231 :
232 37561 : Some(Self {
233 37561 : az_match: SecondaryAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())),
234 37561 : affinity_score: context
235 37561 : .nodes
236 37561 : .get(node_id)
237 37561 : .copied()
238 37561 : .unwrap_or(AffinityScore::FREE),
239 37561 : utilization_score: utilization.cached_score(),
240 37561 : total_attached_shard_count: node.attached_shard_count,
241 37561 : node_id: *node_id,
242 37561 : })
243 37561 : }
244 :
245 37561 : fn is_overloaded(&self) -> bool {
246 37561 : PageserverUtilization::is_overloaded(self.utilization_score)
247 37561 : }
248 :
249 12535 : fn node_id(&self) -> NodeId {
250 12535 : self.node_id
251 12535 : }
252 : }
253 :
254 : impl PartialEq for SchedulerNode {
255 3 : fn eq(&self, other: &Self) -> bool {
256 3 : let may_schedule_matches = matches!(
257 3 : (&self.may_schedule, &other.may_schedule),
258 : (MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No)
259 : );
260 :
261 3 : may_schedule_matches
262 3 : && self.shard_count == other.shard_count
263 3 : && self.attached_shard_count == other.attached_shard_count
264 3 : && self.az == other.az
265 3 : }
266 : }
267 :
268 : impl Eq for SchedulerNode {}
269 :
/// This type is responsible for selecting which node is used when a tenant shard needs to choose a pageserver
/// on which to run.
///
/// The type has no persistent state of its own: this is all populated at startup. The Serialize
/// impl is only for debug dumps.
#[derive(Serialize)]
pub(crate) struct Scheduler {
    /// Every node known to the scheduler, keyed by node id, with its reference counts.
    nodes: HashMap<NodeId, SchedulerNode>,
}
279 :
/// Score for soft-constraint scheduling: lower scores are preferred over higher ones.
///
/// For example, an affinity score can count how many shards of the same tenant already live
/// on a node, which implicitly prefers spreading shards out.
#[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub(crate) struct AffinityScore(pub(crate) usize);

impl AffinityScore {
    /// Score of a node toward which there is no anti-affinity at all. Nodes sharing this
    /// score are interchangeable from an affinity standpoint, so the scheduler may choose
    /// among them using other information such as total utilization.
    pub(crate) const FREE: Self = Self(0);

    /// Raise the score by one, making the node slightly less attractive.
    pub(crate) fn inc(&mut self) {
        *self = *self + Self(1);
    }
}

impl std::ops::Add for AffinityScore {
    type Output = Self;

    fn add(self, rhs: Self) -> Self::Output {
        let Self(left) = self;
        let Self(right) = rhs;
        Self(left + right)
    }
}
305 :
/// Hint telling the scheduler whether a call is a sincere attempt to schedule, or only a
/// speculative check of where we _would_ schedule (as done during optimization).
#[derive(Debug, Clone, Default)]
pub(crate) enum ScheduleMode {
    /// A real scheduling decision; the default.
    #[default]
    Normal,
    /// A what-if query; `Scheduler::schedule_shard` skips its informational logging.
    Speculative,
}
319 :
320 : // For carrying state between multiple calls to [`TenantShard::schedule`], e.g. when calling
321 : // it for many shards in the same tenant.
322 : #[derive(Debug, Default, Clone)]
323 : pub(crate) struct ScheduleContext {
324 : /// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`]
325 : pub(crate) nodes: HashMap<NodeId, AffinityScore>,
326 :
327 : /// Specifically how many _attached_ locations are on each node
328 : pub(crate) attached_nodes: HashMap<NodeId, usize>,
329 :
330 : pub(crate) mode: ScheduleMode,
331 : }
332 :
333 : impl ScheduleContext {
334 3 : pub(crate) fn new(mode: ScheduleMode) -> Self {
335 3 : Self {
336 3 : nodes: HashMap::new(),
337 3 : attached_nodes: HashMap::new(),
338 3 : mode,
339 3 : }
340 3 : }
341 :
342 : /// Input is a list of nodes we would like to avoid using again within this context. The more
343 : /// times a node is passed into this call, the less inclined we are to use it.
344 12575 : pub(crate) fn avoid(&mut self, nodes: &[NodeId]) {
345 37717 : for node_id in nodes {
346 25142 : let entry = self.nodes.entry(*node_id).or_insert(AffinityScore::FREE);
347 25142 : entry.inc()
348 : }
349 12575 : }
350 :
351 12567 : pub(crate) fn push_attached(&mut self, node_id: NodeId) {
352 12567 : let entry = self.attached_nodes.entry(node_id).or_default();
353 12567 : *entry += 1;
354 12567 : }
355 :
356 69 : pub(crate) fn get_node_affinity(&self, node_id: NodeId) -> AffinityScore {
357 69 : self.nodes
358 69 : .get(&node_id)
359 69 : .copied()
360 69 : .unwrap_or(AffinityScore::FREE)
361 69 : }
362 :
363 69 : pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize {
364 69 : self.attached_nodes.get(&node_id).copied().unwrap_or(0)
365 69 : }
366 :
367 : #[cfg(test)]
368 3 : pub(crate) fn attach_count(&self) -> usize {
369 3 : self.attached_nodes.values().sum()
370 3 : }
371 : }
372 :
/// A change to a node's reference counts, applied by [`Scheduler::update_node_ref_counts`].
pub(crate) enum RefCountUpdate {
    /// `attached_shard_count += 1`; `shard_count` is left unchanged.
    PromoteSecondary,
    /// `shard_count += 1` and `attached_shard_count += 1`.
    Attach,
    /// `shard_count -= 1` and `attached_shard_count -= 1`.
    Detach,
    /// `attached_shard_count -= 1`; `shard_count` is left unchanged.
    DemoteAttached,
    /// `shard_count += 1`.
    AddSecondary,
    /// `shard_count -= 1`.
    RemoveSecondary,
}
381 :
382 : impl Scheduler {
383 62 : pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
384 62 : let mut scheduler_nodes = HashMap::new();
385 95 : for node in nodes {
386 33 : scheduler_nodes.insert(
387 33 : node.get_id(),
388 33 : SchedulerNode {
389 33 : shard_count: 0,
390 33 : attached_shard_count: 0,
391 33 : may_schedule: node.may_schedule(),
392 33 : az: node.get_availability_zone_id().clone(),
393 33 : },
394 33 : );
395 33 : }
396 :
397 62 : Self {
398 62 : nodes: scheduler_nodes,
399 62 : }
400 62 : }
401 :
402 : /// For debug/support: check that our internal statistics are in sync with the state of
403 : /// the nodes & tenant shards.
404 : ///
405 : /// If anything is inconsistent, log details and return an error.
406 1 : pub(crate) fn consistency_check<'a>(
407 1 : &self,
408 1 : nodes: impl Iterator<Item = &'a Node>,
409 1 : shards: impl Iterator<Item = &'a TenantShard>,
410 1 : ) -> anyhow::Result<()> {
411 1 : let mut expect_nodes: HashMap<NodeId, SchedulerNode> = HashMap::new();
412 4 : for node in nodes {
413 3 : expect_nodes.insert(
414 3 : node.get_id(),
415 3 : SchedulerNode {
416 3 : shard_count: 0,
417 3 : attached_shard_count: 0,
418 3 : may_schedule: node.may_schedule(),
419 3 : az: node.get_availability_zone_id().clone(),
420 3 : },
421 3 : );
422 3 : }
423 :
424 2 : for shard in shards {
425 1 : if let Some(node_id) = shard.intent.get_attached() {
426 1 : match expect_nodes.get_mut(node_id) {
427 1 : Some(node) => {
428 1 : node.shard_count += 1;
429 1 : node.attached_shard_count += 1;
430 1 : }
431 0 : None => anyhow::bail!(
432 0 : "Tenant {} references nonexistent node {}",
433 0 : shard.tenant_shard_id,
434 0 : node_id
435 0 : ),
436 : }
437 0 : }
438 :
439 1 : for node_id in shard.intent.get_secondary() {
440 1 : match expect_nodes.get_mut(node_id) {
441 1 : Some(node) => node.shard_count += 1,
442 0 : None => anyhow::bail!(
443 0 : "Tenant {} references nonexistent node {}",
444 0 : shard.tenant_shard_id,
445 0 : node_id
446 0 : ),
447 : }
448 : }
449 : }
450 :
451 4 : for (node_id, expect_node) in &expect_nodes {
452 3 : let Some(self_node) = self.nodes.get(node_id) else {
453 0 : anyhow::bail!("Node {node_id} not found in Self")
454 : };
455 :
456 3 : if self_node != expect_node {
457 0 : tracing::error!("Inconsistency detected in scheduling state for node {node_id}");
458 0 : tracing::error!("Expected state: {}", serde_json::to_string(expect_node)?);
459 0 : tracing::error!("Self state: {}", serde_json::to_string(self_node)?);
460 :
461 0 : anyhow::bail!("Inconsistent state on {node_id}");
462 3 : }
463 : }
464 :
465 1 : if expect_nodes.len() != self.nodes.len() {
466 : // We just checked that all the expected nodes are present. If the lengths don't match,
467 : // it means that we have nodes in Self that are unexpected.
468 0 : for node_id in self.nodes.keys() {
469 0 : if !expect_nodes.contains_key(node_id) {
470 0 : anyhow::bail!("Node {node_id} found in Self but not in expected nodes");
471 0 : }
472 : }
473 1 : }
474 :
475 1 : Ok(())
476 1 : }
477 :
478 : /// Update the reference counts of a node. These reference counts are used to guide scheduling
479 : /// decisions, not for memory management: they represent the number of tenant shard whose IntentState
480 : /// targets this node and the number of tenants shars whose IntentState is attached to this
481 : /// node.
482 : ///
483 : /// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
484 : /// [`Self::new`] or [`Self::node_upsert`])
485 50159 : pub(crate) fn update_node_ref_counts(&mut self, node_id: NodeId, update: RefCountUpdate) {
486 50159 : let Some(node) = self.nodes.get_mut(&node_id) else {
487 0 : debug_assert!(false);
488 0 : tracing::error!("Scheduler missing node {node_id}");
489 0 : return;
490 : };
491 :
492 50159 : match update {
493 5 : RefCountUpdate::PromoteSecondary => {
494 5 : node.attached_shard_count += 1;
495 5 : }
496 12544 : RefCountUpdate::Attach => {
497 12544 : node.shard_count += 1;
498 12544 : node.attached_shard_count += 1;
499 12544 : }
500 12543 : RefCountUpdate::Detach => {
501 12543 : node.shard_count -= 1;
502 12543 : node.attached_shard_count -= 1;
503 12543 : }
504 5 : RefCountUpdate::DemoteAttached => {
505 5 : node.attached_shard_count -= 1;
506 5 : }
507 12531 : RefCountUpdate::AddSecondary => {
508 12531 : node.shard_count += 1;
509 12531 : }
510 12531 : RefCountUpdate::RemoveSecondary => {
511 12531 : node.shard_count -= 1;
512 12531 : }
513 : }
514 :
515 : // Maybe update PageserverUtilization
516 50159 : match update {
517 : RefCountUpdate::AddSecondary | RefCountUpdate::Attach => {
518 : // Referencing the node: if this takes our shard_count above the utilzation structure's
519 : // shard count, then artifically bump it: this ensures that the scheduler immediately
520 : // recognizes that this node has more work on it, without waiting for the next heartbeat
521 : // to update the utilization.
522 25075 : if let MaySchedule::Yes(utilization) = &mut node.may_schedule {
523 25075 : utilization.adjust_shard_count_max(node.shard_count as u32);
524 25075 : }
525 : }
526 : RefCountUpdate::PromoteSecondary
527 : | RefCountUpdate::Detach
528 : | RefCountUpdate::RemoveSecondary
529 25084 : | RefCountUpdate::DemoteAttached => {
530 25084 : // De-referencing the node: leave the utilization's shard_count at a stale higher
531 25084 : // value until some future heartbeat after we have physically removed this shard
532 25084 : // from the node: this prevents the scheduler over-optimistically trying to schedule
533 25084 : // more work onto the node before earlier detaches are done.
534 25084 : }
535 : }
536 50159 : }
537 :
538 : // Check if the number of shards attached to a given node is lagging below
539 : // the cluster average. If that's the case, the node should be filled.
540 0 : pub(crate) fn compute_fill_requirement(&self, node_id: NodeId) -> usize {
541 0 : let Some(node) = self.nodes.get(&node_id) else {
542 0 : debug_assert!(false);
543 0 : tracing::error!("Scheduler missing node {node_id}");
544 0 : return 0;
545 : };
546 0 : assert!(!self.nodes.is_empty());
547 0 : let expected_attached_shards_per_node = self.expected_attached_shard_count();
548 :
549 0 : for (node_id, node) in self.nodes.iter() {
550 0 : tracing::trace!(%node_id, "attached_shard_count={} shard_count={} expected={}", node.attached_shard_count, node.shard_count, expected_attached_shards_per_node);
551 : }
552 :
553 0 : if node.attached_shard_count < expected_attached_shards_per_node {
554 0 : expected_attached_shards_per_node - node.attached_shard_count
555 : } else {
556 0 : 0
557 : }
558 0 : }
559 :
560 0 : pub(crate) fn expected_attached_shard_count(&self) -> usize {
561 0 : let total_attached_shards: usize =
562 0 : self.nodes.values().map(|n| n.attached_shard_count).sum();
563 0 :
564 0 : assert!(!self.nodes.is_empty());
565 0 : total_attached_shards / self.nodes.len()
566 0 : }
567 :
568 0 : pub(crate) fn nodes_by_attached_shard_count(&self) -> Vec<(NodeId, usize)> {
569 0 : self.nodes
570 0 : .iter()
571 0 : .map(|(node_id, stats)| (*node_id, stats.attached_shard_count))
572 0 : .sorted_by(|lhs, rhs| Ord::cmp(&lhs.1, &rhs.1).reverse())
573 0 : .collect()
574 0 : }
575 :
576 211 : pub(crate) fn node_upsert(&mut self, node: &Node) {
577 : use std::collections::hash_map::Entry::*;
578 211 : match self.nodes.entry(node.get_id()) {
579 5 : Occupied(mut entry) => {
580 5 : // Updates to MaySchedule are how we receive updated PageserverUtilization: adjust these values
581 5 : // to account for any shards scheduled on the controller but not yet visible to the pageserver.
582 5 : let mut may_schedule = node.may_schedule();
583 5 : match &mut may_schedule {
584 4 : MaySchedule::Yes(utilization) => {
585 4 : utilization.adjust_shard_count_max(entry.get().shard_count as u32);
586 4 : }
587 1 : MaySchedule::No => { // Nothing to tweak
588 1 : }
589 : }
590 :
591 5 : entry.get_mut().may_schedule = may_schedule;
592 : }
593 206 : Vacant(entry) => {
594 206 : entry.insert(SchedulerNode {
595 206 : shard_count: 0,
596 206 : attached_shard_count: 0,
597 206 : may_schedule: node.may_schedule(),
598 206 : az: node.get_availability_zone_id().clone(),
599 206 : });
600 206 : }
601 : }
602 211 : }
603 :
604 0 : pub(crate) fn node_remove(&mut self, node_id: NodeId) {
605 0 : if self.nodes.remove(&node_id).is_none() {
606 0 : tracing::warn!(node_id=%node_id, "Removed non-existent node from scheduler");
607 0 : }
608 0 : }
609 :
610 : /// Where we have several nodes to choose from, for example when picking a secondary location
611 : /// to promote to an attached location, this method may be used to pick the best choice based
612 : /// on the scheduler's knowledge of utilization and availability.
613 : ///
614 : /// If the input is empty, or all the nodes are not elegible for scheduling, return None: the
615 : /// caller can pick a node some other way.
616 25042 : pub(crate) fn node_preferred(&self, nodes: &[NodeId]) -> Option<NodeId> {
617 25042 : if nodes.is_empty() {
618 25040 : return None;
619 2 : }
620 2 :
621 2 : // TODO: When the utilization score returned by the pageserver becomes meaningful,
622 2 : // schedule based on that instead of the shard count.
623 2 : let node = nodes
624 2 : .iter()
625 4 : .map(|node_id| {
626 4 : let may_schedule = self
627 4 : .nodes
628 4 : .get(node_id)
629 4 : .map(|n| !matches!(n.may_schedule, MaySchedule::No))
630 4 : .unwrap_or(false);
631 4 : (*node_id, may_schedule)
632 4 : })
633 4 : .max_by_key(|(_n, may_schedule)| *may_schedule);
634 2 :
635 2 : // If even the preferred node has may_schedule==false, return None
636 2 : node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
637 25042 : }
638 :
639 : /// Compute a schedulling score for each node that the scheduler knows of
640 : /// minus a set of hard excluded nodes.
641 25080 : fn compute_node_scores<Score>(
642 25080 : &mut self,
643 25080 : hard_exclude: &[NodeId],
644 25080 : preferred_az: &Option<AvailabilityZone>,
645 25080 : context: &ScheduleContext,
646 25080 : ) -> Vec<Score>
647 25080 : where
648 25080 : Score: NodeSchedulingScore,
649 25080 : {
650 25080 : self.nodes
651 25080 : .iter_mut()
652 100217 : .filter_map(|(k, v)| {
653 100217 : if hard_exclude.contains(k) {
654 12561 : None
655 : } else {
656 87656 : Score::generate(k, v, preferred_az, context)
657 : }
658 100217 : })
659 25080 : .collect()
660 25080 : }
661 :
662 : /// hard_exclude: it is forbidden to use nodes in this list, typically becacuse they
663 : /// are already in use by this shard -- we use this to avoid picking the same node
664 : /// as both attached and secondary location. This is a hard constraint: if we cannot
665 : /// find any nodes that aren't in this list, then we will return a [`ScheduleError::ImpossibleConstraint`].
666 : ///
667 : /// context: we prefer to avoid using nodes identified in the context, according
668 : /// to their anti-affinity score. We use this to prefeer to avoid placing shards in
669 : /// the same tenant on the same node. This is a soft constraint: the context will never
670 : /// cause us to fail to schedule a shard.
671 25080 : pub(crate) fn schedule_shard<Tag: ShardTag>(
672 25080 : &mut self,
673 25080 : hard_exclude: &[NodeId],
674 25080 : preferred_az: &Option<AvailabilityZone>,
675 25080 : context: &ScheduleContext,
676 25080 : ) -> Result<NodeId, ScheduleError> {
677 25080 : if self.nodes.is_empty() {
678 0 : return Err(ScheduleError::NoPageservers);
679 25080 : }
680 25080 :
681 25080 : let mut scores =
682 25080 : self.compute_node_scores::<Tag::Score>(hard_exclude, preferred_az, context);
683 25080 :
684 25080 : // Exclude nodes whose utilization is critically high, if there are alternatives available. This will
685 25080 : // cause us to violate affinity rules if it is necessary to avoid critically overloading nodes: for example
686 25080 : // we may place shards in the same tenant together on the same pageserver if all other pageservers are
687 25080 : // overloaded.
688 25080 : let non_overloaded_scores = scores
689 25080 : .iter()
690 87656 : .filter(|i| !i.is_overloaded())
691 25080 : .copied()
692 25080 : .collect::<Vec<_>>();
693 25080 : if !non_overloaded_scores.is_empty() {
694 25072 : scores = non_overloaded_scores;
695 25072 : }
696 :
697 : // Sort the nodes by score. The one with the lowest scores will be the preferred node.
698 : // Refer to [`NodeAttachmentSchedulingScore`] for attached locations and
699 : // [`NodeSecondarySchedulingScore`] for secondary locations to understand how the nodes
700 : // are ranked.
701 25080 : scores.sort();
702 25080 :
703 25080 : if scores.is_empty() {
704 : // After applying constraints, no pageservers were left.
705 8 : if !matches!(context.mode, ScheduleMode::Speculative) {
706 : // If this was not a speculative attempt, log details to understand why we couldn't
707 : // schedule: this may help an engineer understand if some nodes are marked offline
708 : // in a way that's preventing progress.
709 8 : tracing::info!(
710 0 : "Scheduling failure, while excluding {hard_exclude:?}, node states:"
711 : );
712 24 : for (node_id, node) in &self.nodes {
713 16 : tracing::info!(
714 0 : "Node {node_id}: may_schedule={} shards={}",
715 0 : !matches!(node.may_schedule, MaySchedule::No),
716 : node.shard_count
717 : );
718 : }
719 0 : }
720 8 : return Err(ScheduleError::ImpossibleConstraint);
721 25072 : }
722 25072 :
723 25072 : // Lowest score wins
724 25072 : let node_id = scores.first().unwrap().node_id();
725 :
726 25072 : if !matches!(context.mode, ScheduleMode::Speculative) {
727 25072 : tracing::info!(
728 0 : "scheduler selected node {node_id} (elegible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?})",
729 0 : scores.iter().map(|i| i.node_id().0).collect::<Vec<_>>()
730 : );
731 0 : }
732 :
733 : // Note that we do not update shard count here to reflect the scheduling: that
734 : // is IntentState's job when the scheduled location is used.
735 :
736 25072 : Ok(node_id)
737 25080 : }
738 :
739 : /// Selects any available node. This is suitable for performing background work (e.g. S3
740 : /// deletions).
741 0 : pub(crate) fn any_available_node(&mut self) -> Result<NodeId, ScheduleError> {
742 0 : self.schedule_shard::<AttachedShardTag>(&[], &None, &ScheduleContext::default())
743 0 : }
744 :
745 : /// For choosing which AZ to schedule a new shard into, use this. It will return the
746 : /// AZ with the lowest median utilization.
747 : ///
748 : /// We use an AZ-wide measure rather than simply selecting the AZ of the least-loaded
749 : /// node, because while tenants start out single sharded, when they grow and undergo
750 : /// shard-split, they will occupy space on many nodes within an AZ.
751 : ///
752 : /// We use median rather than total free space or mean utilization, because
753 : /// we wish to avoid preferring AZs that have low-load nodes resulting from
754 : /// recent replacements.
755 : ///
756 : /// The practical result is that we will pick an AZ based on its median node, and
757 : /// then actually _schedule_ the new shard onto the lowest-loaded node in that AZ.
758 3 : pub(crate) fn get_az_for_new_tenant(&self) -> Option<AvailabilityZone> {
759 3 : if self.nodes.is_empty() {
760 0 : return None;
761 3 : }
762 3 :
763 3 : let mut scores_by_az = HashMap::new();
764 21 : for (node_id, node) in &self.nodes {
765 18 : let az_scores = scores_by_az.entry(&node.az).or_insert_with(Vec::new);
766 18 : let score = match &node.may_schedule {
767 18 : MaySchedule::Yes(utilization) => utilization.score(),
768 0 : MaySchedule::No => PageserverUtilization::full().score(),
769 : };
770 18 : az_scores.push((node_id, node, score));
771 : }
772 :
773 : // Sort by utilization. Also include the node ID to break ties.
774 6 : for scores in scores_by_az.values_mut() {
775 34 : scores.sort_by_key(|i| (i.2, i.0));
776 6 : }
777 :
778 3 : let mut median_by_az = scores_by_az
779 3 : .iter()
780 6 : .map(|(az, nodes)| (*az, nodes.get(nodes.len() / 2).unwrap().2))
781 3 : .collect::<Vec<_>>();
782 3 : // Sort by utilization. Also include the AZ to break ties.
783 6 : median_by_az.sort_by_key(|i| (i.1, i.0));
784 3 :
785 3 : // Return the AZ with the lowest median utilization
786 3 : Some(median_by_az.first().unwrap().0.clone())
787 3 : }
788 :
789 : /// Unit test access to internal state
790 : #[cfg(test)]
791 12 : pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
792 12 : self.nodes.get(&node_id).unwrap().shard_count
793 12 : }
794 :
795 : #[cfg(test)]
796 12 : pub(crate) fn get_node_attached_shard_count(&self, node_id: NodeId) -> usize {
797 12 : self.nodes.get(&node_id).unwrap().attached_shard_count
798 12 : }
799 : }
800 :
#[cfg(test)]
pub(crate) mod test_utils {

    use crate::node::Node;
    use pageserver_api::{
        controller_api::{AvailabilityZone, NodeAvailability},
        models::utilization::test_utilization,
    };
    use std::collections::HashMap;
    use utils::id::NodeId;

    /// Test helper: build `n` nodes, all in the active state, keyed by id.
    ///
    /// Node ids run from 1 to `n` inclusive.
    ///
    /// AZs are assigned from `azs` round-robin. When `azs` is empty, every node gets the
    /// default `test-az` zone.
    pub(crate) fn make_test_nodes(n: u64, azs: &[AvailabilityZone]) -> HashMap<NodeId, Node> {
        let mut az_cycle = azs.iter().cycle();
        let mut nodes = HashMap::new();

        for i in 1..n + 1 {
            let az = match az_cycle.next() {
                Some(az) => az.clone(),
                None => AvailabilityZone("test-az".to_string()),
            };

            let mut node = Node::new(
                NodeId(i),
                format!("httphost-{i}"),
                80 + i as u16,
                format!("pghost-{i}"),
                5432 + i as u16,
                az,
            );
            node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0)));
            assert!(node.is_available());

            nodes.insert(NodeId(i), node);
        }

        nodes
    }
}
843 :
#[cfg(test)]
mod tests {
    use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization};

    use super::*;

    use crate::tenant_shard::IntentState;

    /// Check shard-count bookkeeping in the basic case.
    ///
    /// Scheduling attached and secondary locations bumps the per-node counters, and
    /// clearing an `IntentState` releases them again.
    #[test]
    fn scheduler_basic() -> anyhow::Result<()> {
        let nodes = test_utils::make_test_nodes(2, &[]);

        let mut scheduler = Scheduler::new(nodes.values());
        let mut t1_intent = IntentState::new();
        let mut t2_intent = IntentState::new();

        let context = ScheduleContext::default();

        let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &None, &context)?;
        t1_intent.set_attached(&mut scheduler, Some(scheduled));
        let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &None, &context)?;
        t2_intent.set_attached(&mut scheduler, Some(scheduled));

        assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);

        assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);

        // This location is pushed as a secondary, so schedule it with the secondary scoring
        // rules. t1's existing pageservers are passed in so the new location lands on the
        // other node (checked by the counts below).
        let scheduled = scheduler.schedule_shard::<SecondaryShardTag>(
            &t1_intent.all_pageservers(),
            &None,
            &context,
        )?;
        t1_intent.push_secondary(&mut scheduler, scheduled);

        assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);

        assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);

        t1_intent.clear(&mut scheduler);
        assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
        assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);

        let total_attached = scheduler.get_node_attached_shard_count(NodeId(1))
            + scheduler.get_node_attached_shard_count(NodeId(2));
        assert_eq!(total_attached, 1);

        if cfg!(debug_assertions) {
            // Dropping an IntentState without clearing it causes a panic in debug mode,
            // because we have failed to properly update scheduler shard counts.
            let result = std::panic::catch_unwind(move || {
                drop(t2_intent);
            });
            assert!(result.is_err());
        } else {
            t2_intent.clear(&mut scheduler);

            assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
            assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 0);

            assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 0);
            assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 0);
        }

        Ok(())
    }

    /// Test the PageserverUtilization's contribution to scheduling algorithm
    #[test]
    fn scheduler_utilization() {
        let mut nodes = test_utils::make_test_nodes(3, &[]);
        let mut scheduler = Scheduler::new(nodes.values());

        // Need to keep these alive because they contribute to shard counts; they are
        // cleared explicitly at the end of the test.
        let mut scheduled_intents = Vec::new();

        let empty_context = ScheduleContext::default();

        /// Schedule one attached shard with `context`, keep its intent alive in
        /// `scheduled_intents`, and assert that it was placed on `expect_node`.
        fn assert_scheduler_chooses(
            expect_node: NodeId,
            scheduled_intents: &mut Vec<IntentState>,
            scheduler: &mut Scheduler,
            context: &ScheduleContext,
        ) {
            let scheduled = scheduler
                .schedule_shard::<AttachedShardTag>(&[], &None, context)
                .unwrap();
            let mut intent = IntentState::new();
            intent.set_attached(scheduler, Some(scheduled));
            scheduled_intents.push(intent);
            assert_eq!(scheduled, expect_node);
        }

        // Independent schedule calls onto empty nodes should round-robin, because each node's
        // utilization's shard count is updated inline. The order is deterministic because when
        // all other factors are equal, we order by node ID.
        assert_scheduler_chooses(
            NodeId(1),
            &mut scheduled_intents,
            &mut scheduler,
            &empty_context,
        );
        assert_scheduler_chooses(
            NodeId(2),
            &mut scheduled_intents,
            &mut scheduler,
            &empty_context,
        );
        assert_scheduler_chooses(
            NodeId(3),
            &mut scheduled_intents,
            &mut scheduler,
            &empty_context,
        );

        // Manually setting utilization higher should cause schedule calls to round-robin the other nodes
        // which have equal utilization.
        nodes
            .get_mut(&NodeId(1))
            .unwrap()
            .set_availability(NodeAvailability::Active(test_utilization::simple(
                10,
                1024 * 1024 * 1024,
            )));
        scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());

        assert_scheduler_chooses(
            NodeId(2),
            &mut scheduled_intents,
            &mut scheduler,
            &empty_context,
        );
        assert_scheduler_chooses(
            NodeId(3),
            &mut scheduled_intents,
            &mut scheduler,
            &empty_context,
        );
        assert_scheduler_chooses(
            NodeId(2),
            &mut scheduled_intents,
            &mut scheduler,
            &empty_context,
        );
        assert_scheduler_chooses(
            NodeId(3),
            &mut scheduled_intents,
            &mut scheduler,
            &empty_context,
        );

        // The scheduler should prefer nodes with lower affinity score,
        // even if they have higher utilization (as long as they aren't utilized at >100%)
        let mut context_prefer_node1 = ScheduleContext::default();
        context_prefer_node1.avoid(&[NodeId(2), NodeId(3)]);
        assert_scheduler_chooses(
            NodeId(1),
            &mut scheduled_intents,
            &mut scheduler,
            &context_prefer_node1,
        );
        assert_scheduler_chooses(
            NodeId(1),
            &mut scheduled_intents,
            &mut scheduler,
            &context_prefer_node1,
        );

        // If a node is over-utilized, it will not be used even if affinity scores prefer it
        nodes
            .get_mut(&NodeId(1))
            .unwrap()
            .set_availability(NodeAvailability::Active(test_utilization::simple(
                20000,
                1024 * 1024 * 1024,
            )));
        scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
        assert_scheduler_chooses(
            NodeId(2),
            &mut scheduled_intents,
            &mut scheduler,
            &context_prefer_node1,
        );
        assert_scheduler_chooses(
            NodeId(3),
            &mut scheduled_intents,
            &mut scheduler,
            &context_prefer_node1,
        );

        for mut intent in scheduled_intents {
            intent.clear(&mut scheduler);
        }
    }

    /// A simple test that showcases AZ-aware scheduling and its interaction with
    /// affinity scores.
    #[test]
    fn az_scheduling() {
        let az_a_tag = AvailabilityZone("az-a".to_string());
        let az_b_tag = AvailabilityZone("az-b".to_string());

        let nodes = test_utils::make_test_nodes(3, &[az_a_tag.clone(), az_b_tag.clone()]);
        let mut scheduler = Scheduler::new(nodes.values());

        // Need to keep these alive because they contribute to shard counts; they are
        // cleared explicitly at the end of the test.
        let mut scheduled_intents = Vec::new();

        let mut context = ScheduleContext::default();

        /// Schedule one shard using `Tag`'s scoring with `preferred_az`, keep its intent alive
        /// in `scheduled_intents`, and assert that it was placed on `expect_node`. The chosen
        /// node is then added to `context`'s avoid list.
        fn assert_scheduler_chooses<Tag: ShardTag>(
            expect_node: NodeId,
            preferred_az: Option<AvailabilityZone>,
            scheduled_intents: &mut Vec<IntentState>,
            scheduler: &mut Scheduler,
            context: &mut ScheduleContext,
        ) {
            let scheduled = scheduler
                .schedule_shard::<Tag>(&[], &preferred_az, context)
                .unwrap();
            let mut intent = IntentState::new();
            intent.set_attached(scheduler, Some(scheduled));
            scheduled_intents.push(intent);
            assert_eq!(scheduled, expect_node);

            context.avoid(&[scheduled]);
        }

        assert_scheduler_chooses::<AttachedShardTag>(
            NodeId(1),
            Some(az_a_tag.clone()),
            &mut scheduled_intents,
            &mut scheduler,
            &mut context,
        );

        // Node 2 and 3 have affinity score equal to 0, but node 3
        // is in "az-a" so we prefer that.
        assert_scheduler_chooses::<AttachedShardTag>(
            NodeId(3),
            Some(az_a_tag.clone()),
            &mut scheduled_intents,
            &mut scheduler,
            &mut context,
        );

        // Node 2 is not in "az-a", but it has the lowest affinity so we prefer that.
        assert_scheduler_chooses::<AttachedShardTag>(
            NodeId(2),
            Some(az_a_tag.clone()),
            &mut scheduled_intents,
            &mut scheduler,
            &mut context,
        );

        // Avoid nodes in "az-a" for the secondary location.
        assert_scheduler_chooses::<SecondaryShardTag>(
            NodeId(2),
            Some(az_a_tag.clone()),
            &mut scheduled_intents,
            &mut scheduler,
            &mut context,
        );

        // Avoid nodes in "az-b" for the secondary location.
        // Nodes 1 and 3 are identically loaded, so prefer the lowest node id.
        assert_scheduler_chooses::<SecondaryShardTag>(
            NodeId(1),
            Some(az_b_tag.clone()),
            &mut scheduled_intents,
            &mut scheduler,
            &mut context,
        );

        // Avoid nodes in "az-b" for the secondary location.
        // Node 3 has lower affinity score than 1, so prefer that.
        assert_scheduler_chooses::<SecondaryShardTag>(
            NodeId(3),
            Some(az_b_tag.clone()),
            &mut scheduled_intents,
            &mut scheduler,
            &mut context,
        );

        for mut intent in scheduled_intents {
            intent.clear(&mut scheduler);
        }
    }

    /// New tenants should go to the AZ whose median node utilization is lowest.
    #[test]
    fn az_scheduling_for_new_tenant() {
        let az_a_tag = AvailabilityZone("az-a".to_string());
        let az_b_tag = AvailabilityZone("az-b".to_string());
        let nodes = test_utils::make_test_nodes(
            6,
            &[
                az_a_tag.clone(),
                az_a_tag.clone(),
                az_a_tag.clone(),
                az_b_tag.clone(),
                az_b_tag.clone(),
                az_b_tag.clone(),
            ],
        );

        let mut scheduler = Scheduler::new(nodes.values());

        /// Force the utilization of a node in Scheduler's state to a particular shard count
        /// (with zero bytes used), keeping the node's existing AZ.
        fn set_utilization(scheduler: &mut Scheduler, node_id: NodeId, shard_count: u32) {
            let mut node = Node::new(
                node_id,
                "".to_string(),
                0,
                "".to_string(),
                0,
                scheduler.nodes.get(&node_id).unwrap().az.clone(),
            );
            node.set_availability(NodeAvailability::Active(test_utilization::simple(
                shard_count,
                0,
            )));
            scheduler.node_upsert(&node);
        }

        // Initial empty state. Scores are tied, scheduler prefers lower AZ ID.
        assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_a_tag.clone()));

        // Put some utilization on one node in AZ A: this should change nothing, as the median hasn't changed
        set_utilization(&mut scheduler, NodeId(1), 1000000);
        assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_a_tag.clone()));

        // Put some utilization on a second node in AZ A: now the median has changed, so the scheduler
        // should prefer the other AZ.
        set_utilization(&mut scheduler, NodeId(2), 1000000);
        assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_b_tag.clone()));
    }
}
|