Line data Source code
1 : use crate::{metrics::NodeLabelGroup, node::Node, tenant_shard::TenantShard};
2 : use http_utils::error::ApiError;
3 : use itertools::Itertools;
4 : use pageserver_api::{controller_api::AvailabilityZone, models::PageserverUtilization};
5 : use serde::Serialize;
6 : use std::{collections::HashMap, fmt::Debug};
7 : use utils::id::NodeId;
8 :
9 : /// Scenarios in which we cannot find a suitable location for a tenant shard
10 : #[derive(thiserror::Error, Debug)]
11 : pub enum ScheduleError {
12 : #[error("No pageservers found")]
13 : NoPageservers,
14 : #[error("No pageserver found matching constraint")]
15 : ImpossibleConstraint,
16 : }
17 :
18 : impl From<ScheduleError> for ApiError {
19 0 : fn from(value: ScheduleError) -> Self {
20 0 : ApiError::Conflict(format!("Scheduling error: {}", value))
21 0 : }
22 : }
23 :
24 : #[derive(Serialize)]
25 : pub enum MaySchedule {
26 : Yes(PageserverUtilization),
27 : No,
28 : }
29 :
30 : #[derive(Serialize)]
31 : pub(crate) struct SchedulerNode {
32 : /// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
33 : shard_count: usize,
34 : /// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`].
35 : attached_shard_count: usize,
36 : /// How many shards have a location on this node (via [`crate::tenant_shard::IntentState`]) _and_ this node
37 : /// is in their preferred AZ (i.e. this is their 'home' location)
38 : home_shard_count: usize,
39 : /// Availability zone id in which the node resides
40 : az: AvailabilityZone,
41 :
42 : /// Whether this node is currently eligible to have new shards scheduled (this is derived
43 : /// from a node's availability state and scheduling policy).
44 : may_schedule: MaySchedule,
45 : }
46 :
47 : pub(crate) trait NodeSchedulingScore: Debug + Ord + Copy + Sized {
48 : fn generate(
49 : node_id: &NodeId,
50 : node: &mut SchedulerNode,
51 : preferred_az: &Option<AvailabilityZone>,
52 : context: &ScheduleContext,
53 : ) -> Option<Self>;
54 :
55 : /// Return a score that drops any components based on node utilization: this is useful
56 : /// for finding scores for scheduling optimisation, when we want to avoid rescheduling
57 : /// shards due to e.g. fluctuating disk usage, which would cause flapping.
58 : fn for_optimization(&self) -> Self;
59 :
60 : fn is_overloaded(&self) -> bool;
61 : fn node_id(&self) -> NodeId;
62 : }
63 :
64 : pub(crate) trait ShardTag {
65 : type Score: NodeSchedulingScore;
66 : }
67 :
68 : pub(crate) struct AttachedShardTag {}
69 : impl ShardTag for AttachedShardTag {
70 : type Score = NodeAttachmentSchedulingScore;
71 : }
72 :
73 : pub(crate) struct SecondaryShardTag {}
74 : impl ShardTag for SecondaryShardTag {
75 : type Score = NodeSecondarySchedulingScore;
76 : }
77 :
78 : #[derive(PartialEq, Eq, Debug, Clone, Copy)]
79 : enum AzMatch {
80 : Yes,
81 : No,
82 : Unknown,
83 : }
84 :
85 : impl AzMatch {
86 91473 : fn new(node_az: &AvailabilityZone, shard_preferred_az: Option<&AvailabilityZone>) -> Self {
87 91301 : match shard_preferred_az {
88 91301 : Some(preferred_az) if preferred_az == node_az => Self::Yes,
89 52680 : Some(_preferred_az) => Self::No,
90 172 : None => Self::Unknown,
91 : }
92 91473 : }
93 : }
94 :
95 : #[derive(PartialEq, Eq, Debug, Clone, Copy)]
96 : struct AttachmentAzMatch(AzMatch);
97 :
98 : impl Ord for AttachmentAzMatch {
99 65425 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
100 65425 : // Lower scores indicate a more suitable node.
101 65425 : // Note that we prefer a node for which we don't have
102 65425 : // info to a node which we are certain doesn't match the
103 65425 : // preferred AZ of the shard.
104 130850 : let az_match_score = |az_match: &AzMatch| match az_match {
105 64133 : AzMatch::Yes => 0,
106 168 : AzMatch::Unknown => 1,
107 66549 : AzMatch::No => 2,
108 130850 : };
109 :
110 65425 : az_match_score(&self.0).cmp(&az_match_score(&other.0))
111 65425 : }
112 : }
113 :
114 : impl PartialOrd for AttachmentAzMatch {
115 65425 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
116 65425 : Some(self.cmp(other))
117 65425 : }
118 : }
119 :
120 : #[derive(PartialEq, Eq, Debug, Clone, Copy)]
121 : struct SecondaryAzMatch(AzMatch);
122 :
123 : impl Ord for SecondaryAzMatch {
124 36210 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
125 36210 : // Lower scores indicate a more suitable node.
126 36210 : // For secondary locations we wish to avoid the preferred AZ
127 36210 : // of the shard.
128 72420 : let az_match_score = |az_match: &AzMatch| match az_match {
129 50434 : AzMatch::No => 0,
130 24 : AzMatch::Unknown => 1,
131 21962 : AzMatch::Yes => 2,
132 72420 : };
133 :
134 36210 : az_match_score(&self.0).cmp(&az_match_score(&other.0))
135 36210 : }
136 : }
137 :
138 : impl PartialOrd for SecondaryAzMatch {
139 36207 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
140 36207 : Some(self.cmp(other))
141 36207 : }
142 : }
143 :
144 : /// Scheduling score of a given node for shard attachments.
145 : /// Lower scores indicate more suitable nodes.
146 : /// Ordering is given by member declaration order (top to bottom).
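/// A minimal illustration of why declaration order matters here: the derived
/// `Ord` compares fields lexicographically, so the first field dominates. The
/// struct below is a hypothetical stand-in, not part of this module.
///
/// ```
/// #[derive(PartialEq, Eq, PartialOrd, Ord)]
/// struct Score(u8, u8); // (az_mismatch, affinity) by analogy
/// // A better first component wins even against a much better second one.
/// assert!(Score(0, 9) < Score(1, 0));
/// ```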
147 : #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
148 : pub(crate) struct NodeAttachmentSchedulingScore {
149 : /// Flag indicating whether this node matches the preferred AZ
150 : /// of the shard. Since this field is declared first, nodes in the matching
151 : /// AZ are ranked ahead of the affinity score below.
152 : az_match: AttachmentAzMatch,
153 : /// The number of locations (attached or secondary) belonging to the tenant
154 : /// currently being scheduled that are already placed on this node.
155 : affinity_score: AffinityScore,
156 : /// Utilisation score that combines shard count and disk utilisation
157 : utilization_score: u64,
158 : /// Total number of shards attached to this node. When nodes have identical utilisation, this
159 : /// acts as an anti-affinity between attached shards.
160 : total_attached_shard_count: usize,
161 : /// Convenience to make selection deterministic in tests and empty systems
162 : node_id: NodeId,
163 : }
164 :
165 : impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
166 52280 : fn generate(
167 52280 : node_id: &NodeId,
168 52280 : node: &mut SchedulerNode,
169 52280 : preferred_az: &Option<AvailabilityZone>,
170 52280 : context: &ScheduleContext,
171 52280 : ) -> Option<Self> {
172 52280 : let utilization = match &mut node.may_schedule {
173 52278 : MaySchedule::Yes(u) => u,
174 : MaySchedule::No => {
175 2 : return None;
176 : }
177 : };
178 :
179 52278 : Some(Self {
180 52278 : affinity_score: context
181 52278 : .nodes
182 52278 : .get(node_id)
183 52278 : .copied()
184 52278 : .unwrap_or(AffinityScore::FREE),
185 52278 : az_match: AttachmentAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())),
186 52278 : utilization_score: utilization.cached_score(),
187 52278 : total_attached_shard_count: node.attached_shard_count,
188 52278 : node_id: *node_id,
189 52278 : })
190 52280 : }
191 :
192 : /// For use in scheduling optimisation, where we only want to consider the aspects
193 : /// of the score that can only be resolved by moving things (such as inter-shard affinity
194 : /// and AZ affinity), and ignore aspects that reflect the total utilization of a node (which
195 : /// can fluctuate for other reasons)
196 110 : fn for_optimization(&self) -> Self {
197 110 : Self {
198 110 : utilization_score: 0,
199 110 : total_attached_shard_count: 0,
200 110 : node_id: NodeId(0),
201 110 : ..*self
202 110 : }
203 110 : }
204 :
205 52168 : fn is_overloaded(&self) -> bool {
206 52168 : PageserverUtilization::is_overloaded(self.utilization_score)
207 52168 : }
208 :
209 12879 : fn node_id(&self) -> NodeId {
210 12879 : self.node_id
211 12879 : }
212 : }
213 :
214 : /// Scheduling score of a given node for shard secondaries.
215 : /// Lower scores indicate more suitable nodes.
216 : /// Ordering is given by member declaration order (top to bottom).
217 : #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
218 : pub(crate) struct NodeSecondarySchedulingScore {
219 : /// Flag indicating whether this node matches the preferred AZ
220 : /// of the shard. For secondary locations we wish to avoid nodes in
221 : /// the preferred AZ of the shard, since that's where the attached location
222 : /// should be scheduled and having the secondary in the same AZ is bad for HA.
223 : az_match: SecondaryAzMatch,
224 : /// The number of locations (attached or secondary) belonging to the tenant
225 : /// currently being scheduled that are already placed on this node.
226 : affinity_score: AffinityScore,
227 : /// Utilisation score that combines shard count and disk utilisation
228 : utilization_score: u64,
229 : /// Anti-affinity with other non-home locations: this gives the behavior that secondaries
230 : /// will spread out across the nodes in an AZ.
231 : total_non_home_shard_count: usize,
232 : /// Convenience to make selection deterministic in tests and empty systems
233 : node_id: NodeId,
234 : }
235 :
236 : impl NodeSchedulingScore for NodeSecondarySchedulingScore {
237 39196 : fn generate(
238 39196 : node_id: &NodeId,
239 39196 : node: &mut SchedulerNode,
240 39196 : preferred_az: &Option<AvailabilityZone>,
241 39196 : context: &ScheduleContext,
242 39196 : ) -> Option<Self> {
243 39196 : let utilization = match &mut node.may_schedule {
244 39195 : MaySchedule::Yes(u) => u,
245 : MaySchedule::No => {
246 1 : return None;
247 : }
248 : };
249 :
250 39195 : Some(Self {
251 39195 : az_match: SecondaryAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())),
252 39195 : affinity_score: context
253 39195 : .nodes
254 39195 : .get(node_id)
255 39195 : .copied()
256 39195 : .unwrap_or(AffinityScore::FREE),
257 39195 : utilization_score: utilization.cached_score(),
258 39195 : total_non_home_shard_count: (node.shard_count - node.home_shard_count),
259 39195 : node_id: *node_id,
260 39195 : })
261 39196 : }
262 :
263 14 : fn for_optimization(&self) -> Self {
264 14 : Self {
265 14 : utilization_score: 0,
266 14 : total_non_home_shard_count: 0,
267 14 : node_id: NodeId(0),
268 14 : ..*self
269 14 : }
270 14 : }
271 :
272 39172 : fn is_overloaded(&self) -> bool {
273 39172 : PageserverUtilization::is_overloaded(self.utilization_score)
274 39172 : }
275 :
276 12846 : fn node_id(&self) -> NodeId {
277 12846 : self.node_id
278 12846 : }
279 : }
280 :
281 : impl PartialEq for SchedulerNode {
282 3 : fn eq(&self, other: &Self) -> bool {
283 3 : let may_schedule_matches = matches!(
284 3 : (&self.may_schedule, &other.may_schedule),
285 : (MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No)
286 : );
287 :
288 3 : may_schedule_matches
289 3 : && self.shard_count == other.shard_count
290 3 : && self.attached_shard_count == other.attached_shard_count
291 3 : && self.az == other.az
292 3 : }
293 : }
294 :
295 : impl Eq for SchedulerNode {}
296 :
297 : /// This type is responsible for selecting which node is used when a tenant shard needs to choose a pageserver
298 : /// on which to run.
299 : ///
300 : /// The type has no persistent state of its own: this is all populated at startup. The Serialize
301 : /// impl is only for debug dumps.
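///
/// A rough usage sketch (it mirrors the unit tests below; error handling and
/// the surrounding service plumbing are elided):
///
/// ```ignore
/// // Build the scheduler from the known pageservers, then pick a node for an
/// // attached location, spreading the tenant's shards via the context.
/// let mut scheduler = Scheduler::new(nodes.values());
/// let context = ScheduleContext::default();
/// let node_id = scheduler.schedule_shard::<AttachedShardTag>(&[], &None, &context)?;
/// ```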
302 : #[derive(Serialize)]
303 : pub(crate) struct Scheduler {
304 : nodes: HashMap<NodeId, SchedulerNode>,
305 : }
306 :
307 : /// Score for soft constraint scheduling: lower scores are preferred to higher scores.
308 : ///
309 : /// For example, we may set an affinity score based on the number of shards from the same
310 : /// tenant already on a node, to implicitly prefer to balance out shards.
311 : #[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
312 : pub(crate) struct AffinityScore(pub(crate) usize);
313 :
314 : impl AffinityScore {
315 : /// If we have no anti-affinity at all toward a node, this is its score. It means
316 : /// the scheduler has a free choice amongst nodes with this score, and may pick a node
317 : /// based on other information such as total utilization.
318 : pub(crate) const FREE: Self = Self(0);
319 :
320 25773 : pub(crate) fn inc(&mut self) {
321 25773 : self.0 += 1;
322 25773 : }
323 :
324 118 : pub(crate) fn dec(&mut self) {
325 118 : self.0 -= 1;
326 118 : }
327 : }
328 :
329 : impl std::ops::Add for AffinityScore {
330 : type Output = Self;
331 :
332 0 : fn add(self, rhs: Self) -> Self::Output {
333 0 : Self(self.0 + rhs.0)
334 0 : }
335 : }
336 :
337 : /// Hint for whether this is a sincere attempt to schedule, or a speculative
338 : /// check for where we _would_ schedule (done during optimization)
339 : #[derive(Debug, Clone)]
340 : pub(crate) enum ScheduleMode {
341 : Normal,
342 : Speculative,
343 : }
344 :
345 : impl Default for ScheduleMode {
346 5330 : fn default() -> Self {
347 5330 : Self::Normal
348 5330 : }
349 : }
350 :
351 : // For carrying state between multiple calls to [`TenantShard::schedule`], e.g. when calling
352 : // it for many shards in the same tenant.
353 : #[derive(Debug, Default, Clone)]
354 : pub(crate) struct ScheduleContext {
355 : /// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`]
356 : pub(crate) nodes: HashMap<NodeId, AffinityScore>,
357 :
358 : pub(crate) mode: ScheduleMode,
359 : }
360 :
361 : impl ScheduleContext {
362 3 : pub(crate) fn new(mode: ScheduleMode) -> Self {
363 3 : Self {
364 3 : nodes: HashMap::new(),
365 3 : mode,
366 3 : }
367 3 : }
368 :
369 : /// Input is a list of nodes we would like to avoid using again within this context. The more
370 : /// times a node is passed into this call, the less inclined we are to use it.
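///
/// For example (illustrative only):
///
/// ```ignore
/// let mut context = ScheduleContext::default();
/// context.avoid(&[NodeId(1), NodeId(2)]);
/// context.avoid(&[NodeId(1)]);
/// // NodeId(1) now has AffinityScore(2) and NodeId(2) has AffinityScore(1),
/// // so the scheduler is more reluctant to reuse NodeId(1).
/// ```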
371 12896 : pub(crate) fn avoid(&mut self, nodes: &[NodeId]) {
372 38669 : for node_id in nodes {
373 25773 : let entry = self.nodes.entry(*node_id).or_insert(AffinityScore::FREE);
374 25773 : entry.inc()
375 : }
376 12896 : }
377 :
378 : /// Remove `shard`'s contributions to this context. This is useful when considering scheduling
379 : /// this shard afresh, where we don't want it to e.g. experience anti-affinity to its current location.
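///
/// Sketch of the intended call pattern (`context` and `shard` are placeholders):
///
/// ```ignore
/// // Score candidate nodes as if the shard were not scheduled at all, so its
/// // existing locations don't penalise themselves via anti-affinity.
/// let detached_context = context.project_detach(&shard);
/// ```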
380 60 : pub(crate) fn project_detach(&self, shard: &TenantShard) -> Self {
381 60 : let mut new_context = self.clone();
382 :
383 60 : if let Some(attached) = shard.intent.get_attached() {
384 57 : if let Some(score) = new_context.nodes.get_mut(attached) {
385 57 : score.dec();
386 57 : }
387 3 : }
388 :
389 63 : for secondary in shard.intent.get_secondary() {
390 63 : if let Some(score) = new_context.nodes.get_mut(secondary) {
391 61 : score.dec();
392 61 : }
393 : }
394 :
395 60 : new_context
396 60 : }
397 :
398 : /// For test, track the sum of AffinityScore values, which is effectively how many
399 : /// attached or secondary locations have been registered with this context.
400 : #[cfg(test)]
401 3 : pub(crate) fn location_count(&self) -> usize {
402 7 : self.nodes.values().map(|i| i.0).sum()
403 3 : }
404 : }
405 :
406 : pub(crate) enum RefCountUpdate {
407 : PromoteSecondary,
408 : Attach,
409 : Detach,
410 : DemoteAttached,
411 : AddSecondary,
412 : RemoveSecondary,
413 : }
414 :
415 : impl Scheduler {
416 68 : pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
417 68 : let mut scheduler_nodes = HashMap::new();
418 129 : for node in nodes {
419 61 : scheduler_nodes.insert(
420 61 : node.get_id(),
421 61 : SchedulerNode {
422 61 : shard_count: 0,
423 61 : attached_shard_count: 0,
424 61 : home_shard_count: 0,
425 61 : may_schedule: node.may_schedule(),
426 61 : az: node.get_availability_zone_id().clone(),
427 61 : },
428 61 : );
429 61 : }
430 :
431 68 : Self {
432 68 : nodes: scheduler_nodes,
433 68 : }
434 68 : }
435 :
436 : /// For debug/support: check that our internal statistics are in sync with the state of
437 : /// the nodes & tenant shards.
438 : ///
439 : /// If anything is inconsistent, log details and return an error.
440 1 : pub(crate) fn consistency_check<'a>(
441 1 : &self,
442 1 : nodes: impl Iterator<Item = &'a Node>,
443 1 : shards: impl Iterator<Item = &'a TenantShard>,
444 1 : ) -> anyhow::Result<()> {
445 1 : let mut expect_nodes: HashMap<NodeId, SchedulerNode> = HashMap::new();
446 4 : for node in nodes {
447 3 : expect_nodes.insert(
448 3 : node.get_id(),
449 3 : SchedulerNode {
450 3 : shard_count: 0,
451 3 : attached_shard_count: 0,
452 3 : home_shard_count: 0,
453 3 : may_schedule: node.may_schedule(),
454 3 : az: node.get_availability_zone_id().clone(),
455 3 : },
456 3 : );
457 3 : }
458 :
459 2 : for shard in shards {
460 1 : if let Some(node_id) = shard.intent.get_attached() {
461 1 : match expect_nodes.get_mut(node_id) {
462 1 : Some(node) => {
463 1 : node.shard_count += 1;
464 1 : node.attached_shard_count += 1;
465 1 : if Some(&node.az) == shard.preferred_az() {
466 0 : node.home_shard_count += 1;
467 1 : }
468 : }
469 0 : None => anyhow::bail!(
470 0 : "Tenant {} references nonexistent node {}",
471 0 : shard.tenant_shard_id,
472 0 : node_id
473 0 : ),
474 : }
475 0 : }
476 :
477 1 : for node_id in shard.intent.get_secondary() {
478 1 : match expect_nodes.get_mut(node_id) {
479 1 : Some(node) => {
480 1 : node.shard_count += 1;
481 1 : if Some(&node.az) == shard.preferred_az() {
482 0 : node.home_shard_count += 1;
483 1 : }
484 : }
485 0 : None => anyhow::bail!(
486 0 : "Tenant {} references nonexistent node {}",
487 0 : shard.tenant_shard_id,
488 0 : node_id
489 0 : ),
490 : }
491 : }
492 : }
493 :
494 4 : for (node_id, expect_node) in &expect_nodes {
495 3 : let Some(self_node) = self.nodes.get(node_id) else {
496 0 : anyhow::bail!("Node {node_id} not found in Self")
497 : };
498 :
499 3 : if self_node != expect_node {
500 0 : tracing::error!("Inconsistency detected in scheduling state for node {node_id}");
501 0 : tracing::error!("Expected state: {}", serde_json::to_string(expect_node)?);
502 0 : tracing::error!("Self state: {}", serde_json::to_string(self_node)?);
503 :
504 0 : anyhow::bail!("Inconsistent state on {node_id}");
505 3 : }
506 : }
507 :
508 1 : if expect_nodes.len() != self.nodes.len() {
509 : // We just checked that all the expected nodes are present. If the lengths don't match,
510 : // it means that we have nodes in Self that are unexpected.
511 0 : for node_id in self.nodes.keys() {
512 0 : if !expect_nodes.contains_key(node_id) {
513 0 : anyhow::bail!("Node {node_id} found in Self but not in expected nodes");
514 0 : }
515 : }
516 1 : }
517 :
518 1 : Ok(())
519 1 : }
520 :
521 : /// Update the reference counts of a node. These reference counts are used to guide scheduling
522 : /// decisions, not for memory management: they represent the number of tenant shards whose IntentState
523 : /// targets this node and the number of tenant shards whose IntentState is attached to this
524 : /// node.
525 : ///
526 : /// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
527 : /// [`Self::new`] or [`Self::node_upsert`])
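///
/// For example, attaching a shard to a node is reflected as follows (illustrative
/// only; in practice [`crate::tenant_shard::IntentState`] drives these calls):
///
/// ```ignore
/// scheduler.update_node_ref_counts(node_id, preferred_az.as_ref(), RefCountUpdate::Attach);
/// ```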
528 51416 : pub(crate) fn update_node_ref_counts(
529 51416 : &mut self,
530 51416 : node_id: NodeId,
531 51416 : preferred_az: Option<&AvailabilityZone>,
532 51416 : update: RefCountUpdate,
533 51416 : ) {
534 51416 : let Some(node) = self.nodes.get_mut(&node_id) else {
535 0 : debug_assert!(false);
536 0 : tracing::error!("Scheduler missing node {node_id}");
537 0 : return;
538 : };
539 :
540 51416 : let is_home_az = Some(&node.az) == preferred_az;
541 51416 :
542 51416 : match update {
543 6 : RefCountUpdate::PromoteSecondary => {
544 6 : node.attached_shard_count += 1;
545 6 : }
546 : RefCountUpdate::Attach => {
547 12857 : node.shard_count += 1;
548 12857 : node.attached_shard_count += 1;
549 12857 : if is_home_az {
550 12817 : node.home_shard_count += 1;
551 12817 : }
552 : }
553 : RefCountUpdate::Detach => {
554 12855 : node.shard_count -= 1;
555 12855 : node.attached_shard_count -= 1;
556 12855 : if is_home_az {
557 12818 : node.home_shard_count -= 1;
558 12818 : }
559 : }
560 7 : RefCountUpdate::DemoteAttached => {
561 7 : node.attached_shard_count -= 1;
562 7 : }
563 : RefCountUpdate::AddSecondary => {
564 12845 : node.shard_count += 1;
565 12845 : if is_home_az {
566 5 : node.home_shard_count += 1;
567 12840 : }
568 : }
569 : RefCountUpdate::RemoveSecondary => {
570 12846 : node.shard_count -= 1;
571 12846 : if is_home_az {
572 4 : node.home_shard_count -= 1;
573 12842 : }
574 : }
575 : }
576 :
577 : // Maybe update PageserverUtilization
578 51416 : match update {
579 : RefCountUpdate::AddSecondary | RefCountUpdate::Attach => {
580 : // Referencing the node: if this takes our shard_count above the utilization structure's
581 : // shard count, then artificially bump it: this ensures that the scheduler immediately
582 : // recognizes that this node has more work on it, without waiting for the next heartbeat
583 : // to update the utilization.
584 25702 : if let MaySchedule::Yes(utilization) = &mut node.may_schedule {
585 25702 : utilization.adjust_shard_count_max(node.shard_count as u32);
586 25702 : }
587 : }
588 : RefCountUpdate::PromoteSecondary
589 : | RefCountUpdate::Detach
590 : | RefCountUpdate::RemoveSecondary
591 25714 : | RefCountUpdate::DemoteAttached => {
592 25714 : // De-referencing the node: leave the utilization's shard_count at a stale higher
593 25714 : // value until some future heartbeat after we have physically removed this shard
594 25714 : // from the node: this prevents the scheduler over-optimistically trying to schedule
595 25714 : // more work onto the node before earlier detaches are done.
596 25714 : }
597 : }
598 51416 : }
599 :
600 : // Check if the number of shards attached to a given node is lagging below
601 : // the cluster average. If that's the case, the node should be filled.
602 0 : pub(crate) fn compute_fill_requirement(&self, node_id: NodeId) -> usize {
603 0 : let Some(node) = self.nodes.get(&node_id) else {
604 0 : debug_assert!(false);
605 0 : tracing::error!("Scheduler missing node {node_id}");
606 0 : return 0;
607 : };
608 0 : assert!(!self.nodes.is_empty());
609 0 : let expected_attached_shards_per_node = self.expected_attached_shard_count();
610 :
611 0 : for (node_id, node) in self.nodes.iter() {
612 0 : tracing::trace!(%node_id, "attached_shard_count={} shard_count={} expected={}", node.attached_shard_count, node.shard_count, expected_attached_shards_per_node);
613 : }
614 :
615 0 : if node.attached_shard_count < expected_attached_shards_per_node {
616 0 : expected_attached_shards_per_node - node.attached_shard_count
617 : } else {
618 0 : 0
619 : }
620 0 : }
621 :
622 0 : pub(crate) fn expected_attached_shard_count(&self) -> usize {
623 0 : let total_attached_shards: usize =
624 0 : self.nodes.values().map(|n| n.attached_shard_count).sum();
625 0 :
626 0 : assert!(!self.nodes.is_empty());
627 0 : total_attached_shards / self.nodes.len()
628 0 : }
629 :
630 0 : pub(crate) fn nodes_by_attached_shard_count(&self) -> Vec<(NodeId, usize)> {
631 0 : self.nodes
632 0 : .iter()
633 0 : .map(|(node_id, stats)| (*node_id, stats.attached_shard_count))
634 0 : .sorted_by(|lhs, rhs| Ord::cmp(&lhs.1, &rhs.1).reverse())
635 0 : .collect()
636 0 : }
637 :
638 215 : pub(crate) fn node_upsert(&mut self, node: &Node) {
639 : use std::collections::hash_map::Entry::*;
640 215 : match self.nodes.entry(node.get_id()) {
641 4 : Occupied(mut entry) => {
642 4 : // Updates to MaySchedule are how we receive updated PageserverUtilization: adjust these values
643 4 : // to account for any shards scheduled on the controller but not yet visible to the pageserver.
644 4 : let mut may_schedule = node.may_schedule();
645 4 : match &mut may_schedule {
646 2 : MaySchedule::Yes(utilization) => {
647 2 : utilization.adjust_shard_count_max(entry.get().shard_count as u32);
648 2 : }
649 2 : MaySchedule::No => { // Nothing to tweak
650 2 : }
651 : }
652 :
653 4 : entry.get_mut().may_schedule = may_schedule;
654 : }
655 211 : Vacant(entry) => {
656 211 : entry.insert(SchedulerNode {
657 211 : shard_count: 0,
658 211 : attached_shard_count: 0,
659 211 : home_shard_count: 0,
660 211 : may_schedule: node.may_schedule(),
661 211 : az: node.get_availability_zone_id().clone(),
662 211 : });
663 211 : }
664 : }
665 215 : }
666 :
667 0 : pub(crate) fn node_remove(&mut self, node_id: NodeId) {
668 0 : if self.nodes.remove(&node_id).is_none() {
669 0 : tracing::warn!(node_id=%node_id, "Removed non-existent node from scheduler");
670 0 : }
671 0 : }
672 :
673 : /// Calculate a single node's score, used in optimizer logic to compare specific
674 : /// nodes' scores.
675 135 : pub(crate) fn compute_node_score<Score>(
676 135 : &mut self,
677 135 : node_id: NodeId,
678 135 : preferred_az: &Option<AvailabilityZone>,
679 135 : context: &ScheduleContext,
680 135 : ) -> Option<Score>
681 135 : where
682 135 : Score: NodeSchedulingScore,
683 135 : {
684 135 : self.nodes
685 135 : .get_mut(&node_id)
686 135 : .and_then(|node| Score::generate(&node_id, node, preferred_az, context))
687 135 : }
688 :
689 : /// Compute a scheduling score for each node that the scheduler knows of,
690 : /// excluding any nodes in the hard-exclude set.
691 25725 : fn compute_node_scores<Score>(
692 25725 : &mut self,
693 25725 : hard_exclude: &[NodeId],
694 25725 : preferred_az: &Option<AvailabilityZone>,
695 25725 : context: &ScheduleContext,
696 25725 : ) -> Vec<Score>
697 25725 : where
698 25725 : Score: NodeSchedulingScore,
699 25725 : {
700 25725 : self.nodes
701 25725 : .iter_mut()
702 104187 : .filter_map(|(k, v)| {
703 104187 : if hard_exclude.contains(k) {
704 12846 : None
705 : } else {
706 91341 : Score::generate(k, v, preferred_az, context)
707 : }
708 104187 : })
709 25725 : .collect()
710 25725 : }
711 :
712 : /// hard_exclude: it is forbidden to use nodes in this list, typically because they
713 : /// are already in use by this shard -- we use this to avoid picking the same node
714 : /// as both attached and secondary location. This is a hard constraint: if we cannot
715 : /// find any nodes that aren't in this list, then we will return a [`ScheduleError::ImpossibleConstraint`].
716 : ///
717 : /// context: we prefer to avoid using nodes identified in the context, according
718 : /// to their anti-affinity score. We use this to prefer to avoid placing shards of
719 : /// the same tenant on the same node. This is a soft constraint: the context will never
720 : /// cause us to fail to schedule a shard.
721 25725 : pub(crate) fn schedule_shard<Tag: ShardTag>(
722 25725 : &mut self,
723 25725 : hard_exclude: &[NodeId],
724 25725 : preferred_az: &Option<AvailabilityZone>,
725 25725 : context: &ScheduleContext,
726 25725 : ) -> Result<NodeId, ScheduleError> {
727 25725 : if self.nodes.is_empty() {
728 0 : return Err(ScheduleError::NoPageservers);
729 25725 : }
730 25725 :
731 25725 : let mut scores =
732 25725 : self.compute_node_scores::<Tag::Score>(hard_exclude, preferred_az, context);
733 25725 :
734 25725 : // Exclude nodes whose utilization is critically high, if there are alternatives available. This will
735 25725 : // cause us to violate affinity rules if it is necessary to avoid critically overloading nodes: for example
736 25725 : // we may place shards of the same tenant together on the same pageserver if all other pageservers are
737 25725 : // overloaded.
738 25725 : let non_overloaded_scores = scores
739 25725 : .iter()
740 91340 : .filter(|i| !i.is_overloaded())
741 25725 : .copied()
742 25725 : .collect::<Vec<_>>();
743 25725 : if !non_overloaded_scores.is_empty() {
744 25725 : scores = non_overloaded_scores;
745 25725 : }
746 :
747 : // Sort the nodes by score. The one with the lowest scores will be the preferred node.
748 : // Refer to [`NodeAttachmentSchedulingScore`] for attached locations and
749 : // [`NodeSecondarySchedulingScore`] for secondary locations to understand how the nodes
750 : // are ranked.
751 25725 : scores.sort();
752 25725 :
753 25725 : if scores.is_empty() {
754 : // After applying constraints, no pageservers were left.
755 0 : if !matches!(context.mode, ScheduleMode::Speculative) {
756 : // If this was not a speculative attempt, log details to understand why we couldn't
757 : // schedule: this may help an engineer understand if some nodes are marked offline
758 : // in a way that's preventing progress.
759 0 : tracing::info!(
760 0 : "Scheduling failure, while excluding {hard_exclude:?}, node states:"
761 : );
762 0 : for (node_id, node) in &self.nodes {
763 0 : tracing::info!(
764 0 : "Node {node_id}: may_schedule={} shards={}",
765 0 : !matches!(node.may_schedule, MaySchedule::No),
766 : node.shard_count
767 : );
768 : }
769 0 : }
770 0 : return Err(ScheduleError::ImpossibleConstraint);
771 25725 : }
772 25725 :
773 25725 : // Lowest score wins
774 25725 : let node_id = scores.first().unwrap().node_id();
775 :
776 25725 : if !matches!(context.mode, ScheduleMode::Speculative) {
777 25725 : tracing::info!(
778 0 : "scheduler selected node {node_id} (elegible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?}, preferred_az: {:?})",
779 0 : scores.iter().map(|i| i.node_id().0).collect::<Vec<_>>(),
780 : preferred_az,
781 : );
782 0 : }
783 :
784 : // Note that we do not update shard count here to reflect the scheduling: that
785 : // is IntentState's job when the scheduled location is used.
786 :
787 25725 : Ok(node_id)
788 25725 : }
789 :
790 : /// Selects any available node. This is suitable for performing background work (e.g. S3
791 : /// deletions).
792 0 : pub(crate) fn any_available_node(&mut self) -> Result<NodeId, ScheduleError> {
793 0 : self.schedule_shard::<AttachedShardTag>(&[], &None, &ScheduleContext::default())
794 0 : }
795 :
796 : /// For choosing which AZ to schedule a new shard into, use this. It will return the
797 : /// AZ with the lowest number of shards currently scheduled in this AZ as their home
798 : /// location.
799 : ///
800 : /// We use an AZ-wide measure rather than simply selecting the AZ of the least-loaded
801 : /// node, because while tenants start out single sharded, when they grow and undergo
802 : /// shard-split, they will occupy space on many nodes within an AZ. It is important
803 : /// that we pick the AZ in a way that balances this _future_ load.
804 : ///
805 : /// Once we've picked an AZ, subsequent scheduling within that AZ will be driven by
806 : /// nodes' utilization scores.
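///
/// Illustrative use when creating a tenant (surrounding code elided):
///
/// ```ignore
/// let preferred_az = scheduler.get_az_for_new_tenant();
/// // `preferred_az` is then stored on the tenant's shards and passed as the
/// // `preferred_az` argument of subsequent scheduling calls.
/// ```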
807 303 : pub(crate) fn get_az_for_new_tenant(&self) -> Option<AvailabilityZone> {
808 303 : if self.nodes.is_empty() {
809 0 : return None;
810 303 : }
811 :
812 : #[derive(Default)]
813 : struct AzScore {
814 : home_shard_count: usize,
815 : scheduleable: bool,
816 : }
817 :
818 303 : let mut azs: HashMap<&AvailabilityZone, AzScore> = HashMap::new();
819 1818 : for node in self.nodes.values() {
820 1818 : let az = azs.entry(&node.az).or_default();
821 1818 : az.home_shard_count += node.home_shard_count;
822 1818 : az.scheduleable |= matches!(node.may_schedule, MaySchedule::Yes(_));
823 : }
824 :
825 : // If any AZs are schedulable, then filter out the non-schedulable ones (i.e. AZs where
826 : // all nodes are overloaded or otherwise unschedulable).
827 303 : if azs.values().any(|i| i.scheduleable) {
828 906 : azs.retain(|_, i| i.scheduleable);
829 303 : }
830 :
831 : // Find the AZ with the lowest number of shards currently allocated
832 303 : Some(
833 303 : azs.into_iter()
834 906 : .min_by_key(|i| (i.1.home_shard_count, i.0))
835 303 : .unwrap()
836 303 : .0
837 303 : .clone(),
838 303 : )
839 303 : }
840 :
841 9 : pub(crate) fn get_node_az(&self, node_id: &NodeId) -> Option<AvailabilityZone> {
842 9 : self.nodes.get(node_id).map(|n| n.az.clone())
843 9 : }
844 :
845 : /// For use when choosing a preferred secondary location: filter out nodes that are not
846 : /// available, and gather their AZs.
847 12831 : pub(crate) fn filter_usable_nodes(
848 12831 : &self,
849 12831 : nodes: &[NodeId],
850 12831 : ) -> Vec<(NodeId, Option<AvailabilityZone>)> {
851 12831 : nodes
852 12831 : .iter()
853 12831 : .filter_map(|node_id| {
854 3 : let node = self
855 3 : .nodes
856 3 : .get(node_id)
857 3 : .expect("Referenced nodes always exist");
858 3 : if matches!(node.may_schedule, MaySchedule::Yes(_)) {
859 2 : Some((*node_id, Some(node.az.clone())))
860 : } else {
861 1 : None
862 : }
863 12831 : })
864 12831 : .collect()
865 12831 : }
866 :
867 : /// Unit test access to internal state
868 : #[cfg(test)]
869 19 : pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
870 19 : self.nodes.get(&node_id).unwrap().shard_count
871 19 : }
872 :
873 : #[cfg(test)]
874 19 : pub(crate) fn get_node_attached_shard_count(&self, node_id: NodeId) -> usize {
875 19 : self.nodes.get(&node_id).unwrap().attached_shard_count
876 19 : }
877 :
878 : /// Some metrics that we only calculate periodically: this is simpler than
879 : /// rigorously updating them on every change.
880 0 : pub(crate) fn update_metrics(&self) {
881 0 : for (node_id, node) in &self.nodes {
882 0 : let node_id_str = format!("{}", node_id);
883 0 : let label_group = NodeLabelGroup {
884 0 : az: &node.az.0,
885 0 : node_id: &node_id_str,
886 0 : };
887 0 :
888 0 : crate::metrics::METRICS_REGISTRY
889 0 : .metrics_group
890 0 : .storage_controller_node_shards
891 0 : .set(label_group.clone(), node.shard_count as i64);
892 0 :
893 0 : crate::metrics::METRICS_REGISTRY
894 0 : .metrics_group
895 0 : .storage_controller_node_attached_shards
896 0 : .set(label_group.clone(), node.attached_shard_count as i64);
897 0 :
898 0 : crate::metrics::METRICS_REGISTRY
899 0 : .metrics_group
900 0 : .storage_controller_node_home_shards
901 0 : .set(label_group.clone(), node.home_shard_count as i64);
902 0 : }
903 0 : }
904 : }
905 :
906 : #[cfg(test)]
907 : pub(crate) mod test_utils {
908 :
909 : use crate::node::Node;
910 : use pageserver_api::{
911 : controller_api::{AvailabilityZone, NodeAvailability},
912 : models::utilization::test_utilization,
913 : };
914 : use std::collections::HashMap;
915 : use utils::id::NodeId;
916 :
917 : /// Test helper: synthesize the requested number of nodes, all in active state.
918 : ///
919 : /// Node IDs start at one.
920 : ///
921 : /// The `azs` argument specifies the list of availability zones which will be assigned
922 : /// to nodes in round-robin fashion. If empty, a default AZ is assigned.
923 68 : pub(crate) fn make_test_nodes(n: u64, azs: &[AvailabilityZone]) -> HashMap<NodeId, Node> {
924 68 : let mut az_iter = azs.iter().cycle();
925 68 :
926 68 : (1..n + 1)
927 272 : .map(|i| {
928 272 : (NodeId(i), {
929 272 : let mut node = Node::new(
930 272 : NodeId(i),
931 272 : format!("httphost-{i}"),
932 272 : 80 + i as u16,
933 272 : format!("pghost-{i}"),
934 272 : 5432 + i as u16,
935 272 : az_iter
936 272 : .next()
937 272 : .cloned()
938 272 : .unwrap_or(AvailabilityZone("test-az".to_string())),
939 272 : );
940 272 : node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0)));
941 272 : assert!(node.is_available());
942 272 : node
943 272 : })
944 272 : })
945 68 : .collect()
946 68 : }
947 : }
948 :
949 : #[cfg(test)]
950 : mod tests {
951 : use pageserver_api::{
952 : controller_api::NodeAvailability, models::utilization::test_utilization,
953 : shard::ShardIdentity,
954 : };
955 : use utils::{
956 : id::TenantId,
957 : shard::{ShardCount, ShardNumber, TenantShardId},
958 : };
959 :
960 : use super::*;
961 :
962 : use crate::tenant_shard::IntentState;
963 : #[test]
964 1 : fn scheduler_basic() -> anyhow::Result<()> {
965 1 : let nodes = test_utils::make_test_nodes(2, &[]);
966 1 :
967 1 : let mut scheduler = Scheduler::new(nodes.values());
968 1 : let mut t1_intent = IntentState::new(None);
969 1 : let mut t2_intent = IntentState::new(None);
970 1 :
971 1 : let context = ScheduleContext::default();
972 :
973 1 : let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &None, &context)?;
974 1 : t1_intent.set_attached(&mut scheduler, Some(scheduled));
975 1 : let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &None, &context)?;
976 1 : t2_intent.set_attached(&mut scheduler, Some(scheduled));
977 1 :
978 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
979 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
980 :
981 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
982 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
983 :
984 1 : let scheduled = scheduler.schedule_shard::<AttachedShardTag>(
985 1 : &t1_intent.all_pageservers(),
986 1 : &None,
987 1 : &context,
988 1 : )?;
989 1 : t1_intent.push_secondary(&mut scheduler, scheduled);
990 1 :
991 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
992 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
993 :
994 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
995 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
996 :
997 1 : t1_intent.clear(&mut scheduler);
998 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
999 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
1000 :
1001 1 : let total_attached = scheduler.get_node_attached_shard_count(NodeId(1))
1002 1 : + scheduler.get_node_attached_shard_count(NodeId(2));
1003 1 : assert_eq!(total_attached, 1);
1004 :
1005 1 : if cfg!(debug_assertions) {
1006 : // Dropping an IntentState without clearing it causes a panic in debug mode,
1007 : // because we have failed to properly update scheduler shard counts.
1008 1 : let result = std::panic::catch_unwind(move || {
1009 1 : drop(t2_intent);
1010 1 : });
1011 1 : assert!(result.is_err());
1012 : } else {
1013 0 : t2_intent.clear(&mut scheduler);
1014 0 :
1015 0 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
1016 0 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 0);
1017 :
1018 0 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 0);
1019 0 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 0);
1020 : }
1021 :
1022 1 : Ok(())
1023 1 : }
1024 :
1025 : #[test]
1026 : /// Test the PageserverUtilization's contribution to scheduling algorithm
1027 1 : fn scheduler_utilization() {
1028 1 : let mut nodes = test_utils::make_test_nodes(3, &[]);
1029 1 : let mut scheduler = Scheduler::new(nodes.values());
1030 1 :
1031 1 : // Need to keep these alive because they contribute to shard counts via RAII
1032 1 : let mut scheduled_intents = Vec::new();
1033 1 :
1034 1 : let empty_context = ScheduleContext::default();
1035 :
1036 11 : fn assert_scheduler_chooses(
1037 11 : expect_node: NodeId,
1038 11 : scheduled_intents: &mut Vec<IntentState>,
1039 11 : scheduler: &mut Scheduler,
1040 11 : context: &ScheduleContext,
1041 11 : ) {
1042 11 : let scheduled = scheduler
1043 11 : .schedule_shard::<AttachedShardTag>(&[], &None, context)
1044 11 : .unwrap();
1045 11 : let mut intent = IntentState::new(None);
1046 11 : intent.set_attached(scheduler, Some(scheduled));
1047 11 : scheduled_intents.push(intent);
1048 11 : assert_eq!(scheduled, expect_node);
1049 11 : }
1050 :
1051 : // Independent schedule calls onto empty nodes should round-robin, because each node's
1052 : // utilization's shard count is updated inline. The order is deterministic because when all other factors are
1053 : // equal, we order by node ID.
1054 1 : assert_scheduler_chooses(
1055 1 : NodeId(1),
1056 1 : &mut scheduled_intents,
1057 1 : &mut scheduler,
1058 1 : &empty_context,
1059 1 : );
1060 1 : assert_scheduler_chooses(
1061 1 : NodeId(2),
1062 1 : &mut scheduled_intents,
1063 1 : &mut scheduler,
1064 1 : &empty_context,
1065 1 : );
1066 1 : assert_scheduler_chooses(
1067 1 : NodeId(3),
1068 1 : &mut scheduled_intents,
1069 1 : &mut scheduler,
1070 1 : &empty_context,
1071 1 : );
1072 1 :
1073 1 : // Manually setting utilization higher should cause schedule calls to round-robin the other nodes
1074 1 : // which have equal utilization.
1075 1 : nodes
1076 1 : .get_mut(&NodeId(1))
1077 1 : .unwrap()
1078 1 : .set_availability(NodeAvailability::Active(test_utilization::simple(
1079 1 : 10,
1080 1 : 1024 * 1024 * 1024,
1081 1 : )));
1082 1 : scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
1083 1 :
1084 1 : assert_scheduler_chooses(
1085 1 : NodeId(2),
1086 1 : &mut scheduled_intents,
1087 1 : &mut scheduler,
1088 1 : &empty_context,
1089 1 : );
1090 1 : assert_scheduler_chooses(
1091 1 : NodeId(3),
1092 1 : &mut scheduled_intents,
1093 1 : &mut scheduler,
1094 1 : &empty_context,
1095 1 : );
1096 1 : assert_scheduler_chooses(
1097 1 : NodeId(2),
1098 1 : &mut scheduled_intents,
1099 1 : &mut scheduler,
1100 1 : &empty_context,
1101 1 : );
1102 1 : assert_scheduler_chooses(
1103 1 : NodeId(3),
1104 1 : &mut scheduled_intents,
1105 1 : &mut scheduler,
1106 1 : &empty_context,
1107 1 : );
1108 1 :
1109 1 : // The scheduler should prefer nodes with lower affinity score,
1110 1 : // even if they have higher utilization (as long as they aren't utilized at >100%)
1111 1 : let mut context_prefer_node1 = ScheduleContext::default();
1112 1 : context_prefer_node1.avoid(&[NodeId(2), NodeId(3)]);
1113 1 : assert_scheduler_chooses(
1114 1 : NodeId(1),
1115 1 : &mut scheduled_intents,
1116 1 : &mut scheduler,
1117 1 : &context_prefer_node1,
1118 1 : );
1119 1 : assert_scheduler_chooses(
1120 1 : NodeId(1),
1121 1 : &mut scheduled_intents,
1122 1 : &mut scheduler,
1123 1 : &context_prefer_node1,
1124 1 : );
1125 1 :
1126 1 : // If a node is over-utilized, it will not be used even if affinity scores prefer it
1127 1 : nodes
1128 1 : .get_mut(&NodeId(1))
1129 1 : .unwrap()
1130 1 : .set_availability(NodeAvailability::Active(test_utilization::simple(
1131 1 : 20000,
1132 1 : 1024 * 1024 * 1024,
1133 1 : )));
1134 1 : scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
1135 1 : assert_scheduler_chooses(
1136 1 : NodeId(2),
1137 1 : &mut scheduled_intents,
1138 1 : &mut scheduler,
1139 1 : &context_prefer_node1,
1140 1 : );
1141 1 : assert_scheduler_chooses(
1142 1 : NodeId(3),
1143 1 : &mut scheduled_intents,
1144 1 : &mut scheduler,
1145 1 : &context_prefer_node1,
1146 1 : );
1147 :
1148 12 : for mut intent in scheduled_intents {
1149 11 : intent.clear(&mut scheduler);
1150 11 : }
1151 1 : }
1152 :
1153 : #[test]
1154 : /// A simple test that showcases AZ-aware scheduling and its interaction with
1155 : /// affinity scores.
1156 1 : fn az_scheduling() {
1157 1 : let az_a_tag = AvailabilityZone("az-a".to_string());
1158 1 : let az_b_tag = AvailabilityZone("az-b".to_string());
1159 1 :
1160 1 : let nodes = test_utils::make_test_nodes(3, &[az_a_tag.clone(), az_b_tag.clone()]);
1161 1 : let mut scheduler = Scheduler::new(nodes.values());
1162 1 :
1163 1 : // Need to keep these alive because they contribute to shard counts via RAII
1164 1 : let mut scheduled_intents = Vec::new();
1165 1 :
1166 1 : let mut context = ScheduleContext::default();
1167 :
1168 4 : fn assert_scheduler_chooses<Tag: ShardTag>(
1169 4 : expect_node: NodeId,
1170 4 : preferred_az: Option<AvailabilityZone>,
1171 4 : scheduled_intents: &mut Vec<IntentState>,
1172 4 : scheduler: &mut Scheduler,
1173 4 : context: &mut ScheduleContext,
1174 4 : ) {
1175 4 : let scheduled = scheduler
1176 4 : .schedule_shard::<Tag>(&[], &preferred_az, context)
1177 4 : .unwrap();
1178 4 : let mut intent = IntentState::new(preferred_az.clone());
1179 4 : intent.set_attached(scheduler, Some(scheduled));
1180 4 : scheduled_intents.push(intent);
1181 4 : assert_eq!(scheduled, expect_node);
1182 :
1183 4 : context.avoid(&[scheduled]);
1184 4 : }
1185 :
1186 1 : assert_scheduler_chooses::<AttachedShardTag>(
1187 1 : NodeId(1),
1188 1 : Some(az_a_tag.clone()),
1189 1 : &mut scheduled_intents,
1190 1 : &mut scheduler,
1191 1 : &mut context,
1192 1 : );
1193 1 :
1194 1 : // Node 2 and 3 have affinity score equal to 0, but node 3
1195 1 : // is in "az-a" so we prefer that.
1196 1 : assert_scheduler_chooses::<AttachedShardTag>(
1197 1 : NodeId(3),
1198 1 : Some(az_a_tag.clone()),
1199 1 : &mut scheduled_intents,
1200 1 : &mut scheduler,
1201 1 : &mut context,
1202 1 : );
1203 1 :
1204 1 : // Node 1 and 3 (az-a) have same affinity score, so prefer the lowest node id.
1205 1 : assert_scheduler_chooses::<AttachedShardTag>(
1206 1 : NodeId(1),
1207 1 : Some(az_a_tag.clone()),
1208 1 : &mut scheduled_intents,
1209 1 : &mut scheduler,
1210 1 : &mut context,
1211 1 : );
1212 1 :
1213 1 : // Avoid nodes in "az-a" for the secondary location.
1214 1 : assert_scheduler_chooses::<SecondaryShardTag>(
1215 1 : NodeId(2),
1216 1 : Some(az_a_tag.clone()),
1217 1 : &mut scheduled_intents,
1218 1 : &mut scheduler,
1219 1 : &mut context,
1220 1 : );
1221 :
1222 5 : for mut intent in scheduled_intents {
1223 4 : intent.clear(&mut scheduler);
1224 4 : }
1225 1 : }
1226 :
1227 : #[test]
1228 1 : fn az_scheduling_for_new_tenant() {
1229 1 : let az_a_tag = AvailabilityZone("az-a".to_string());
1230 1 : let az_b_tag = AvailabilityZone("az-b".to_string());
1231 1 : let nodes = test_utils::make_test_nodes(
1232 1 : 6,
1233 1 : &[
1234 1 : az_a_tag.clone(),
1235 1 : az_a_tag.clone(),
1236 1 : az_a_tag.clone(),
1237 1 : az_b_tag.clone(),
1238 1 : az_b_tag.clone(),
1239 1 : az_b_tag.clone(),
1240 1 : ],
1241 1 : );
1242 1 :
1243 1 : let mut scheduler = Scheduler::new(nodes.values());
1244 :
1245 : /// Force the `home_shard_count` of a node directly: this is the metric used
1246 : /// by the scheduler when picking AZs.
1247 3 : fn set_shard_count(scheduler: &mut Scheduler, node_id: NodeId, shard_count: usize) {
1248 3 : let node = scheduler.nodes.get_mut(&node_id).unwrap();
1249 3 : node.home_shard_count = shard_count;
1250 3 : }
1251 :
1252 : // Initial empty state. Scores are tied, scheduler prefers lower AZ ID.
1253 1 : assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_a_tag.clone()));
1254 :
1255 : // Home shard count is higher in AZ A, so AZ B will be preferred
1256 1 : set_shard_count(&mut scheduler, NodeId(1), 10);
1257 1 : assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_b_tag.clone()));
1258 :
1259 : // Total home shard count is higher in AZ B, so we revert to preferring AZ A
1260 1 : set_shard_count(&mut scheduler, NodeId(4), 6);
1261 1 : set_shard_count(&mut scheduler, NodeId(5), 6);
1262 1 : assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_a_tag.clone()));
1263 1 : }
1264 :
1265 : /// Test that when selecting AZs for many new tenants, we get the expected balance across nodes
1266 : #[test]
1267 1 : fn az_selection_many() {
1268 1 : let az_a_tag = AvailabilityZone("az-a".to_string());
1269 1 : let az_b_tag = AvailabilityZone("az-b".to_string());
1270 1 : let az_c_tag = AvailabilityZone("az-c".to_string());
1271 1 : let nodes = test_utils::make_test_nodes(
1272 1 : 6,
1273 1 : &[
1274 1 : az_a_tag.clone(),
1275 1 : az_b_tag.clone(),
1276 1 : az_c_tag.clone(),
1277 1 : az_a_tag.clone(),
1278 1 : az_b_tag.clone(),
1279 1 : az_c_tag.clone(),
1280 1 : ],
1281 1 : );
1282 1 :
1283 1 : let mut scheduler = Scheduler::new(nodes.values());
1284 1 :
1285 1 : // We should get 1/6th of these on each node, give or take a few...
1286 1 : let total_tenants = 300;
1287 1 :
1288 1 : // ...where the 'few' is the number of AZs, because the scheduling will sometimes overshoot
1289 1 : // on one AZ before correcting itself. This is because we select the 'home' AZ based on
1290 1 : // an AZ-wide metric, but we select the location for secondaries on a purely node-based
1291 1 : // metric (while excluding the home AZ).
1292 1 : let grace = 3;
1293 1 :
1294 1 : let mut scheduled_shards = Vec::new();
1295 300 : for _i in 0..total_tenants {
1296 300 : let preferred_az = scheduler.get_az_for_new_tenant().unwrap();
1297 300 :
1298 300 : let mut node_home_counts = scheduler
1299 300 : .nodes
1300 300 : .iter()
1301 1800 : .map(|(node_id, node)| (node_id, node.home_shard_count))
1302 300 : .collect::<Vec<_>>();
1303 7800 : node_home_counts.sort_by_key(|i| i.0);
1304 300 : eprintln!("Selected {}, vs nodes {:?}", preferred_az, node_home_counts);
1305 300 :
1306 300 : let tenant_shard_id = TenantShardId {
1307 300 : tenant_id: TenantId::generate(),
1308 300 : shard_number: ShardNumber(0),
1309 300 : shard_count: ShardCount(1),
1310 300 : };
1311 300 :
1312 300 : let shard_identity = ShardIdentity::new(
1313 300 : tenant_shard_id.shard_number,
1314 300 : tenant_shard_id.shard_count,
1315 300 : pageserver_api::shard::ShardStripeSize(1),
1316 300 : )
1317 300 : .unwrap();
1318 300 : let mut shard = TenantShard::new(
1319 300 : tenant_shard_id,
1320 300 : shard_identity,
1321 300 : pageserver_api::controller_api::PlacementPolicy::Attached(1),
1322 300 : Some(preferred_az),
1323 300 : );
1324 300 :
1325 300 : let mut context = ScheduleContext::default();
1326 300 : shard.schedule(&mut scheduler, &mut context).unwrap();
1327 300 : eprintln!("Scheduled shard at {:?}", shard.intent);
1328 300 :
1329 300 : scheduled_shards.push(shard);
1330 300 : }
1331 :
1332 7 : for (node_id, node) in &scheduler.nodes {
1333 6 : eprintln!(
1334 6 : "Node {}: {} {} {}",
1335 6 : node_id, node.shard_count, node.attached_shard_count, node.home_shard_count
1336 6 : );
1337 6 : }
1338 :
1339 6 : for node in scheduler.nodes.values() {
1340 6 : assert!((node.home_shard_count as i64 - total_tenants as i64 / 6).abs() < grace);
1341 : }
1342 :
1343 301 : for mut shard in scheduled_shards {
1344 300 : shard.intent.clear(&mut scheduler);
1345 300 : }
1346 1 : }
1347 :
1348 : #[test]
1349 : /// Make sure that when we have an odd number of nodes and an even number of shards, we still
1350 : /// get scheduling stability.
1351 1 : fn odd_nodes_stability() {
1352 1 : let az_a = AvailabilityZone("az-a".to_string());
1353 1 : let az_b = AvailabilityZone("az-b".to_string());
1354 1 :
1355 1 : let nodes = test_utils::make_test_nodes(
1356 1 : 10,
1357 1 : &[
1358 1 : az_a.clone(),
1359 1 : az_a.clone(),
1360 1 : az_a.clone(),
1361 1 : az_a.clone(),
1362 1 : az_a.clone(),
1363 1 : az_b.clone(),
1364 1 : az_b.clone(),
1365 1 : az_b.clone(),
1366 1 : az_b.clone(),
1367 1 : az_b.clone(),
1368 1 : ],
1369 1 : );
1370 1 : let mut scheduler = Scheduler::new(nodes.values());
1371 1 :
1372 1 : // Need to keep these alive because they contribute to shard counts via RAII
1373 1 : let mut scheduled_shards = Vec::new();
1374 1 :
1375 1 : let mut context = ScheduleContext::default();
1376 :
1377 8 : fn schedule_shard(
1378 8 : tenant_shard_id: TenantShardId,
1379 8 : expect_attached: NodeId,
1380 8 : expect_secondary: NodeId,
1381 8 : scheduled_shards: &mut Vec<TenantShard>,
1382 8 : scheduler: &mut Scheduler,
1383 8 : preferred_az: Option<AvailabilityZone>,
1384 8 : context: &mut ScheduleContext,
1385 8 : ) {
1386 8 : let shard_identity = ShardIdentity::new(
1387 8 : tenant_shard_id.shard_number,
1388 8 : tenant_shard_id.shard_count,
1389 8 : pageserver_api::shard::ShardStripeSize(1),
1390 8 : )
1391 8 : .unwrap();
1392 8 : let mut shard = TenantShard::new(
1393 8 : tenant_shard_id,
1394 8 : shard_identity,
1395 8 : pageserver_api::controller_api::PlacementPolicy::Attached(1),
1396 8 : preferred_az,
1397 8 : );
1398 8 :
1399 8 : shard.schedule(scheduler, context).unwrap();
1400 8 :
1401 8 : assert_eq!(shard.intent.get_attached().unwrap(), expect_attached);
1402 8 : assert_eq!(
1403 8 : shard.intent.get_secondary().first().unwrap(),
1404 8 : &expect_secondary
1405 8 : );
1406 :
1407 8 : scheduled_shards.push(shard);
1408 8 : }
1409 :
1410 1 : let tenant_id = TenantId::generate();
1411 1 :
1412 1 : schedule_shard(
1413 1 : TenantShardId {
1414 1 : tenant_id,
1415 1 : shard_number: ShardNumber(0),
1416 1 : shard_count: ShardCount(8),
1417 1 : },
1418 1 : NodeId(1),
1419 1 : NodeId(6),
1420 1 : &mut scheduled_shards,
1421 1 : &mut scheduler,
1422 1 : Some(az_a.clone()),
1423 1 : &mut context,
1424 1 : );
1425 1 :
1426 1 : schedule_shard(
1427 1 : TenantShardId {
1428 1 : tenant_id,
1429 1 : shard_number: ShardNumber(1),
1430 1 : shard_count: ShardCount(8),
1431 1 : },
1432 1 : NodeId(2),
1433 1 : NodeId(7),
1434 1 : &mut scheduled_shards,
1435 1 : &mut scheduler,
1436 1 : Some(az_a.clone()),
1437 1 : &mut context,
1438 1 : );
1439 1 :
1440 1 : schedule_shard(
1441 1 : TenantShardId {
1442 1 : tenant_id,
1443 1 : shard_number: ShardNumber(2),
1444 1 : shard_count: ShardCount(8),
1445 1 : },
1446 1 : NodeId(3),
1447 1 : NodeId(8),
1448 1 : &mut scheduled_shards,
1449 1 : &mut scheduler,
1450 1 : Some(az_a.clone()),
1451 1 : &mut context,
1452 1 : );
1453 1 :
1454 1 : schedule_shard(
1455 1 : TenantShardId {
1456 1 : tenant_id,
1457 1 : shard_number: ShardNumber(3),
1458 1 : shard_count: ShardCount(8),
1459 1 : },
1460 1 : NodeId(4),
1461 1 : NodeId(9),
1462 1 : &mut scheduled_shards,
1463 1 : &mut scheduler,
1464 1 : Some(az_a.clone()),
1465 1 : &mut context,
1466 1 : );
1467 1 :
1468 1 : schedule_shard(
1469 1 : TenantShardId {
1470 1 : tenant_id,
1471 1 : shard_number: ShardNumber(4),
1472 1 : shard_count: ShardCount(8),
1473 1 : },
1474 1 : NodeId(5),
1475 1 : NodeId(10),
1476 1 : &mut scheduled_shards,
1477 1 : &mut scheduler,
1478 1 : Some(az_a.clone()),
1479 1 : &mut context,
1480 1 : );
1481 1 :
1482 1 : schedule_shard(
1483 1 : TenantShardId {
1484 1 : tenant_id,
1485 1 : shard_number: ShardNumber(5),
1486 1 : shard_count: ShardCount(8),
1487 1 : },
1488 1 : NodeId(1),
1489 1 : NodeId(6),
1490 1 : &mut scheduled_shards,
1491 1 : &mut scheduler,
1492 1 : Some(az_a.clone()),
1493 1 : &mut context,
1494 1 : );
1495 1 :
1496 1 : schedule_shard(
1497 1 : TenantShardId {
1498 1 : tenant_id,
1499 1 : shard_number: ShardNumber(6),
1500 1 : shard_count: ShardCount(8),
1501 1 : },
1502 1 : NodeId(2),
1503 1 : NodeId(7),
1504 1 : &mut scheduled_shards,
1505 1 : &mut scheduler,
1506 1 : Some(az_a.clone()),
1507 1 : &mut context,
1508 1 : );
1509 1 :
1510 1 : schedule_shard(
1511 1 : TenantShardId {
1512 1 : tenant_id,
1513 1 : shard_number: ShardNumber(7),
1514 1 : shard_count: ShardCount(8),
1515 1 : },
1516 1 : NodeId(3),
1517 1 : NodeId(8),
1518 1 : &mut scheduled_shards,
1519 1 : &mut scheduler,
1520 1 : Some(az_a.clone()),
1521 1 : &mut context,
1522 1 : );
1523 :
1524 : // Assert that the optimizer suggests no changes, i.e. our initial scheduling was stable.
1525 9 : for shard in &scheduled_shards {
1526 8 : assert_eq!(shard.optimize_attachment(&mut scheduler, &context), None);
1527 : }
1528 :
1529 9 : for mut shard in scheduled_shards {
1530 8 : shard.intent.clear(&mut scheduler);
1531 8 : }
1532 1 : }
1533 : }