Line data Source code
1 : use std::collections::HashMap;
2 : use std::fmt::Debug;
3 :
4 : use http_utils::error::ApiError;
5 : use itertools::Itertools;
6 : use pageserver_api::controller_api::AvailabilityZone;
7 : use pageserver_api::models::PageserverUtilization;
8 : use serde::Serialize;
9 : use utils::id::NodeId;
10 :
11 : use crate::metrics::NodeLabelGroup;
12 : use crate::node::Node;
13 : use crate::tenant_shard::TenantShard;
14 :
15 : /// Scenarios in which we cannot find a suitable location for a tenant shard
16 : #[derive(thiserror::Error, Debug)]
17 : pub enum ScheduleError {
18 : #[error("No pageservers found")]
19 : NoPageservers,
20 : #[error("No pageserver found matching constraint")]
21 : ImpossibleConstraint,
22 : }
23 :
24 : impl From<ScheduleError> for ApiError {
25 0 : fn from(value: ScheduleError) -> Self {
26 0 : ApiError::Conflict(format!("Scheduling error: {}", value))
27 0 : }
28 : }
29 :
30 : #[derive(Serialize)]
31 : pub enum MaySchedule {
32 : Yes(PageserverUtilization),
33 : No,
34 : }
35 :
36 : #[derive(Serialize)]
37 : pub(crate) struct SchedulerNode {
38 : /// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
39 : shard_count: usize,
40 : /// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`].
41 : attached_shard_count: usize,
42 : /// How many shards have a location on this node (via [`crate::tenant_shard::IntentState`]) _and_ this node
43 : /// is in their preferred AZ (i.e. this is their 'home' location)
44 : home_shard_count: usize,
45 : /// Availability zone id in which the node resides
46 : az: AvailabilityZone,
47 :
48 : /// Whether this node is currently eligible to have new shards scheduled (this is derived
49 : /// from a node's availability state and scheduling policy).
50 : may_schedule: MaySchedule,
51 : }
52 :
53 : pub(crate) trait NodeSchedulingScore: Debug + Ord + Copy + Sized {
54 : fn generate(
55 : node_id: &NodeId,
56 : node: &mut SchedulerNode,
57 : preferred_az: &Option<AvailabilityZone>,
58 : context: &ScheduleContext,
59 : ) -> Option<Self>;
60 :
61 : /// Return a score that drops any components based on node utilization: this is useful
62 : /// for finding scores for scheduling optimisation, when we want to avoid rescheduling
63 : /// shards due to e.g. disk usage, which would cause flapping.
64 : fn for_optimization(&self) -> Self;
65 :
66 : fn is_overloaded(&self) -> bool;
67 : fn node_id(&self) -> NodeId;
68 : }
69 :
70 : pub(crate) trait ShardTag {
71 : type Score: NodeSchedulingScore;
72 : }
73 :
74 : pub(crate) struct AttachedShardTag {}
75 : impl ShardTag for AttachedShardTag {
76 : type Score = NodeAttachmentSchedulingScore;
77 : }
78 :
79 : pub(crate) struct SecondaryShardTag {}
80 : impl ShardTag for SecondaryShardTag {
81 : type Score = NodeSecondarySchedulingScore;
82 : }
83 :
84 : #[derive(PartialEq, Eq, Debug, Clone, Copy)]
85 : enum AzMatch {
86 : Yes,
87 : No,
88 : Unknown,
89 : }
90 :
91 : impl AzMatch {
92 91487 : fn new(node_az: &AvailabilityZone, shard_preferred_az: Option<&AvailabilityZone>) -> Self {
93 91315 : match shard_preferred_az {
94 91315 : Some(preferred_az) if preferred_az == node_az => Self::Yes,
95 52686 : Some(_preferred_az) => Self::No,
96 172 : None => Self::Unknown,
97 : }
98 91487 : }
99 : }
100 :
101 : #[derive(PartialEq, Eq, Debug, Clone, Copy)]
102 : struct AttachmentAzMatch(AzMatch);
103 :
104 : impl Ord for AttachmentAzMatch {
105 65399 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
106 65399 : // Lower scores indicate a more suitable node.
107 65399 : // Note that we prefer a node for which we don't have
108 65399 : // info to a node which we are certain doesn't match the
109 65399 : // preferred AZ of the shard.
110 130798 : let az_match_score = |az_match: &AzMatch| match az_match {
111 64280 : AzMatch::Yes => 0,
112 162 : AzMatch::Unknown => 1,
113 66356 : AzMatch::No => 2,
114 130798 : };
115 :
116 65399 : az_match_score(&self.0).cmp(&az_match_score(&other.0))
117 65399 : }
118 : }
119 :
120 : impl PartialOrd for AttachmentAzMatch {
121 65399 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
122 65399 : Some(self.cmp(other))
123 65399 : }
124 : }
125 :
126 : #[derive(PartialEq, Eq, Debug, Clone, Copy)]
127 : struct SecondaryAzMatch(AzMatch);
128 :
129 : impl Ord for SecondaryAzMatch {
130 35455 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
131 35455 : // Lower scores indicate a more suitable node.
132 35455 : // For secondary locations we wish to avoid the preferred AZ
133 35455 : // of the shard.
134 70910 : let az_match_score = |az_match: &AzMatch| match az_match {
135 49329 : AzMatch::No => 0,
136 24 : AzMatch::Unknown => 1,
137 21557 : AzMatch::Yes => 2,
138 70910 : };
139 :
140 35455 : az_match_score(&self.0).cmp(&az_match_score(&other.0))
141 35455 : }
142 : }
143 :
144 : impl PartialOrd for SecondaryAzMatch {
145 35451 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
146 35451 : Some(self.cmp(other))
147 35451 : }
148 : }
149 :
150 : /// Scheduling score of a given node for shard attachments.
151 : /// Lower scores indicate more suitable nodes.
152 : /// Ordering is given by member declaration order (top to bottom).
153 : #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
154 : pub(crate) struct NodeAttachmentSchedulingScore {
155 : /// Flag indicating whether this node matches the preferred AZ
156 : /// of the shard. Because this field is compared first, nodes in the
157 : /// matching AZ are ranked ahead of non-matching nodes regardless of affinity score.
158 : az_match: AttachmentAzMatch,
159 : /// The number of shards belonging to the tenant currently being
160 : /// scheduled that are attached to this node.
161 : affinity_score: AffinityScore,
162 : /// Utilisation score that combines shard count and disk utilisation
163 : utilization_score: u64,
164 : /// Total number of shards attached to this node. When nodes have identical utilisation, this
165 : /// acts as an anti-affinity between attached shards.
166 : total_attached_shard_count: usize,
167 : /// Convenience to make selection deterministic in tests and empty systems
168 : node_id: NodeId,
169 : }
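
// Illustrative note (not from the original source): since `Ord` is derived, scores compare
// lexicographically in field declaration order, so an AZ match outweighs any difference in
// affinity or utilization. For two hypothetical scores that are equal in the remaining fields:
//
//     a: az_match = Yes, affinity_score = 3, utilization_score = 900
//     b: az_match = No,  affinity_score = 0, utilization_score = 100
//
// `a` sorts before `b`, so the node in the preferred AZ is still ranked first.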
170 :
171 : impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
172 52291 : fn generate(
173 52291 : node_id: &NodeId,
174 52291 : node: &mut SchedulerNode,
175 52291 : preferred_az: &Option<AvailabilityZone>,
176 52291 : context: &ScheduleContext,
177 52291 : ) -> Option<Self> {
178 52291 : let utilization = match &mut node.may_schedule {
179 52289 : MaySchedule::Yes(u) => u,
180 : MaySchedule::No => {
181 2 : return None;
182 : }
183 : };
184 :
185 52289 : Some(Self {
186 52289 : affinity_score: context
187 52289 : .nodes
188 52289 : .get(node_id)
189 52289 : .copied()
190 52289 : .unwrap_or(AffinityScore::FREE),
191 52289 : az_match: AttachmentAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())),
192 52289 : utilization_score: utilization.cached_score(),
193 52289 : total_attached_shard_count: node.attached_shard_count,
194 52289 : node_id: *node_id,
195 52289 : })
196 52291 : }
197 :
198 : /// For use in scheduling optimisation, where we only want to consider the aspects
199 : /// of the score that can only be resolved by moving things (such as inter-shard affinity
200 : /// and AZ affinity), and ignore aspects that reflect the total utilization of a node (which
201 : /// can fluctuate for other reasons)
202 118 : fn for_optimization(&self) -> Self {
203 118 : Self {
204 118 : utilization_score: 0,
205 118 : total_attached_shard_count: 0,
206 118 : node_id: NodeId(0),
207 118 : ..*self
208 118 : }
209 118 : }
210 :
211 52170 : fn is_overloaded(&self) -> bool {
212 52170 : PageserverUtilization::is_overloaded(self.utilization_score)
213 52170 : }
214 :
215 12880 : fn node_id(&self) -> NodeId {
216 12880 : self.node_id
217 12880 : }
218 : }
219 :
220 : /// Scheduling score of a given node for shard secondaries.
221 : /// Lower scores indicate more suitable nodes.
222 : /// Ordering is given by member declaration order (top to bottom).
223 : #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
224 : pub(crate) struct NodeSecondarySchedulingScore {
225 : /// Flag indicating whether this node matches the preferred AZ
226 : /// of the shard. For secondary locations we wish to avoid nodes in
227 : /// the preferred AZ of the shard, since that's where the attached location
228 : /// should be scheduled and having the secondary in the same AZ is bad for HA.
229 : az_match: SecondaryAzMatch,
230 : /// The number of shards belonging to the tenant currently being
231 : /// scheduled that are attached to this node.
232 : affinity_score: AffinityScore,
233 : /// Utilisation score that combines shard count and disk utilisation
234 : utilization_score: u64,
235 : /// Anti-affinity with other non-home locations: this gives the behavior that secondaries
236 : /// will spread out across the nodes in an AZ.
237 : total_non_home_shard_count: usize,
238 : /// Convenience to make selection deterministic in tests and empty systems
239 : node_id: NodeId,
240 : }
241 :
242 : impl NodeSchedulingScore for NodeSecondarySchedulingScore {
243 39199 : fn generate(
244 39199 : node_id: &NodeId,
245 39199 : node: &mut SchedulerNode,
246 39199 : preferred_az: &Option<AvailabilityZone>,
247 39199 : context: &ScheduleContext,
248 39199 : ) -> Option<Self> {
249 39199 : let utilization = match &mut node.may_schedule {
250 39198 : MaySchedule::Yes(u) => u,
251 : MaySchedule::No => {
252 1 : return None;
253 : }
254 : };
255 :
256 39198 : Some(Self {
257 39198 : az_match: SecondaryAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())),
258 39198 : affinity_score: context
259 39198 : .nodes
260 39198 : .get(node_id)
261 39198 : .copied()
262 39198 : .unwrap_or(AffinityScore::FREE),
263 39198 : utilization_score: utilization.cached_score(),
264 39198 : total_non_home_shard_count: (node.shard_count - node.home_shard_count),
265 39198 : node_id: *node_id,
266 39198 : })
267 39199 : }
268 :
269 14 : fn for_optimization(&self) -> Self {
270 14 : Self {
271 14 : utilization_score: 0,
272 14 : total_non_home_shard_count: 0,
273 14 : node_id: NodeId(0),
274 14 : ..*self
275 14 : }
276 14 : }
277 :
278 39173 : fn is_overloaded(&self) -> bool {
279 39173 : PageserverUtilization::is_overloaded(self.utilization_score)
280 39173 : }
281 :
282 12847 : fn node_id(&self) -> NodeId {
283 12847 : self.node_id
284 12847 : }
285 : }
286 :
287 : impl PartialEq for SchedulerNode {
288 3 : fn eq(&self, other: &Self) -> bool {
289 3 : let may_schedule_matches = matches!(
290 3 : (&self.may_schedule, &other.may_schedule),
291 : (MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No)
292 : );
293 :
294 3 : may_schedule_matches
295 3 : && self.shard_count == other.shard_count
296 3 : && self.attached_shard_count == other.attached_shard_count
297 3 : && self.az == other.az
298 3 : }
299 : }
300 :
301 : impl Eq for SchedulerNode {}
302 :
303 : /// This type is responsible for selecting which node is used when a tenant shard needs to choose a pageserver
304 : /// on which to run.
305 : ///
306 : /// The type has no persistent state of its own: this is all populated at startup. The Serialize
307 : /// impl is only for debug dumps.
308 : #[derive(Serialize)]
309 : pub(crate) struct Scheduler {
310 : nodes: HashMap<NodeId, SchedulerNode>,
311 : }
312 :
313 : /// Score for soft constraint scheduling: lower scores are preferred to higher scores.
314 : ///
315 : /// For example, we may set an affinity score based on the number of shards from the same
316 : /// tenant already on a node, to implicitly prefer to balance out shards.
317 : #[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
318 : pub(crate) struct AffinityScore(pub(crate) usize);
319 :
320 : impl AffinityScore {
321 : /// If we have no anti-affinity at all toward a node, this is its score. It means
322 : /// the scheduler has a free choice amongst nodes with this score, and may pick a node
323 : /// based on other information such as total utilization.
324 : pub(crate) const FREE: Self = Self(0);
325 :
326 25783 : pub(crate) fn inc(&mut self) {
327 25783 : self.0 += 1;
328 25783 : }
329 :
330 126 : pub(crate) fn dec(&mut self) {
331 126 : self.0 -= 1;
332 126 : }
333 : }
334 :
335 : impl std::ops::Add for AffinityScore {
336 : type Output = Self;
337 :
338 0 : fn add(self, rhs: Self) -> Self::Output {
339 0 : Self(self.0 + rhs.0)
340 0 : }
341 : }
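
// Worked example (hypothetical counts): if the tenant being scheduled already has two
// locations on node 1 and one on node 2, the schedule context holds AffinityScore(2)
// for node 1 and AffinityScore(1) for node 2; any node not mentioned in the context
// scores AffinityScore::FREE (0) and is preferred when all other factors are equal.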
342 :
343 : /// Hint for whether this is a sincere attempt to schedule, or a speculative
344 : /// check for where we _would_ schedule (done during optimization)
345 : #[derive(Debug, Clone)]
346 : pub(crate) enum ScheduleMode {
347 : Normal,
348 : Speculative,
349 : }
350 :
351 : impl Default for ScheduleMode {
352 5334 : fn default() -> Self {
353 5334 : Self::Normal
354 5334 : }
355 : }
356 :
357 : // For carrying state between multiple calls to [`TenantShard::schedule`], e.g. when calling
358 : // it for many shards in the same tenant.
359 : #[derive(Debug, Default, Clone)]
360 : pub(crate) struct ScheduleContext {
361 : /// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`]
362 : pub(crate) nodes: HashMap<NodeId, AffinityScore>,
363 :
364 : pub(crate) mode: ScheduleMode,
365 : }
366 :
367 : impl ScheduleContext {
368 3 : pub(crate) fn new(mode: ScheduleMode) -> Self {
369 3 : Self {
370 3 : nodes: HashMap::new(),
371 3 : mode,
372 3 : }
373 3 : }
374 :
375 : /// Input is a list of nodes we would like to avoid using again within this context. The more
376 : /// times a node is passed into this call, the less inclined we are to use it.
377 12900 : pub(crate) fn avoid(&mut self, nodes: &[NodeId]) {
378 38683 : for node_id in nodes {
379 25783 : let entry = self.nodes.entry(*node_id).or_insert(AffinityScore::FREE);
380 25783 : entry.inc()
381 : }
382 12900 : }
383 :
384 : /// Remove `shard`'s contributions to this context. This is useful when considering scheduling
385 : /// this shard afresh, where we don't want it to e.g. experience anti-affinity to its current location.
386 63 : pub(crate) fn project_detach(&self, shard: &TenantShard) -> Self {
387 63 : let mut new_context = self.clone();
388 :
389 63 : if let Some(attached) = shard.intent.get_attached() {
390 60 : if let Some(score) = new_context.nodes.get_mut(attached) {
391 60 : score.dec();
392 60 : }
393 3 : }
394 :
395 68 : for secondary in shard.intent.get_secondary() {
396 68 : if let Some(score) = new_context.nodes.get_mut(secondary) {
397 66 : score.dec();
398 66 : }
399 : }
400 :
401 63 : new_context
402 63 : }
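
// Sketch of why this exists (illustrative, not from the original source): when the optimizer
// asks "where would this shard go if scheduled afresh?", it first strips the shard's own
// contribution so that scoring does not penalise the shard's current locations:
//
//     let detached_context = context.project_detach(&shard);
//     // `detached_context` no longer counts `shard`'s attached/secondary locations,
//     // so candidate nodes are compared without self-inflicted anti-affinity.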
403 :
404 : /// For tests, track the sum of AffinityScore values, which is effectively how many
405 : /// attached or secondary locations have been registered with this context.
406 : #[cfg(test)]
407 3 : pub(crate) fn location_count(&self) -> usize {
408 7 : self.nodes.values().map(|i| i.0).sum()
409 3 : }
410 : }
411 :
412 : pub(crate) enum RefCountUpdate<'a> {
413 : PromoteSecondary,
414 : Attach,
415 : Detach,
416 : DemoteAttached,
417 : AddSecondary,
418 : RemoveSecondary,
419 : ChangePreferredAzFrom(Option<&'a AvailabilityZone>),
420 : }
421 :
422 : impl Scheduler {
423 70 : pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
424 70 : let mut scheduler_nodes = HashMap::new();
425 137 : for node in nodes {
426 67 : scheduler_nodes.insert(
427 67 : node.get_id(),
428 67 : SchedulerNode {
429 67 : shard_count: 0,
430 67 : attached_shard_count: 0,
431 67 : home_shard_count: 0,
432 67 : may_schedule: node.may_schedule(),
433 67 : az: node.get_availability_zone_id().clone(),
434 67 : },
435 67 : );
436 67 : }
437 :
438 70 : Self {
439 70 : nodes: scheduler_nodes,
440 70 : }
441 70 : }
442 :
443 : /// For debug/support: check that our internal statistics are in sync with the state of
444 : /// the nodes & tenant shards.
445 : ///
446 : /// If anything is inconsistent, log details and return an error.
447 1 : pub(crate) fn consistency_check<'a>(
448 1 : &self,
449 1 : nodes: impl Iterator<Item = &'a Node>,
450 1 : shards: impl Iterator<Item = &'a TenantShard>,
451 1 : ) -> anyhow::Result<()> {
452 1 : let mut expect_nodes: HashMap<NodeId, SchedulerNode> = HashMap::new();
453 4 : for node in nodes {
454 3 : expect_nodes.insert(
455 3 : node.get_id(),
456 3 : SchedulerNode {
457 3 : shard_count: 0,
458 3 : attached_shard_count: 0,
459 3 : home_shard_count: 0,
460 3 : may_schedule: node.may_schedule(),
461 3 : az: node.get_availability_zone_id().clone(),
462 3 : },
463 3 : );
464 3 : }
465 :
466 2 : for shard in shards {
467 1 : if let Some(node_id) = shard.intent.get_attached() {
468 1 : match expect_nodes.get_mut(node_id) {
469 1 : Some(node) => {
470 1 : node.shard_count += 1;
471 1 : node.attached_shard_count += 1;
472 1 : if Some(&node.az) == shard.preferred_az() {
473 0 : node.home_shard_count += 1;
474 1 : }
475 : }
476 0 : None => anyhow::bail!(
477 0 : "Tenant {} references nonexistent node {}",
478 0 : shard.tenant_shard_id,
479 0 : node_id
480 0 : ),
481 : }
482 0 : }
483 :
484 1 : for node_id in shard.intent.get_secondary() {
485 1 : match expect_nodes.get_mut(node_id) {
486 1 : Some(node) => {
487 1 : node.shard_count += 1;
488 1 : if Some(&node.az) == shard.preferred_az() {
489 0 : node.home_shard_count += 1;
490 1 : }
491 : }
492 0 : None => anyhow::bail!(
493 0 : "Tenant {} references nonexistent node {}",
494 0 : shard.tenant_shard_id,
495 0 : node_id
496 0 : ),
497 : }
498 : }
499 : }
500 :
501 4 : for (node_id, expect_node) in &expect_nodes {
502 3 : let Some(self_node) = self.nodes.get(node_id) else {
503 0 : anyhow::bail!("Node {node_id} not found in Self")
504 : };
505 :
506 3 : if self_node != expect_node {
507 0 : tracing::error!("Inconsistency detected in scheduling state for node {node_id}");
508 0 : tracing::error!("Expected state: {}", serde_json::to_string(expect_node)?);
509 0 : tracing::error!("Self state: {}", serde_json::to_string(self_node)?);
510 :
511 0 : anyhow::bail!("Inconsistent state on {node_id}");
512 3 : }
513 : }
514 :
515 1 : if expect_nodes.len() != self.nodes.len() {
516 : // We just checked that all the expected nodes are present. If the lengths don't match,
517 : // it means that we have nodes in Self that are unexpected.
518 0 : for node_id in self.nodes.keys() {
519 0 : if !expect_nodes.contains_key(node_id) {
520 0 : anyhow::bail!("Node {node_id} found in Self but not in expected nodes");
521 0 : }
522 : }
523 1 : }
524 :
525 1 : Ok(())
526 1 : }
527 :
528 : /// Update the reference counts of a node. These reference counts are used to guide scheduling
529 : /// decisions, not for memory management: they represent the number of tenant shards whose IntentState
530 : /// targets this node and the number of tenant shards whose IntentState is attached to this
531 : /// node.
532 : ///
533 : /// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
534 : /// [`Self::new`] or [`Self::node_upsert`])
535 51432 : pub(crate) fn update_node_ref_counts(
536 51432 : &mut self,
537 51432 : node_id: NodeId,
538 51432 : preferred_az: Option<&AvailabilityZone>,
539 51432 : update: RefCountUpdate,
540 51432 : ) {
541 51432 : let Some(node) = self.nodes.get_mut(&node_id) else {
542 0 : debug_assert!(false);
543 0 : tracing::error!("Scheduler missing node {node_id}");
544 0 : return;
545 : };
546 :
547 51432 : let is_home_az = Some(&node.az) == preferred_az;
548 51432 :
549 51432 : match update {
550 7 : RefCountUpdate::PromoteSecondary => {
551 7 : node.attached_shard_count += 1;
552 7 : }
553 : RefCountUpdate::Attach => {
554 12859 : node.shard_count += 1;
555 12859 : node.attached_shard_count += 1;
556 12859 : if is_home_az {
557 12819 : node.home_shard_count += 1;
558 12819 : }
559 : }
560 : RefCountUpdate::Detach => {
561 12857 : node.shard_count -= 1;
562 12857 : node.attached_shard_count -= 1;
563 12857 : if is_home_az {
564 12819 : node.home_shard_count -= 1;
565 12819 : }
566 : }
567 8 : RefCountUpdate::DemoteAttached => {
568 8 : node.attached_shard_count -= 1;
569 8 : }
570 : RefCountUpdate::AddSecondary => {
571 12848 : node.shard_count += 1;
572 12848 : if is_home_az {
573 6 : node.home_shard_count += 1;
574 12842 : }
575 : }
576 : RefCountUpdate::RemoveSecondary => {
577 12849 : node.shard_count -= 1;
578 12849 : if is_home_az {
579 5 : node.home_shard_count -= 1;
580 12844 : }
581 : }
582 4 : RefCountUpdate::ChangePreferredAzFrom(old_az) => {
583 4 : if Some(&node.az) == old_az {
584 2 : node.home_shard_count -= 1;
585 2 : }
586 4 : if is_home_az {
587 1 : node.home_shard_count += 1;
588 3 : }
589 : }
590 : }
591 :
592 : // Maybe update PageserverUtilization
593 51432 : match update {
594 : RefCountUpdate::AddSecondary | RefCountUpdate::Attach => {
595 : // Referencing the node: if this takes our shard_count above the utilization structure's
596 : // shard count, then artificially bump it: this ensures that the scheduler immediately
597 : // recognizes that this node has more work on it, without waiting for the next heartbeat
598 : // to update the utilization.
599 25707 : if let MaySchedule::Yes(utilization) = &mut node.may_schedule {
600 25707 : utilization.adjust_shard_count_max(node.shard_count as u32);
601 25707 : }
602 : }
603 : RefCountUpdate::PromoteSecondary
604 : | RefCountUpdate::Detach
605 : | RefCountUpdate::RemoveSecondary
606 : | RefCountUpdate::DemoteAttached
607 25725 : | RefCountUpdate::ChangePreferredAzFrom(_) => {
608 25725 : // De-referencing the node: leave the utilization's shard_count at a stale higher
609 25725 : // value until some future heartbeat after we have physically removed this shard
610 25725 : // from the node: this prevents the scheduler over-optimistically trying to schedule
611 25725 : // more work onto the node before earlier detaches are done.
612 25725 : }
613 : }
614 51432 : }
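
// Sketch of the calling pattern (hypothetical `node_id`/`az`; in practice these calls are
// made via `IntentState` in `crate::tenant_shard` rather than directly):
//
//     // A shard is attached to `node_id`, which is in its preferred AZ:
//     scheduler.update_node_ref_counts(node_id, Some(&az), RefCountUpdate::Attach);
//     // Later the attachment is demoted to a secondary on the same node: shard_count and
//     // home_shard_count are unchanged, only attached_shard_count drops.
//     scheduler.update_node_ref_counts(node_id, Some(&az), RefCountUpdate::DemoteAttached);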
615 :
616 : // Check if the number of shards attached to a given node is lagging below
617 : // the cluster average. If that's the case, the node should be filled.
618 0 : pub(crate) fn compute_fill_requirement(&self, node_id: NodeId) -> usize {
619 0 : let Some(node) = self.nodes.get(&node_id) else {
620 0 : debug_assert!(false);
621 0 : tracing::error!("Scheduler missing node {node_id}");
622 0 : return 0;
623 : };
624 0 : assert!(!self.nodes.is_empty());
625 0 : let expected_attached_shards_per_node = self.expected_attached_shard_count();
626 :
627 0 : for (node_id, node) in self.nodes.iter() {
628 0 : tracing::trace!(%node_id, "attached_shard_count={} shard_count={} expected={}", node.attached_shard_count, node.shard_count, expected_attached_shards_per_node);
629 : }
630 :
631 0 : if node.attached_shard_count < expected_attached_shards_per_node {
632 0 : expected_attached_shards_per_node - node.attached_shard_count
633 : } else {
634 0 : 0
635 : }
636 0 : }
637 :
638 0 : pub(crate) fn expected_attached_shard_count(&self) -> usize {
639 0 : let total_attached_shards: usize =
640 0 : self.nodes.values().map(|n| n.attached_shard_count).sum();
641 0 :
642 0 : assert!(!self.nodes.is_empty());
643 0 : total_attached_shards / self.nodes.len()
644 0 : }
645 :
646 0 : pub(crate) fn nodes_by_attached_shard_count(&self) -> Vec<(NodeId, usize)> {
647 0 : self.nodes
648 0 : .iter()
649 0 : .map(|(node_id, stats)| (*node_id, stats.attached_shard_count))
650 0 : .sorted_by(|lhs, rhs| Ord::cmp(&lhs.1, &rhs.1).reverse())
651 0 : .collect()
652 0 : }
653 :
654 215 : pub(crate) fn node_upsert(&mut self, node: &Node) {
655 : use std::collections::hash_map::Entry::*;
656 215 : match self.nodes.entry(node.get_id()) {
657 4 : Occupied(mut entry) => {
658 4 : // Updates to MaySchedule are how we receive updated PageserverUtilization: adjust these values
659 4 : // to account for any shards scheduled on the controller but not yet visible to the pageserver.
660 4 : let mut may_schedule = node.may_schedule();
661 4 : match &mut may_schedule {
662 2 : MaySchedule::Yes(utilization) => {
663 2 : utilization.adjust_shard_count_max(entry.get().shard_count as u32);
664 2 : }
665 2 : MaySchedule::No => { // Nothing to tweak
666 2 : }
667 : }
668 :
669 4 : entry.get_mut().may_schedule = may_schedule;
670 : }
671 211 : Vacant(entry) => {
672 211 : entry.insert(SchedulerNode {
673 211 : shard_count: 0,
674 211 : attached_shard_count: 0,
675 211 : home_shard_count: 0,
676 211 : may_schedule: node.may_schedule(),
677 211 : az: node.get_availability_zone_id().clone(),
678 211 : });
679 211 : }
680 : }
681 215 : }
682 :
683 0 : pub(crate) fn node_remove(&mut self, node_id: NodeId) {
684 0 : if self.nodes.remove(&node_id).is_none() {
685 0 : tracing::warn!(node_id=%node_id, "Removed non-existent node from scheduler");
686 0 : }
687 0 : }
688 :
689 : /// Calculate a single node's score, used in optimizer logic to compare specific
690 : /// nodes' scores.
691 146 : pub(crate) fn compute_node_score<Score>(
692 146 : &mut self,
693 146 : node_id: NodeId,
694 146 : preferred_az: &Option<AvailabilityZone>,
695 146 : context: &ScheduleContext,
696 146 : ) -> Option<Score>
697 146 : where
698 146 : Score: NodeSchedulingScore,
699 146 : {
700 146 : self.nodes
701 146 : .get_mut(&node_id)
702 146 : .and_then(|node| Score::generate(&node_id, node, preferred_az, context))
703 146 : }
704 :
705 : /// Compute a scheduling score for each node that the scheduler knows of
706 : /// minus a set of hard excluded nodes.
707 25727 : fn compute_node_scores<Score>(
708 25727 : &mut self,
709 25727 : hard_exclude: &[NodeId],
710 25727 : preferred_az: &Option<AvailabilityZone>,
711 25727 : context: &ScheduleContext,
712 25727 : ) -> Vec<Score>
713 25727 : where
714 25727 : Score: NodeSchedulingScore,
715 25727 : {
716 25727 : self.nodes
717 25727 : .iter_mut()
718 104191 : .filter_map(|(k, v)| {
719 104191 : if hard_exclude.contains(k) {
720 12847 : None
721 : } else {
722 91344 : Score::generate(k, v, preferred_az, context)
723 : }
724 104191 : })
725 25727 : .collect()
726 25727 : }
727 :
728 : /// hard_exclude: it is forbidden to use nodes in this list, typically because they
729 : /// are already in use by this shard -- we use this to avoid picking the same node
730 : /// as both attached and secondary location. This is a hard constraint: if we cannot
731 : /// find any nodes that aren't in this list, then we will return a [`ScheduleError::ImpossibleConstraint`].
732 : ///
733 : /// context: we prefer to avoid using nodes identified in the context, according
734 : /// to their anti-affinity score. We use this to prefer to avoid placing shards of
735 : /// the same tenant on the same node. This is a soft constraint: the context will never
736 : /// cause us to fail to schedule a shard.
737 25727 : pub(crate) fn schedule_shard<Tag: ShardTag>(
738 25727 : &mut self,
739 25727 : hard_exclude: &[NodeId],
740 25727 : preferred_az: &Option<AvailabilityZone>,
741 25727 : context: &ScheduleContext,
742 25727 : ) -> Result<NodeId, ScheduleError> {
743 25727 : if self.nodes.is_empty() {
744 0 : return Err(ScheduleError::NoPageservers);
745 25727 : }
746 25727 :
747 25727 : let mut scores =
748 25727 : self.compute_node_scores::<Tag::Score>(hard_exclude, preferred_az, context);
749 25727 :
750 25727 : // Exclude nodes whose utilization is critically high, if there are alternatives available. This will
751 25727 : // cause us to violate affinity rules if it is necessary to avoid critically overloading nodes: for example
752 25727 : // we may place shards in the same tenant together on the same pageserver if all other pageservers are
753 25727 : // overloaded.
754 25727 : let non_overloaded_scores = scores
755 25727 : .iter()
756 91343 : .filter(|i| !i.is_overloaded())
757 25727 : .copied()
758 25727 : .collect::<Vec<_>>();
759 25727 : if !non_overloaded_scores.is_empty() {
760 25727 : scores = non_overloaded_scores;
761 25727 : }
762 :
763 : // Sort the nodes by score. The one with the lowest scores will be the preferred node.
764 : // Refer to [`NodeAttachmentSchedulingScore`] for attached locations and
765 : // [`NodeSecondarySchedulingScore`] for secondary locations to understand how the nodes
766 : // are ranked.
767 25727 : scores.sort();
768 25727 :
769 25727 : if scores.is_empty() {
770 : // After applying constraints, no pageservers were left.
771 0 : if !matches!(context.mode, ScheduleMode::Speculative) {
772 : // If this was not a speculative attempt, log details to understand why we couldn't
773 : // schedule: this may help an engineer understand if some nodes are marked offline
774 : // in a way that's preventing progress.
775 0 : tracing::info!(
776 0 : "Scheduling failure, while excluding {hard_exclude:?}, node states:"
777 : );
778 0 : for (node_id, node) in &self.nodes {
779 0 : tracing::info!(
780 0 : "Node {node_id}: may_schedule={} shards={}",
781 0 : !matches!(node.may_schedule, MaySchedule::No),
782 : node.shard_count
783 : );
784 : }
785 0 : }
786 0 : return Err(ScheduleError::ImpossibleConstraint);
787 25727 : }
788 25727 :
789 25727 : // Lowest score wins
790 25727 : let node_id = scores.first().unwrap().node_id();
791 :
792 25727 : if !matches!(context.mode, ScheduleMode::Speculative) {
793 25727 : tracing::info!(
794 0 : "scheduler selected node {node_id} (elegible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?}, preferred_az: {:?})",
795 0 : scores.iter().map(|i| i.node_id().0).collect::<Vec<_>>(),
796 : preferred_az,
797 : );
798 0 : }
799 :
800 : // Note that we do not update shard count here to reflect the scheduling: that
801 : // is IntentState's job when the scheduled location is used.
802 :
803 25727 : Ok(node_id)
804 25727 : }
805 :
806 : /// Selects any available node. This is suitable for performing background work (e.g. S3
807 : /// deletions).
808 0 : pub(crate) fn any_available_node(&mut self) -> Result<NodeId, ScheduleError> {
809 0 : self.schedule_shard::<AttachedShardTag>(&[], &None, &ScheduleContext::default())
810 0 : }
811 :
812 : /// For choosing which AZ to schedule a new shard into, use this. It will return the
813 : /// AZ with the lowest number of shards currently scheduled in this AZ as their home
814 : /// location.
815 : ///
816 : /// We use an AZ-wide measure rather than simply selecting the AZ of the least-loaded
817 : /// node, because while tenants start out single sharded, when they grow and undergo
818 : /// shard-split, they will occupy space on many nodes within an AZ. It is important
819 : /// that we pick the AZ in a way that balances this _future_ load.
820 : ///
821 : /// Once we've picked an AZ, subsequent scheduling within that AZ will be driven by
822 : /// nodes' utilization scores.
823 303 : pub(crate) fn get_az_for_new_tenant(&self) -> Option<AvailabilityZone> {
824 303 : if self.nodes.is_empty() {
825 0 : return None;
826 303 : }
827 :
828 : #[derive(Default)]
829 : struct AzScore {
830 : home_shard_count: usize,
831 : scheduleable: bool,
832 : }
833 :
834 303 : let mut azs: HashMap<&AvailabilityZone, AzScore> = HashMap::new();
835 1818 : for node in self.nodes.values() {
836 1818 : let az = azs.entry(&node.az).or_default();
837 1818 : az.home_shard_count += node.home_shard_count;
838 1818 : az.scheduleable |= matches!(node.may_schedule, MaySchedule::Yes(_));
839 : }
840 :
841 : // If any AZs are schedulable, then filter out the non-schedulable ones (i.e. AZs where
842 : // all nodes are overloaded or otherwise unschedulable).
843 303 : if azs.values().any(|i| i.scheduleable) {
844 906 : azs.retain(|_, i| i.scheduleable);
845 303 : }
846 :
847 : // Find the AZ with the lowest number of shards currently allocated
848 303 : Some(
849 303 : azs.into_iter()
850 906 : .min_by_key(|i| (i.1.home_shard_count, i.0))
851 303 : .unwrap()
852 303 : .0
853 303 : .clone(),
854 303 : )
855 303 : }
856 :
857 9 : pub(crate) fn get_node_az(&self, node_id: &NodeId) -> Option<AvailabilityZone> {
858 9 : self.nodes.get(node_id).map(|n| n.az.clone())
859 9 : }
860 :
861 : /// For use when choosing a preferred secondary location: filter out nodes that are not
862 : /// available, and gather their AZs.
863 12832 : pub(crate) fn filter_usable_nodes(
864 12832 : &self,
865 12832 : nodes: &[NodeId],
866 12832 : ) -> Vec<(NodeId, Option<AvailabilityZone>)> {
867 12832 : nodes
868 12832 : .iter()
869 12832 : .filter_map(|node_id| {
870 3 : let node = self
871 3 : .nodes
872 3 : .get(node_id)
873 3 : .expect("Referenced nodes always exist");
874 3 : if matches!(node.may_schedule, MaySchedule::Yes(_)) {
875 2 : Some((*node_id, Some(node.az.clone())))
876 : } else {
877 1 : None
878 : }
879 12832 : })
880 12832 : .collect()
881 12832 : }
882 :
883 : /// Unit test access to internal state
884 : #[cfg(test)]
885 19 : pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
886 19 : self.nodes.get(&node_id).unwrap().shard_count
887 19 : }
888 :
889 : #[cfg(test)]
890 19 : pub(crate) fn get_node_attached_shard_count(&self, node_id: NodeId) -> usize {
891 19 : self.nodes.get(&node_id).unwrap().attached_shard_count
892 19 : }
893 :
894 : /// Some metrics that we only calculate periodically: this is simpler than
895 : /// rigorously updating them on every change.
896 0 : pub(crate) fn update_metrics(&self) {
897 0 : for (node_id, node) in &self.nodes {
898 0 : let node_id_str = format!("{}", node_id);
899 0 : let label_group = NodeLabelGroup {
900 0 : az: &node.az.0,
901 0 : node_id: &node_id_str,
902 0 : };
903 0 :
904 0 : crate::metrics::METRICS_REGISTRY
905 0 : .metrics_group
906 0 : .storage_controller_node_shards
907 0 : .set(label_group.clone(), node.shard_count as i64);
908 0 :
909 0 : crate::metrics::METRICS_REGISTRY
910 0 : .metrics_group
911 0 : .storage_controller_node_attached_shards
912 0 : .set(label_group.clone(), node.attached_shard_count as i64);
913 0 :
914 0 : crate::metrics::METRICS_REGISTRY
915 0 : .metrics_group
916 0 : .storage_controller_node_home_shards
917 0 : .set(label_group.clone(), node.home_shard_count as i64);
918 0 : }
919 0 : }
920 : }
921 :
922 : #[cfg(test)]
923 : pub(crate) mod test_utils {
924 :
925 : use std::collections::HashMap;
926 :
927 : use pageserver_api::controller_api::{AvailabilityZone, NodeAvailability};
928 : use pageserver_api::models::utilization::test_utilization;
929 : use utils::id::NodeId;
930 :
931 : use crate::node::Node;
932 :
933 : /// Test helper: synthesize the requested number of nodes, all in active state.
934 : ///
935 : /// Node IDs start at one.
936 : ///
937 : /// The `azs` argument specifies the list of availability zones which will be assigned
938 : /// to nodes in round-robin fashion. If empty, a default AZ is assigned.
939 70 : pub(crate) fn make_test_nodes(n: u64, azs: &[AvailabilityZone]) -> HashMap<NodeId, Node> {
940 70 : let mut az_iter = azs.iter().cycle();
941 70 :
942 70 : (1..n + 1)
943 278 : .map(|i| {
944 278 : (NodeId(i), {
945 278 : let mut node = Node::new(
946 278 : NodeId(i),
947 278 : format!("httphost-{i}"),
948 278 : 80 + i as u16,
949 278 : None,
950 278 : format!("pghost-{i}"),
951 278 : 5432 + i as u16,
952 278 : az_iter
953 278 : .next()
954 278 : .cloned()
955 278 : .unwrap_or(AvailabilityZone("test-az".to_string())),
956 278 : false,
957 278 : )
958 278 : .unwrap();
959 278 : node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0)));
960 278 : assert!(node.is_available());
961 278 : node
962 278 : })
963 278 : })
964 70 : .collect()
965 70 : }
966 : }
967 :
968 : #[cfg(test)]
969 : mod tests {
970 : use pageserver_api::controller_api::NodeAvailability;
971 : use pageserver_api::models::utilization::test_utilization;
972 : use pageserver_api::shard::ShardIdentity;
973 : use utils::id::TenantId;
974 : use utils::shard::{ShardCount, ShardNumber, TenantShardId};
975 :
976 : use super::*;
977 : use crate::tenant_shard::IntentState;
978 : #[test]
979 1 : fn scheduler_basic() -> anyhow::Result<()> {
980 1 : let nodes = test_utils::make_test_nodes(2, &[]);
981 1 :
982 1 : let mut scheduler = Scheduler::new(nodes.values());
983 1 : let mut t1_intent = IntentState::new(None);
984 1 : let mut t2_intent = IntentState::new(None);
985 1 :
986 1 : let context = ScheduleContext::default();
987 :
988 1 : let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &None, &context)?;
989 1 : t1_intent.set_attached(&mut scheduler, Some(scheduled));
990 1 : let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &None, &context)?;
991 1 : t2_intent.set_attached(&mut scheduler, Some(scheduled));
992 1 :
993 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
994 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
995 :
996 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
997 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
998 :
999 1 : let scheduled = scheduler.schedule_shard::<AttachedShardTag>(
1000 1 : &t1_intent.all_pageservers(),
1001 1 : &None,
1002 1 : &context,
1003 1 : )?;
1004 1 : t1_intent.push_secondary(&mut scheduler, scheduled);
1005 1 :
1006 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
1007 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
1008 :
1009 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
1010 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
1011 :
1012 1 : t1_intent.clear(&mut scheduler);
1013 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
1014 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
1015 :
1016 1 : let total_attached = scheduler.get_node_attached_shard_count(NodeId(1))
1017 1 : + scheduler.get_node_attached_shard_count(NodeId(2));
1018 1 : assert_eq!(total_attached, 1);
1019 :
1020 1 : if cfg!(debug_assertions) {
1021 : // Dropping an IntentState without clearing it causes a panic in debug mode,
1022 : // because we have failed to properly update scheduler shard counts.
1023 1 : let result = std::panic::catch_unwind(move || {
1024 1 : drop(t2_intent);
1025 1 : });
1026 1 : assert!(result.is_err());
1027 : } else {
1028 0 : t2_intent.clear(&mut scheduler);
1029 0 :
1030 0 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
1031 0 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 0);
1032 :
1033 0 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 0);
1034 0 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 0);
1035 : }
1036 :
1037 1 : Ok(())
1038 1 : }
1039 :
1040 : #[test]
1041 : /// Test the PageserverUtilization's contribution to scheduling algorithm
1042 1 : fn scheduler_utilization() {
1043 1 : let mut nodes = test_utils::make_test_nodes(3, &[]);
1044 1 : let mut scheduler = Scheduler::new(nodes.values());
1045 1 :
1046 1 : // Need to keep these alive because they contribute to shard counts via RAII
1047 1 : let mut scheduled_intents = Vec::new();
1048 1 :
1049 1 : let empty_context = ScheduleContext::default();
1050 :
1051 11 : fn assert_scheduler_chooses(
1052 11 : expect_node: NodeId,
1053 11 : scheduled_intents: &mut Vec<IntentState>,
1054 11 : scheduler: &mut Scheduler,
1055 11 : context: &ScheduleContext,
1056 11 : ) {
1057 11 : let scheduled = scheduler
1058 11 : .schedule_shard::<AttachedShardTag>(&[], &None, context)
1059 11 : .unwrap();
1060 11 : let mut intent = IntentState::new(None);
1061 11 : intent.set_attached(scheduler, Some(scheduled));
1062 11 : scheduled_intents.push(intent);
1063 11 : assert_eq!(scheduled, expect_node);
1064 11 : }
1065 :
1066 : // Independent schedule calls onto empty nodes should round-robin, because each node's
1067 : // utilization's shard count is updated inline. The order is deterministic because when all other factors are
1068 : // equal, we order by node ID.
1069 1 : assert_scheduler_chooses(
1070 1 : NodeId(1),
1071 1 : &mut scheduled_intents,
1072 1 : &mut scheduler,
1073 1 : &empty_context,
1074 1 : );
1075 1 : assert_scheduler_chooses(
1076 1 : NodeId(2),
1077 1 : &mut scheduled_intents,
1078 1 : &mut scheduler,
1079 1 : &empty_context,
1080 1 : );
1081 1 : assert_scheduler_chooses(
1082 1 : NodeId(3),
1083 1 : &mut scheduled_intents,
1084 1 : &mut scheduler,
1085 1 : &empty_context,
1086 1 : );
1087 1 :
1088 1 : // Manually setting utilization higher should cause schedule calls to round-robin the other nodes
1089 1 : // which have equal utilization.
1090 1 : nodes
1091 1 : .get_mut(&NodeId(1))
1092 1 : .unwrap()
1093 1 : .set_availability(NodeAvailability::Active(test_utilization::simple(
1094 1 : 10,
1095 1 : 1024 * 1024 * 1024,
1096 1 : )));
1097 1 : scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
1098 1 :
1099 1 : assert_scheduler_chooses(
1100 1 : NodeId(2),
1101 1 : &mut scheduled_intents,
1102 1 : &mut scheduler,
1103 1 : &empty_context,
1104 1 : );
1105 1 : assert_scheduler_chooses(
1106 1 : NodeId(3),
1107 1 : &mut scheduled_intents,
1108 1 : &mut scheduler,
1109 1 : &empty_context,
1110 1 : );
1111 1 : assert_scheduler_chooses(
1112 1 : NodeId(2),
1113 1 : &mut scheduled_intents,
1114 1 : &mut scheduler,
1115 1 : &empty_context,
1116 1 : );
1117 1 : assert_scheduler_chooses(
1118 1 : NodeId(3),
1119 1 : &mut scheduled_intents,
1120 1 : &mut scheduler,
1121 1 : &empty_context,
1122 1 : );
1123 1 :
1124 1 : // The scheduler should prefer nodes with lower affinity score,
1125 1 : // even if they have higher utilization (as long as they aren't utilized at >100%)
1126 1 : let mut context_prefer_node1 = ScheduleContext::default();
1127 1 : context_prefer_node1.avoid(&[NodeId(2), NodeId(3)]);
1128 1 : assert_scheduler_chooses(
1129 1 : NodeId(1),
1130 1 : &mut scheduled_intents,
1131 1 : &mut scheduler,
1132 1 : &context_prefer_node1,
1133 1 : );
1134 1 : assert_scheduler_chooses(
1135 1 : NodeId(1),
1136 1 : &mut scheduled_intents,
1137 1 : &mut scheduler,
1138 1 : &context_prefer_node1,
1139 1 : );
1140 1 :
1141 1 : // If a node is over-utilized, it will not be used even if affinity scores prefer it
1142 1 : nodes
1143 1 : .get_mut(&NodeId(1))
1144 1 : .unwrap()
1145 1 : .set_availability(NodeAvailability::Active(test_utilization::simple(
1146 1 : 20000,
1147 1 : 1024 * 1024 * 1024,
1148 1 : )));
1149 1 : scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
1150 1 : assert_scheduler_chooses(
1151 1 : NodeId(2),
1152 1 : &mut scheduled_intents,
1153 1 : &mut scheduler,
1154 1 : &context_prefer_node1,
1155 1 : );
1156 1 : assert_scheduler_chooses(
1157 1 : NodeId(3),
1158 1 : &mut scheduled_intents,
1159 1 : &mut scheduler,
1160 1 : &context_prefer_node1,
1161 1 : );
1162 :
1163 12 : for mut intent in scheduled_intents {
1164 11 : intent.clear(&mut scheduler);
1165 11 : }
1166 1 : }
1167 :
1168 : #[test]
1169 : /// A simple test that showcases AZ-aware scheduling and its interaction with
1170 : /// affinity scores.
1171 1 : fn az_scheduling() {
1172 1 : let az_a_tag = AvailabilityZone("az-a".to_string());
1173 1 : let az_b_tag = AvailabilityZone("az-b".to_string());
1174 1 :
1175 1 : let nodes = test_utils::make_test_nodes(3, &[az_a_tag.clone(), az_b_tag.clone()]);
1176 1 : let mut scheduler = Scheduler::new(nodes.values());
1177 1 :
1178 1 : // Need to keep these alive because they contribute to shard counts via RAII
1179 1 : let mut scheduled_intents = Vec::new();
1180 1 :
1181 1 : let mut context = ScheduleContext::default();
1182 :
1183 4 : fn assert_scheduler_chooses<Tag: ShardTag>(
1184 4 : expect_node: NodeId,
1185 4 : preferred_az: Option<AvailabilityZone>,
1186 4 : scheduled_intents: &mut Vec<IntentState>,
1187 4 : scheduler: &mut Scheduler,
1188 4 : context: &mut ScheduleContext,
1189 4 : ) {
1190 4 : let scheduled = scheduler
1191 4 : .schedule_shard::<Tag>(&[], &preferred_az, context)
1192 4 : .unwrap();
1193 4 : let mut intent = IntentState::new(preferred_az.clone());
1194 4 : intent.set_attached(scheduler, Some(scheduled));
1195 4 : scheduled_intents.push(intent);
1196 4 : assert_eq!(scheduled, expect_node);
1197 :
1198 4 : context.avoid(&[scheduled]);
1199 4 : }
1200 :
1201 1 : assert_scheduler_chooses::<AttachedShardTag>(
1202 1 : NodeId(1),
1203 1 : Some(az_a_tag.clone()),
1204 1 : &mut scheduled_intents,
1205 1 : &mut scheduler,
1206 1 : &mut context,
1207 1 : );
1208 1 :
1209 1 : // Nodes 2 and 3 have an affinity score of 0, but node 3
1210 1 : // is in "az-a" so we prefer that.
1211 1 : assert_scheduler_chooses::<AttachedShardTag>(
1212 1 : NodeId(3),
1213 1 : Some(az_a_tag.clone()),
1214 1 : &mut scheduled_intents,
1215 1 : &mut scheduler,
1216 1 : &mut context,
1217 1 : );
1218 1 :
1219 1 : // Nodes 1 and 3 (az-a) have the same affinity score, so prefer the lowest node id.
1220 1 : assert_scheduler_chooses::<AttachedShardTag>(
1221 1 : NodeId(1),
1222 1 : Some(az_a_tag.clone()),
1223 1 : &mut scheduled_intents,
1224 1 : &mut scheduler,
1225 1 : &mut context,
1226 1 : );
1227 1 :
1228 1 : // Avoid nodes in "az-a" for the secondary location.
1229 1 : assert_scheduler_chooses::<SecondaryShardTag>(
1230 1 : NodeId(2),
1231 1 : Some(az_a_tag.clone()),
1232 1 : &mut scheduled_intents,
1233 1 : &mut scheduler,
1234 1 : &mut context,
1235 1 : );
1236 :
1237 5 : for mut intent in scheduled_intents {
1238 4 : intent.clear(&mut scheduler);
1239 4 : }
1240 1 : }
1241 :
1242 : #[test]
1243 1 : fn az_scheduling_for_new_tenant() {
1244 1 : let az_a_tag = AvailabilityZone("az-a".to_string());
1245 1 : let az_b_tag = AvailabilityZone("az-b".to_string());
1246 1 : let nodes = test_utils::make_test_nodes(
1247 1 : 6,
1248 1 : &[
1249 1 : az_a_tag.clone(),
1250 1 : az_a_tag.clone(),
1251 1 : az_a_tag.clone(),
1252 1 : az_b_tag.clone(),
1253 1 : az_b_tag.clone(),
1254 1 : az_b_tag.clone(),
1255 1 : ],
1256 1 : );
1257 1 :
1258 1 : let mut scheduler = Scheduler::new(nodes.values());
1259 :
1260 : /// Force the `home_shard_count` of a node directly: this is the metric used
1261 : /// by the scheduler when picking AZs.
1262 3 : fn set_shard_count(scheduler: &mut Scheduler, node_id: NodeId, shard_count: usize) {
1263 3 : let node = scheduler.nodes.get_mut(&node_id).unwrap();
1264 3 : node.home_shard_count = shard_count;
1265 3 : }
1266 :
1267 : // Initial empty state. Scores are tied, scheduler prefers lower AZ ID.
1268 1 : assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_a_tag.clone()));
1269 :
1270 : // Home shard count is higher in AZ A, so AZ B will be preferred
1271 1 : set_shard_count(&mut scheduler, NodeId(1), 10);
1272 1 : assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_b_tag.clone()));
1273 :
1274 : // Total home shard count is higher in AZ B, so we revert to preferring AZ A
1275 1 : set_shard_count(&mut scheduler, NodeId(4), 6);
1276 1 : set_shard_count(&mut scheduler, NodeId(5), 6);
1277 1 : assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_a_tag.clone()));
1278 1 : }
1279 :
1280 : /// Test that when selecting AZs for many new tenants, we get the expected balance across nodes
1281 : #[test]
1282 1 : fn az_selection_many() {
1283 1 : let az_a_tag = AvailabilityZone("az-a".to_string());
1284 1 : let az_b_tag = AvailabilityZone("az-b".to_string());
1285 1 : let az_c_tag = AvailabilityZone("az-c".to_string());
1286 1 : let nodes = test_utils::make_test_nodes(
1287 1 : 6,
1288 1 : &[
1289 1 : az_a_tag.clone(),
1290 1 : az_b_tag.clone(),
1291 1 : az_c_tag.clone(),
1292 1 : az_a_tag.clone(),
1293 1 : az_b_tag.clone(),
1294 1 : az_c_tag.clone(),
1295 1 : ],
1296 1 : );
1297 1 :
1298 1 : let mut scheduler = Scheduler::new(nodes.values());
1299 1 :
1300 1 : // We should get 1/6th of these on each node, give or take a few...
1301 1 : let total_tenants = 300;
1302 1 :
1303 1 : // ...where the 'few' is the number of AZs, because the scheduling will sometimes overshoot
1304 1 : // on one AZ before correcting itself. This is because we select the 'home' AZ based on
1305 1 : // an AZ-wide metric, but we select the location for secondaries on a purely node-based
1306 1 : // metric (while excluding the home AZ).
1307 1 : let grace = 3;
1308 1 :
1309 1 : let mut scheduled_shards = Vec::new();
1310 300 : for _i in 0..total_tenants {
1311 300 : let preferred_az = scheduler.get_az_for_new_tenant().unwrap();
1312 300 :
1313 300 : let mut node_home_counts = scheduler
1314 300 : .nodes
1315 300 : .iter()
1316 1800 : .map(|(node_id, node)| (node_id, node.home_shard_count))
1317 300 : .collect::<Vec<_>>();
1318 6000 : node_home_counts.sort_by_key(|i| i.0);
1319 300 : eprintln!("Selected {}, vs nodes {:?}", preferred_az, node_home_counts);
1320 300 :
1321 300 : let tenant_shard_id = TenantShardId {
1322 300 : tenant_id: TenantId::generate(),
1323 300 : shard_number: ShardNumber(0),
1324 300 : shard_count: ShardCount(1),
1325 300 : };
1326 300 :
1327 300 : let shard_identity = ShardIdentity::new(
1328 300 : tenant_shard_id.shard_number,
1329 300 : tenant_shard_id.shard_count,
1330 300 : pageserver_api::shard::ShardStripeSize(1),
1331 300 : )
1332 300 : .unwrap();
1333 300 : let mut shard = TenantShard::new(
1334 300 : tenant_shard_id,
1335 300 : shard_identity,
1336 300 : pageserver_api::controller_api::PlacementPolicy::Attached(1),
1337 300 : Some(preferred_az),
1338 300 : );
1339 300 :
1340 300 : let mut context = ScheduleContext::default();
1341 300 : shard.schedule(&mut scheduler, &mut context).unwrap();
1342 300 : eprintln!("Scheduled shard at {:?}", shard.intent);
1343 300 :
1344 300 : scheduled_shards.push(shard);
1345 300 : }
1346 :
1347 7 : for (node_id, node) in &scheduler.nodes {
1348 6 : eprintln!(
1349 6 : "Node {}: {} {} {}",
1350 6 : node_id, node.shard_count, node.attached_shard_count, node.home_shard_count
1351 6 : );
1352 6 : }
1353 :
1354 6 : for node in scheduler.nodes.values() {
1355 6 : assert!((node.home_shard_count as i64 - total_tenants as i64 / 6).abs() < grace);
1356 : }
1357 :
1358 301 : for mut shard in scheduled_shards {
1359 300 : shard.intent.clear(&mut scheduler);
1360 300 : }
1361 1 : }
1362 :
1363 : #[test]
1364 : /// Make sure that when we have an odd number of nodes and an even number of shards, we still
1365 : /// get scheduling stability.
1366 1 : fn odd_nodes_stability() {
1367 1 : let az_a = AvailabilityZone("az-a".to_string());
1368 1 : let az_b = AvailabilityZone("az-b".to_string());
1369 1 :
1370 1 : let nodes = test_utils::make_test_nodes(
1371 1 : 10,
1372 1 : &[
1373 1 : az_a.clone(),
1374 1 : az_a.clone(),
1375 1 : az_a.clone(),
1376 1 : az_a.clone(),
1377 1 : az_a.clone(),
1378 1 : az_b.clone(),
1379 1 : az_b.clone(),
1380 1 : az_b.clone(),
1381 1 : az_b.clone(),
1382 1 : az_b.clone(),
1383 1 : ],
1384 1 : );
1385 1 : let mut scheduler = Scheduler::new(nodes.values());
1386 1 :
1387 1 : // Need to keep these alive because they contribute to shard counts via RAII
1388 1 : let mut scheduled_shards = Vec::new();
1389 1 :
1390 1 : let mut context = ScheduleContext::default();
1391 :
1392 8 : fn schedule_shard(
1393 8 : tenant_shard_id: TenantShardId,
1394 8 : expect_attached: NodeId,
1395 8 : expect_secondary: NodeId,
1396 8 : scheduled_shards: &mut Vec<TenantShard>,
1397 8 : scheduler: &mut Scheduler,
1398 8 : preferred_az: Option<AvailabilityZone>,
1399 8 : context: &mut ScheduleContext,
1400 8 : ) {
1401 8 : let shard_identity = ShardIdentity::new(
1402 8 : tenant_shard_id.shard_number,
1403 8 : tenant_shard_id.shard_count,
1404 8 : pageserver_api::shard::ShardStripeSize(1),
1405 8 : )
1406 8 : .unwrap();
1407 8 : let mut shard = TenantShard::new(
1408 8 : tenant_shard_id,
1409 8 : shard_identity,
1410 8 : pageserver_api::controller_api::PlacementPolicy::Attached(1),
1411 8 : preferred_az,
1412 8 : );
1413 8 :
1414 8 : shard.schedule(scheduler, context).unwrap();
1415 8 :
1416 8 : assert_eq!(shard.intent.get_attached().unwrap(), expect_attached);
1417 8 : assert_eq!(
1418 8 : shard.intent.get_secondary().first().unwrap(),
1419 8 : &expect_secondary
1420 8 : );
1421 :
1422 8 : scheduled_shards.push(shard);
1423 8 : }
1424 :
1425 1 : let tenant_id = TenantId::generate();
1426 1 :
1427 1 : schedule_shard(
1428 1 : TenantShardId {
1429 1 : tenant_id,
1430 1 : shard_number: ShardNumber(0),
1431 1 : shard_count: ShardCount(8),
1432 1 : },
1433 1 : NodeId(1),
1434 1 : NodeId(6),
1435 1 : &mut scheduled_shards,
1436 1 : &mut scheduler,
1437 1 : Some(az_a.clone()),
1438 1 : &mut context,
1439 1 : );
1440 1 :
1441 1 : schedule_shard(
1442 1 : TenantShardId {
1443 1 : tenant_id,
1444 1 : shard_number: ShardNumber(1),
1445 1 : shard_count: ShardCount(8),
1446 1 : },
1447 1 : NodeId(2),
1448 1 : NodeId(7),
1449 1 : &mut scheduled_shards,
1450 1 : &mut scheduler,
1451 1 : Some(az_a.clone()),
1452 1 : &mut context,
1453 1 : );
1454 1 :
1455 1 : schedule_shard(
1456 1 : TenantShardId {
1457 1 : tenant_id,
1458 1 : shard_number: ShardNumber(2),
1459 1 : shard_count: ShardCount(8),
1460 1 : },
1461 1 : NodeId(3),
1462 1 : NodeId(8),
1463 1 : &mut scheduled_shards,
1464 1 : &mut scheduler,
1465 1 : Some(az_a.clone()),
1466 1 : &mut context,
1467 1 : );
1468 1 :
1469 1 : schedule_shard(
1470 1 : TenantShardId {
1471 1 : tenant_id,
1472 1 : shard_number: ShardNumber(3),
1473 1 : shard_count: ShardCount(8),
1474 1 : },
1475 1 : NodeId(4),
1476 1 : NodeId(9),
1477 1 : &mut scheduled_shards,
1478 1 : &mut scheduler,
1479 1 : Some(az_a.clone()),
1480 1 : &mut context,
1481 1 : );
1482 1 :
1483 1 : schedule_shard(
1484 1 : TenantShardId {
1485 1 : tenant_id,
1486 1 : shard_number: ShardNumber(4),
1487 1 : shard_count: ShardCount(8),
1488 1 : },
1489 1 : NodeId(5),
1490 1 : NodeId(10),
1491 1 : &mut scheduled_shards,
1492 1 : &mut scheduler,
1493 1 : Some(az_a.clone()),
1494 1 : &mut context,
1495 1 : );
1496 1 :
1497 1 : schedule_shard(
1498 1 : TenantShardId {
1499 1 : tenant_id,
1500 1 : shard_number: ShardNumber(5),
1501 1 : shard_count: ShardCount(8),
1502 1 : },
1503 1 : NodeId(1),
1504 1 : NodeId(6),
1505 1 : &mut scheduled_shards,
1506 1 : &mut scheduler,
1507 1 : Some(az_a.clone()),
1508 1 : &mut context,
1509 1 : );
1510 1 :
1511 1 : schedule_shard(
1512 1 : TenantShardId {
1513 1 : tenant_id,
1514 1 : shard_number: ShardNumber(6),
1515 1 : shard_count: ShardCount(8),
1516 1 : },
1517 1 : NodeId(2),
1518 1 : NodeId(7),
1519 1 : &mut scheduled_shards,
1520 1 : &mut scheduler,
1521 1 : Some(az_a.clone()),
1522 1 : &mut context,
1523 1 : );
1524 1 :
1525 1 : schedule_shard(
1526 1 : TenantShardId {
1527 1 : tenant_id,
1528 1 : shard_number: ShardNumber(7),
1529 1 : shard_count: ShardCount(8),
1530 1 : },
1531 1 : NodeId(3),
1532 1 : NodeId(8),
1533 1 : &mut scheduled_shards,
1534 1 : &mut scheduler,
1535 1 : Some(az_a.clone()),
1536 1 : &mut context,
1537 1 : );
1538 :
1539 : // Assert that the optimizer suggests no changes, i.e. our initial scheduling was stable.
1540 9 : for shard in &scheduled_shards {
1541 8 : assert_eq!(shard.optimize_attachment(&mut scheduler, &context), None);
1542 : }
1543 :
1544 9 : for mut shard in scheduled_shards {
1545 8 : shard.intent.clear(&mut scheduler);
1546 8 : }
1547 1 : }
1548 :
1549 : #[test]
1550 1 : fn change_preferred_az() {
1551 1 : let az_a = AvailabilityZone("az-a".to_string());
1552 1 : let az_b = AvailabilityZone("az-b".to_string());
1553 1 :
1554 1 : // 2 nodes: 1 az_a and 1 az_b.
1555 1 : let nodes = test_utils::make_test_nodes(2, &[az_a.clone(), az_b.clone()]);
1556 1 : let mut scheduler = Scheduler::new(nodes.values());
1557 1 :
1558 1 : let tenant_shard_id = TenantShardId {
1559 1 : tenant_id: TenantId::generate(),
1560 1 : shard_number: ShardNumber(0),
1561 1 : shard_count: ShardCount(1),
1562 1 : };
1563 1 : let shard_identity = ShardIdentity::new(
1564 1 : tenant_shard_id.shard_number,
1565 1 : tenant_shard_id.shard_count,
1566 1 : pageserver_api::shard::ShardStripeSize(1),
1567 1 : )
1568 1 : .unwrap();
1569 1 : // 1 attached and 1 secondary.
1570 1 : let mut shard = TenantShard::new(
1571 1 : tenant_shard_id,
1572 1 : shard_identity,
1573 1 : pageserver_api::controller_api::PlacementPolicy::Attached(1),
1574 1 : Some(az_a.clone()),
1575 1 : );
1576 1 :
1577 1 : let mut context = ScheduleContext::default();
1578 1 : shard.schedule(&mut scheduler, &mut context).unwrap();
1579 1 : eprintln!("Scheduled shard at {:?}", shard.intent);
1580 :
1581 2 : for node in scheduler.nodes.values() {
1582 : // Only 2 nodes, one tenant shard should be scheduled on each of them.
1583 2 : assert_eq!(node.shard_count, 1);
1584 2 : if node.az == az_a {
1585 1 : assert_eq!(node.home_shard_count, 1);
1586 : } else {
1587 1 : assert_eq!(node.home_shard_count, 0);
1588 : }
1589 : }
1590 :
1591 1 : shard.set_preferred_az(&mut scheduler, Some(az_b.clone()));
1592 : // Home AZ flipped.
1593 2 : for node in scheduler.nodes.values() {
1594 2 : assert_eq!(node.shard_count, 1);
1595 2 : if node.az == az_a {
1596 1 : assert_eq!(node.home_shard_count, 0);
1597 : } else {
1598 1 : assert_eq!(node.home_shard_count, 1);
1599 : }
1600 : }
1601 :
1602 1 : shard.set_preferred_az(&mut scheduler, None);
1603 : // No home AZ.
1604 2 : for node in scheduler.nodes.values() {
1605 2 : assert_eq!(node.shard_count, 1);
1606 2 : assert_eq!(node.home_shard_count, 0);
1607 : }
1608 :
1609 1 : shard.intent.clear(&mut scheduler);
1610 1 : }
1611 : }