Line data Source code
1 : use crate::{metrics::NodeLabelGroup, node::Node, tenant_shard::TenantShard};
2 : use itertools::Itertools;
3 : use pageserver_api::{controller_api::AvailabilityZone, models::PageserverUtilization};
4 : use serde::Serialize;
5 : use std::{collections::HashMap, fmt::Debug};
6 : use utils::{http::error::ApiError, id::NodeId};
7 :
8 : /// Scenarios in which we cannot find a suitable location for a tenant shard
9 : #[derive(thiserror::Error, Debug)]
10 : pub enum ScheduleError {
11 : #[error("No pageservers found")]
12 : NoPageservers,
13 : #[error("No pageserver found matching constraint")]
14 : ImpossibleConstraint,
15 : }
16 :
17 : impl From<ScheduleError> for ApiError {
18 0 : fn from(value: ScheduleError) -> Self {
19 0 : ApiError::Conflict(format!("Scheduling error: {}", value))
20 0 : }
21 : }
22 :
23 : #[derive(Serialize)]
24 : pub enum MaySchedule {
25 : Yes(PageserverUtilization),
26 : No,
27 : }
28 :
29 : #[derive(Serialize)]
30 : pub(crate) struct SchedulerNode {
31 : /// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
32 : shard_count: usize,
33 : /// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`].
34 : attached_shard_count: usize,
35 : /// How many shards have a location on this node (via [`crate::tenant_shard::IntentState`]) _and_ this node
36 : /// is in their preferred AZ (i.e. this is their 'home' location)
37 : home_shard_count: usize,
38 : /// Availability zone id in which the node resides
39 : az: AvailabilityZone,
40 :
41 : /// Whether this node is currently eligible to have new shards scheduled (this is derived
42 : /// from a node's availability state and scheduling policy).
43 : may_schedule: MaySchedule,
44 : }
45 :
46 : pub(crate) trait NodeSchedulingScore: Debug + Ord + Copy + Sized {
47 : fn generate(
48 : node_id: &NodeId,
49 : node: &mut SchedulerNode,
50 : preferred_az: &Option<AvailabilityZone>,
51 : context: &ScheduleContext,
52 : ) -> Option<Self>;
53 :
54 : /// Return a score that drops any components based on node utilization: this is useful
55 : /// for computing scores during scheduling optimisation, where rescheduling shards due to
56 : /// e.g. fluctuating disk usage would cause flapping.
57 : fn for_optimization(&self) -> Self;
58 :
59 : fn is_overloaded(&self) -> bool;
60 : fn node_id(&self) -> NodeId;
61 : }
62 :
63 : pub(crate) trait ShardTag {
64 : type Score: NodeSchedulingScore;
65 : }
66 :
67 : pub(crate) struct AttachedShardTag {}
68 : impl ShardTag for AttachedShardTag {
69 : type Score = NodeAttachmentSchedulingScore;
70 : }
71 :
72 : pub(crate) struct SecondaryShardTag {}
73 : impl ShardTag for SecondaryShardTag {
74 : type Score = NodeSecondarySchedulingScore;
75 : }
76 :
77 : #[derive(PartialEq, Eq, Debug, Clone, Copy)]
78 : enum AzMatch {
79 : Yes,
80 : No,
81 : Unknown,
82 : }
83 :
84 : impl AzMatch {
85 91418 : fn new(node_az: &AvailabilityZone, shard_preferred_az: Option<&AvailabilityZone>) -> Self {
86 91246 : match shard_preferred_az {
87 91246 : Some(preferred_az) if preferred_az == node_az => Self::Yes,
88 52647 : Some(_preferred_az) => Self::No,
89 172 : None => Self::Unknown,
90 : }
91 91418 : }
92 : }
93 :
94 : #[derive(PartialEq, Eq, Debug, Clone, Copy)]
95 : struct AttachmentAzMatch(AzMatch);
96 :
97 : impl Ord for AttachmentAzMatch {
98 65876 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
99 65876 : // Lower scores indicate a more suitable node.
100 65876 : // Note that we prefer a node for which we don't have
101 65876 : // info to a node which we are certain doesn't match the
102 65876 : // preferred AZ of the shard.
103 131752 : let az_match_score = |az_match: &AzMatch| match az_match {
104 64600 : AzMatch::Yes => 0,
105 168 : AzMatch::Unknown => 1,
106 66984 : AzMatch::No => 2,
107 131752 : };
108 :
109 65876 : az_match_score(&self.0).cmp(&az_match_score(&other.0))
110 65876 : }
111 : }
112 :
113 : impl PartialOrd for AttachmentAzMatch {
114 65876 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
115 65876 : Some(self.cmp(other))
116 65876 : }
117 : }
118 :
119 : #[derive(PartialEq, Eq, Debug, Clone, Copy)]
120 : struct SecondaryAzMatch(AzMatch);
121 :
122 : impl Ord for SecondaryAzMatch {
123 35961 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
124 35961 : // Lower scores indicate a more suitable node.
125 35961 : // For secondary locations we wish to avoid the preferred AZ
126 35961 : // of the shard.
127 71922 : let az_match_score = |az_match: &AzMatch| match az_match {
128 50081 : AzMatch::No => 0,
129 24 : AzMatch::Unknown => 1,
130 21817 : AzMatch::Yes => 2,
131 71922 : };
132 :
133 35961 : az_match_score(&self.0).cmp(&az_match_score(&other.0))
134 35961 : }
135 : }
136 :
137 : impl PartialOrd for SecondaryAzMatch {
138 35959 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
139 35959 : Some(self.cmp(other))
140 35959 : }
141 : }
142 :
143 : /// Scheduling score of a given node for shard attachments.
144 : /// Lower scores indicate more suitable nodes.
145 : /// Ordering is given by member declaration order (top to bottom).
146 : #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
147 : pub(crate) struct NodeAttachmentSchedulingScore {
148 : /// Flag indicating whether this node matches the preferred AZ
149 : /// of the shard. This is the first field compared, so nodes in the matching AZ are
150 : /// always considered first, regardless of the other score components.
151 : az_match: AttachmentAzMatch,
152 : /// The number of shards belonging to the tenant currently being
153 : /// scheduled that are attached to this node.
154 : affinity_score: AffinityScore,
155 : /// Utilisation score that combines shard count and disk utilisation
156 : utilization_score: u64,
157 : /// Total number of shards attached to this node. When nodes have identical utilisation, this
158 : /// acts as an anti-affinity between attached shards.
159 : total_attached_shard_count: usize,
160 : /// Convenience to make selection deterministic in tests and empty systems
161 : node_id: NodeId,
162 : }
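// The derived `Ord` above compares fields lexicographically in declaration order, so an AZ
// match always outranks affinity, which outranks utilization, down to the node id tie-breaker.
// A minimal sketch of that property (illustrative values only, not taken from the tests in
// this file):
//
//     let in_az = NodeAttachmentSchedulingScore {
//         az_match: AttachmentAzMatch(AzMatch::Yes),
//         affinity_score: AffinityScore(5),
//         utilization_score: 900,
//         total_attached_shard_count: 40,
//         node_id: NodeId(2),
//     };
//     let out_of_az = NodeAttachmentSchedulingScore {
//         az_match: AttachmentAzMatch(AzMatch::No),
//         affinity_score: AffinityScore(0),
//         utilization_score: 0,
//         total_attached_shard_count: 0,
//         node_id: NodeId(1),
//     };
//     assert!(in_az < out_of_az); // lower scores are preferred, so the in-AZ node wins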
163 :
164 : impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
165 52241 : fn generate(
166 52241 : node_id: &NodeId,
167 52241 : node: &mut SchedulerNode,
168 52241 : preferred_az: &Option<AvailabilityZone>,
169 52241 : context: &ScheduleContext,
170 52241 : ) -> Option<Self> {
171 52241 : let utilization = match &mut node.may_schedule {
172 52241 : MaySchedule::Yes(u) => u,
173 : MaySchedule::No => {
174 0 : return None;
175 : }
176 : };
177 :
178 52241 : Some(Self {
179 52241 : affinity_score: context
180 52241 : .nodes
181 52241 : .get(node_id)
182 52241 : .copied()
183 52241 : .unwrap_or(AffinityScore::FREE),
184 52241 : az_match: AttachmentAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())),
185 52241 : utilization_score: utilization.cached_score(),
186 52241 : total_attached_shard_count: node.attached_shard_count,
187 52241 : node_id: *node_id,
188 52241 : })
189 52241 : }
190 :
191 : /// For use in scheduling optimisation, where we only want to consider the aspects
192 : /// of the score that can only be resolved by moving things (such as inter-shard affinity
193 : /// and AZ affinity), and ignore aspects that reflect the total utilization of a node (which
194 : /// can fluctuate for other reasons)
195 96 : fn for_optimization(&self) -> Self {
196 96 : Self {
197 96 : utilization_score: 0,
198 96 : total_attached_shard_count: 0,
199 96 : node_id: NodeId(0),
200 96 : ..*self
201 96 : }
202 96 : }
203 :
204 52145 : fn is_overloaded(&self) -> bool {
205 52145 : PageserverUtilization::is_overloaded(self.utilization_score)
206 52145 : }
207 :
208 12872 : fn node_id(&self) -> NodeId {
209 12872 : self.node_id
210 12872 : }
211 : }
212 :
213 : /// Scheduling score of a given node for shard secondaries.
214 : /// Lower scores indicate more suitable nodes.
215 : /// Ordering is given by member declaration order (top to bottom).
216 : #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
217 : pub(crate) struct NodeSecondarySchedulingScore {
218 : /// Flag indicating whether this node matches the preferred AZ
219 : /// of the shard. For secondary locations we wish to avoid nodes in
220 : /// the preferred AZ of the shard, since that's where the attached location
221 : /// should be scheduled and having the secondary in the same AZ is bad for HA.
222 : az_match: SecondaryAzMatch,
223 : /// The number of shards belonging to the tenant currently being
224 : /// scheduled that are attached to this node.
225 : affinity_score: AffinityScore,
226 : /// Utilisation score that combines shard count and disk utilisation
227 : utilization_score: u64,
228 : /// Anti-affinity with other non-home locations: this gives the behavior that secondaries
229 : /// will spread out across the nodes in an AZ.
230 : total_non_home_shard_count: usize,
231 : /// Convenience to make selection deterministic in tests and empty systems
232 : node_id: NodeId,
233 : }
234 :
235 : impl NodeSchedulingScore for NodeSecondarySchedulingScore {
236 39177 : fn generate(
237 39177 : node_id: &NodeId,
238 39177 : node: &mut SchedulerNode,
239 39177 : preferred_az: &Option<AvailabilityZone>,
240 39177 : context: &ScheduleContext,
241 39177 : ) -> Option<Self> {
242 39177 : let utilization = match &mut node.may_schedule {
243 39177 : MaySchedule::Yes(u) => u,
244 : MaySchedule::No => {
245 0 : return None;
246 : }
247 : };
248 :
249 39177 : Some(Self {
250 39177 : az_match: SecondaryAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())),
251 39177 : affinity_score: context
252 39177 : .nodes
253 39177 : .get(node_id)
254 39177 : .copied()
255 39177 : .unwrap_or(AffinityScore::FREE),
256 39177 : utilization_score: utilization.cached_score(),
257 39177 : total_non_home_shard_count: (node.shard_count - node.home_shard_count),
258 39177 : node_id: *node_id,
259 39177 : })
260 39177 : }
261 :
262 10 : fn for_optimization(&self) -> Self {
263 10 : Self {
264 10 : utilization_score: 0,
265 10 : total_non_home_shard_count: 0,
266 10 : node_id: NodeId(0),
267 10 : ..*self
268 10 : }
269 10 : }
270 :
271 39162 : fn is_overloaded(&self) -> bool {
272 39162 : PageserverUtilization::is_overloaded(self.utilization_score)
273 39162 : }
274 :
275 12842 : fn node_id(&self) -> NodeId {
276 12842 : self.node_id
277 12842 : }
278 : }
279 :
280 : impl PartialEq for SchedulerNode {
281 3 : fn eq(&self, other: &Self) -> bool {
282 3 : let may_schedule_matches = matches!(
283 3 : (&self.may_schedule, &other.may_schedule),
284 : (MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No)
285 : );
286 :
287 3 : may_schedule_matches
288 3 : && self.shard_count == other.shard_count
289 3 : && self.attached_shard_count == other.attached_shard_count
290 3 : && self.az == other.az
291 3 : }
292 : }
293 :
294 : impl Eq for SchedulerNode {}
295 :
296 : /// This type is responsible for selecting which node is used when a tenant shard needs to choose a pageserver
297 : /// on which to run.
298 : ///
299 : /// The type has no persistent state of its own: this is all populated at startup. The Serialize
300 : /// impl is only for debug dumps.
301 : #[derive(Serialize)]
302 : pub(crate) struct Scheduler {
303 : nodes: HashMap<NodeId, SchedulerNode>,
304 : }
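// Typical construction (illustrative; `nodes` is a HashMap<NodeId, Node>, e.g. the map
// produced by `test_utils::make_test_nodes` further down in this file):
//
//     let mut scheduler = Scheduler::new(nodes.values());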
305 :
306 : /// Score for soft constraint scheduling: lower scores are preferred to higher scores.
307 : ///
308 : /// For example, we may set an affinity score based on the number of shards from the same
309 : /// tenant already on a node, to implicitly prefer to balance out shards.
310 : #[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
311 : pub(crate) struct AffinityScore(pub(crate) usize);
312 :
313 : impl AffinityScore {
314 : /// If we have no anti-affinity at all toward a node, this is its score. It means
315 : /// the scheduler has a free choice amongst nodes with this score, and may pick a node
316 : /// based on other information such as total utilization.
317 : pub(crate) const FREE: Self = Self(0);
318 :
319 25766 : pub(crate) fn inc(&mut self) {
320 25766 : self.0 += 1;
321 25766 : }
322 :
323 101 : pub(crate) fn dec(&mut self) {
324 101 : self.0 -= 1;
325 101 : }
326 : }
327 :
328 : impl std::ops::Add for AffinityScore {
329 : type Output = Self;
330 :
331 0 : fn add(self, rhs: Self) -> Self::Output {
332 0 : Self(self.0 + rhs.0)
333 0 : }
334 : }
335 :
336 : /// Hint for whether this is a sincere attempt to schedule, or a speculative
337 : /// check for where we _would_ schedule (done during optimization)
338 : #[derive(Debug, Clone)]
339 : pub(crate) enum ScheduleMode {
340 : Normal,
341 : Speculative,
342 : }
343 :
344 : impl Default for ScheduleMode {
345 5327 : fn default() -> Self {
346 5327 : Self::Normal
347 5327 : }
348 : }
349 :
350 : // For carrying state between multiple calls to [`TenantShard::schedule`], e.g. when calling
351 : // it for many shards in the same tenant.
352 : #[derive(Debug, Default, Clone)]
353 : pub(crate) struct ScheduleContext {
354 : /// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`]
355 : pub(crate) nodes: HashMap<NodeId, AffinityScore>,
356 :
357 : pub(crate) mode: ScheduleMode,
358 : }
359 :
360 : impl ScheduleContext {
361 3 : pub(crate) fn new(mode: ScheduleMode) -> Self {
362 3 : Self {
363 3 : nodes: HashMap::new(),
364 3 : mode,
365 3 : }
366 3 : }
367 :
368 : /// Input is a list of nodes we would like to avoid using again within this context. The more
369 : /// times a node is passed into this call, the less inclined we are to use it.
370 12891 : pub(crate) fn avoid(&mut self, nodes: &[NodeId]) {
371 38657 : for node_id in nodes {
372 25766 : let entry = self.nodes.entry(*node_id).or_insert(AffinityScore::FREE);
373 25766 : entry.inc()
374 : }
375 12891 : }
376 :
377 : /// Remove `shard`'s contributions to this context. This is useful when considering scheduling
378 : /// this shard afresh, where we don't want it to e.g. experience anti-affinity to its current location.
379 50 : pub(crate) fn project_detach(&self, shard: &TenantShard) -> Self {
380 50 : let mut new_context = self.clone();
381 :
382 50 : if let Some(attached) = shard.intent.get_attached() {
383 50 : if let Some(score) = new_context.nodes.get_mut(attached) {
384 50 : score.dec();
385 50 : }
386 0 : }
387 :
388 51 : for secondary in shard.intent.get_secondary() {
389 51 : if let Some(score) = new_context.nodes.get_mut(secondary) {
390 51 : score.dec();
391 51 : }
392 : }
393 :
394 50 : new_context
395 50 : }
396 :
397 : /// For tests: track the sum of AffinityScore values, which is effectively how many
398 : /// attached or secondary locations have been registered with this context.
399 : #[cfg(test)]
400 3 : pub(crate) fn location_count(&self) -> usize {
401 7 : self.nodes.values().map(|i| i.0).sum()
402 3 : }
403 : }
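// Rough sketch of how a context accumulates anti-affinity while scheduling the shards of a
// single tenant (illustrative only; assumes the scheduler already knows nodes 1..=3):
//
//     let mut context = ScheduleContext::default();
//     // Shard 0 was placed with its attachment on node 1 and its secondary on node 2.
//     context.avoid(&[NodeId(1), NodeId(2)]);
//     assert_eq!(context.nodes.get(&NodeId(1)), Some(&AffinityScore(1)));
//     // Nodes never passed to `avoid` stay implicitly at `AffinityScore::FREE`.
//     assert_eq!(context.nodes.get(&NodeId(3)), None);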
404 :
405 : pub(crate) enum RefCountUpdate {
406 : PromoteSecondary,
407 : Attach,
408 : Detach,
409 : DemoteAttached,
410 : AddSecondary,
411 : RemoveSecondary,
412 : }
413 :
414 : impl Scheduler {
415 66 : pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
416 66 : let mut scheduler_nodes = HashMap::new();
417 120 : for node in nodes {
418 54 : scheduler_nodes.insert(
419 54 : node.get_id(),
420 54 : SchedulerNode {
421 54 : shard_count: 0,
422 54 : attached_shard_count: 0,
423 54 : home_shard_count: 0,
424 54 : may_schedule: node.may_schedule(),
425 54 : az: node.get_availability_zone_id().clone(),
426 54 : },
427 54 : );
428 54 : }
429 :
430 66 : Self {
431 66 : nodes: scheduler_nodes,
432 66 : }
433 66 : }
434 :
435 : /// For debug/support: check that our internal statistics are in sync with the state of
436 : /// the nodes & tenant shards.
437 : ///
438 : /// If anything is inconsistent, log details and return an error.
439 1 : pub(crate) fn consistency_check<'a>(
440 1 : &self,
441 1 : nodes: impl Iterator<Item = &'a Node>,
442 1 : shards: impl Iterator<Item = &'a TenantShard>,
443 1 : ) -> anyhow::Result<()> {
444 1 : let mut expect_nodes: HashMap<NodeId, SchedulerNode> = HashMap::new();
445 4 : for node in nodes {
446 3 : expect_nodes.insert(
447 3 : node.get_id(),
448 3 : SchedulerNode {
449 3 : shard_count: 0,
450 3 : attached_shard_count: 0,
451 3 : home_shard_count: 0,
452 3 : may_schedule: node.may_schedule(),
453 3 : az: node.get_availability_zone_id().clone(),
454 3 : },
455 3 : );
456 3 : }
457 :
458 2 : for shard in shards {
459 1 : if let Some(node_id) = shard.intent.get_attached() {
460 1 : match expect_nodes.get_mut(node_id) {
461 1 : Some(node) => {
462 1 : node.shard_count += 1;
463 1 : node.attached_shard_count += 1;
464 1 : if Some(&node.az) == shard.preferred_az() {
465 0 : node.home_shard_count += 1;
466 1 : }
467 : }
468 0 : None => anyhow::bail!(
469 0 : "Tenant {} references nonexistent node {}",
470 0 : shard.tenant_shard_id,
471 0 : node_id
472 0 : ),
473 : }
474 0 : }
475 :
476 1 : for node_id in shard.intent.get_secondary() {
477 1 : match expect_nodes.get_mut(node_id) {
478 1 : Some(node) => {
479 1 : node.shard_count += 1;
480 1 : if Some(&node.az) == shard.preferred_az() {
481 0 : node.home_shard_count += 1;
482 1 : }
483 : }
484 0 : None => anyhow::bail!(
485 0 : "Tenant {} references nonexistent node {}",
486 0 : shard.tenant_shard_id,
487 0 : node_id
488 0 : ),
489 : }
490 : }
491 : }
492 :
493 4 : for (node_id, expect_node) in &expect_nodes {
494 3 : let Some(self_node) = self.nodes.get(node_id) else {
495 0 : anyhow::bail!("Node {node_id} not found in Self")
496 : };
497 :
498 3 : if self_node != expect_node {
499 0 : tracing::error!("Inconsistency detected in scheduling state for node {node_id}");
500 0 : tracing::error!("Expected state: {}", serde_json::to_string(expect_node)?);
501 0 : tracing::error!("Self state: {}", serde_json::to_string(self_node)?);
502 :
503 0 : anyhow::bail!("Inconsistent state on {node_id}");
504 3 : }
505 : }
506 :
507 1 : if expect_nodes.len() != self.nodes.len() {
508 : // We just checked that all the expected nodes are present. If the lengths don't match,
509 : // it means that we have nodes in Self that are unexpected.
510 0 : for node_id in self.nodes.keys() {
511 0 : if !expect_nodes.contains_key(node_id) {
512 0 : anyhow::bail!("Node {node_id} found in Self but not in expected nodes");
513 0 : }
514 : }
515 1 : }
516 :
517 1 : Ok(())
518 1 : }
519 :
520 : /// Update the reference counts of a node. These reference counts are used to guide scheduling
521 : /// decisions, not for memory management: they represent the number of tenant shards whose IntentState
522 : /// targets this node and the number of tenant shards whose IntentState is attached to this
523 : /// node.
524 : ///
525 : /// It is an error to call this for a node that is not known to the scheduler (i.e. one that was
526 : /// never passed into [`Self::new`] or [`Self::node_upsert`]).
527 51401 : pub(crate) fn update_node_ref_counts(
528 51401 : &mut self,
529 51401 : node_id: NodeId,
530 51401 : preferred_az: Option<&AvailabilityZone>,
531 51401 : update: RefCountUpdate,
532 51401 : ) {
533 51401 : let Some(node) = self.nodes.get_mut(&node_id) else {
534 0 : debug_assert!(false);
535 0 : tracing::error!("Scheduler missing node {node_id}");
536 0 : return;
537 : };
538 :
539 51401 : let is_home_az = Some(&node.az) == preferred_az;
540 51401 :
541 51401 : match update {
542 5 : RefCountUpdate::PromoteSecondary => {
543 5 : node.attached_shard_count += 1;
544 5 : }
545 : RefCountUpdate::Attach => {
546 12856 : node.shard_count += 1;
547 12856 : node.attached_shard_count += 1;
548 12856 : if is_home_az {
549 12816 : node.home_shard_count += 1;
550 12816 : }
551 : }
552 : RefCountUpdate::Detach => {
553 12855 : node.shard_count -= 1;
554 12855 : node.attached_shard_count -= 1;
555 12855 : if is_home_az {
556 12818 : node.home_shard_count -= 1;
557 12818 : }
558 : }
559 5 : RefCountUpdate::DemoteAttached => {
560 5 : node.attached_shard_count -= 1;
561 5 : }
562 : RefCountUpdate::AddSecondary => {
563 12840 : node.shard_count += 1;
564 12840 : if is_home_az {
565 3 : node.home_shard_count += 1;
566 12837 : }
567 : }
568 : RefCountUpdate::RemoveSecondary => {
569 12840 : node.shard_count -= 1;
570 12840 : if is_home_az {
571 1 : node.home_shard_count -= 1;
572 12839 : }
573 : }
574 : }
575 :
576 : // Maybe update PageserverUtilization
577 51401 : match update {
578 : RefCountUpdate::AddSecondary | RefCountUpdate::Attach => {
579 : // Referencing the node: if this takes our shard_count above the utilization structure's
580 : // shard count, then artificially bump it: this ensures that the scheduler immediately
581 : // recognizes that this node has more work on it, without waiting for the next heartbeat
582 : // to update the utilization.
583 25696 : if let MaySchedule::Yes(utilization) = &mut node.may_schedule {
584 25696 : utilization.adjust_shard_count_max(node.shard_count as u32);
585 25696 : }
586 : }
587 : RefCountUpdate::PromoteSecondary
588 : | RefCountUpdate::Detach
589 : | RefCountUpdate::RemoveSecondary
590 25705 : | RefCountUpdate::DemoteAttached => {
591 25705 : // De-referencing the node: leave the utilization's shard_count at a stale higher
592 25705 : // value until some future heartbeat after we have physically removed this shard
593 25705 : // from the node: this prevents the scheduler over-optimistically trying to schedule
594 25705 : // more work onto the node before earlier detaches are done.
595 25705 : }
596 : }
597 51401 : }
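    // For example, attaching a shard whose preferred AZ matches node 1's AZ bumps all three
    // counters on node 1, while adding a secondary on a node in a different AZ only bumps that
    // node's `shard_count` (hypothetical sketch; `az_a` stands for node 1's AvailabilityZone):
    //
    //     scheduler.update_node_ref_counts(NodeId(1), Some(&az_a), RefCountUpdate::Attach);
    //     scheduler.update_node_ref_counts(NodeId(2), Some(&az_a), RefCountUpdate::AddSecondary);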
598 :
599 : // Check if the number of shards attached to a given node is lagging below
600 : // the cluster average. If that's the case, the node should be filled.
601 0 : pub(crate) fn compute_fill_requirement(&self, node_id: NodeId) -> usize {
602 0 : let Some(node) = self.nodes.get(&node_id) else {
603 0 : debug_assert!(false);
604 0 : tracing::error!("Scheduler missing node {node_id}");
605 0 : return 0;
606 : };
607 0 : assert!(!self.nodes.is_empty());
608 0 : let expected_attached_shards_per_node = self.expected_attached_shard_count();
609 :
610 0 : for (node_id, node) in self.nodes.iter() {
611 0 : tracing::trace!(%node_id, "attached_shard_count={} shard_count={} expected={}", node.attached_shard_count, node.shard_count, expected_attached_shards_per_node);
612 : }
613 :
614 0 : if node.attached_shard_count < expected_attached_shards_per_node {
615 0 : expected_attached_shards_per_node - node.attached_shard_count
616 : } else {
617 0 : 0
618 : }
619 0 : }
620 :
621 0 : pub(crate) fn expected_attached_shard_count(&self) -> usize {
622 0 : let total_attached_shards: usize =
623 0 : self.nodes.values().map(|n| n.attached_shard_count).sum();
624 0 :
625 0 : assert!(!self.nodes.is_empty());
626 0 : total_attached_shards / self.nodes.len()
627 0 : }
628 :
629 0 : pub(crate) fn nodes_by_attached_shard_count(&self) -> Vec<(NodeId, usize)> {
630 0 : self.nodes
631 0 : .iter()
632 0 : .map(|(node_id, stats)| (*node_id, stats.attached_shard_count))
633 0 : .sorted_by(|lhs, rhs| Ord::cmp(&lhs.1, &rhs.1).reverse())
634 0 : .collect()
635 0 : }
636 :
637 214 : pub(crate) fn node_upsert(&mut self, node: &Node) {
638 : use std::collections::hash_map::Entry::*;
639 214 : match self.nodes.entry(node.get_id()) {
640 3 : Occupied(mut entry) => {
641 3 : // Updates to MaySchedule are how we receive updated PageserverUtilization: adjust these values
642 3 : // to account for any shards scheduled on the controller but not yet visible to the pageserver.
643 3 : let mut may_schedule = node.may_schedule();
644 3 : match &mut may_schedule {
645 2 : MaySchedule::Yes(utilization) => {
646 2 : utilization.adjust_shard_count_max(entry.get().shard_count as u32);
647 2 : }
648 1 : MaySchedule::No => { // Nothing to tweak
649 1 : }
650 : }
651 :
652 3 : entry.get_mut().may_schedule = may_schedule;
653 : }
654 211 : Vacant(entry) => {
655 211 : entry.insert(SchedulerNode {
656 211 : shard_count: 0,
657 211 : attached_shard_count: 0,
658 211 : home_shard_count: 0,
659 211 : may_schedule: node.may_schedule(),
660 211 : az: node.get_availability_zone_id().clone(),
661 211 : });
662 211 : }
663 : }
664 214 : }
665 :
666 0 : pub(crate) fn node_remove(&mut self, node_id: NodeId) {
667 0 : if self.nodes.remove(&node_id).is_none() {
668 0 : tracing::warn!(node_id=%node_id, "Removed non-existent node from scheduler");
669 0 : }
670 0 : }
671 :
672 : /// Calculate a single node's score, used in optimizer logic to compare specific
673 : /// nodes' scores.
674 111 : pub(crate) fn compute_node_score<Score>(
675 111 : &mut self,
676 111 : node_id: NodeId,
677 111 : preferred_az: &Option<AvailabilityZone>,
678 111 : context: &ScheduleContext,
679 111 : ) -> Option<Score>
680 111 : where
681 111 : Score: NodeSchedulingScore,
682 111 : {
683 111 : self.nodes
684 111 : .get_mut(&node_id)
685 111 : .and_then(|node| Score::generate(&node_id, node, preferred_az, context))
686 111 : }
687 :
688 : /// Compute a scheduling score for each node that the scheduler knows of,
689 : /// minus a set of hard excluded nodes.
690 25714 : fn compute_node_scores<Score>(
691 25714 : &mut self,
692 25714 : hard_exclude: &[NodeId],
693 25714 : preferred_az: &Option<AvailabilityZone>,
694 25714 : context: &ScheduleContext,
695 25714 : ) -> Vec<Score>
696 25714 : where
697 25714 : Score: NodeSchedulingScore,
698 25714 : {
699 25714 : self.nodes
700 25714 : .iter_mut()
701 104149 : .filter_map(|(k, v)| {
702 104149 : if hard_exclude.contains(k) {
703 12842 : None
704 : } else {
705 91307 : Score::generate(k, v, preferred_az, context)
706 : }
707 104149 : })
708 25714 : .collect()
709 25714 : }
710 :
711 : /// hard_exclude: it is forbidden to use nodes in this list, typically because they
712 : /// are already in use by this shard -- we use this to avoid picking the same node
713 : /// as both attached and secondary location. This is a hard constraint: if we cannot
714 : /// find any nodes that aren't in this list, then we will return a [`ScheduleError::ImpossibleConstraint`].
715 : ///
716 : /// context: we prefer to avoid using nodes identified in the context, according
717 : /// to their anti-affinity score. We use this to avoid placing shards of
718 : /// the same tenant on the same node. This is a soft constraint: the context will never
719 : /// cause us to fail to schedule a shard.
720 25714 : pub(crate) fn schedule_shard<Tag: ShardTag>(
721 25714 : &mut self,
722 25714 : hard_exclude: &[NodeId],
723 25714 : preferred_az: &Option<AvailabilityZone>,
724 25714 : context: &ScheduleContext,
725 25714 : ) -> Result<NodeId, ScheduleError> {
726 25714 : if self.nodes.is_empty() {
727 0 : return Err(ScheduleError::NoPageservers);
728 25714 : }
729 25714 :
730 25714 : let mut scores =
731 25714 : self.compute_node_scores::<Tag::Score>(hard_exclude, preferred_az, context);
732 25714 :
733 25714 : // Exclude nodes whose utilization is critically high, if there are alternatives available. This will
734 25714 : // cause us to violate affinity rules if it is necessary to avoid critically overloading nodes: for example
735 25714 : // we may place shards in the same tenant together on the same pageserver if all other pageservers are
736 25714 : // overloaded.
737 25714 : let non_overloaded_scores = scores
738 25714 : .iter()
739 91307 : .filter(|i| !i.is_overloaded())
740 25714 : .copied()
741 25714 : .collect::<Vec<_>>();
742 25714 : if !non_overloaded_scores.is_empty() {
743 25714 : scores = non_overloaded_scores;
744 25714 : }
745 :
746 : // Sort the nodes by score. The one with the lowest scores will be the preferred node.
747 : // Refer to [`NodeAttachmentSchedulingScore`] for attached locations and
748 : // [`NodeSecondarySchedulingScore`] for secondary locations to understand how the nodes
749 : // are ranked.
750 25714 : scores.sort();
751 25714 :
752 25714 : if scores.is_empty() {
753 : // After applying constraints, no pageservers were left.
754 0 : if !matches!(context.mode, ScheduleMode::Speculative) {
755 : // If this was not a speculative attempt, log details to understand why we couldn't
756 : // schedule: this may help an engineer understand if some nodes are marked offline
757 : // in a way that's preventing progress.
758 0 : tracing::info!(
759 0 : "Scheduling failure, while excluding {hard_exclude:?}, node states:"
760 : );
761 0 : for (node_id, node) in &self.nodes {
762 0 : tracing::info!(
763 0 : "Node {node_id}: may_schedule={} shards={}",
764 0 : !matches!(node.may_schedule, MaySchedule::No),
765 : node.shard_count
766 : );
767 : }
768 0 : }
769 0 : return Err(ScheduleError::ImpossibleConstraint);
770 25714 : }
771 25714 :
772 25714 : // Lowest score wins
773 25714 : let node_id = scores.first().unwrap().node_id();
774 :
775 25714 : if !matches!(context.mode, ScheduleMode::Speculative) {
776 25714 : tracing::info!(
777 0 : "scheduler selected node {node_id} (elegible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?})",
778 0 : scores.iter().map(|i| i.node_id().0).collect::<Vec<_>>()
779 : );
780 0 : }
781 :
782 : // Note that we do not update shard count here to reflect the scheduling: that
783 : // is IntentState's job when the scheduled location is used.
784 :
785 25714 : Ok(node_id)
786 25714 : }
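    // Minimal usage sketch for picking a secondary location (assumes a populated scheduler, a
    // shard already attached on node 1, and a `preferred_az`/`context` built by the caller). The
    // hard exclude guarantees we never reuse the attached node, while the preferred AZ and the
    // context are only soft preferences:
    //
    //     let secondary = scheduler.schedule_shard::<SecondaryShardTag>(
    //         &[NodeId(1)],
    //         &Some(preferred_az.clone()),
    //         &context,
    //     )?;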
787 :
788 : /// Selects any available node. This is suitable for performing background work (e.g. S3
789 : /// deletions).
790 0 : pub(crate) fn any_available_node(&mut self) -> Result<NodeId, ScheduleError> {
791 0 : self.schedule_shard::<AttachedShardTag>(&[], &None, &ScheduleContext::default())
792 0 : }
793 :
794 : /// For choosing which AZ to schedule a new shard into, use this. It will return the
795 : /// AZ with the the lowest number of shards currently scheduled in this AZ as their home
796 : /// location.
797 : ///
798 : /// We use an AZ-wide measure rather than simply selecting the AZ of the least-loaded
799 : /// node, because while tenants start out single sharded, when they grow and undergo
800 : /// shard-split, they will occupy space on many nodes within an AZ. It is important
801 : /// that we pick the AZ in a way that balances this _future_ load.
802 : ///
803 : /// Once we've picked an AZ, subsequent scheduling within that AZ will be driven by
804 : /// nodes' utilization scores.
805 303 : pub(crate) fn get_az_for_new_tenant(&self) -> Option<AvailabilityZone> {
806 303 : if self.nodes.is_empty() {
807 0 : return None;
808 303 : }
809 :
810 : #[derive(Default)]
811 : struct AzScore {
812 : home_shard_count: usize,
813 : scheduleable: bool,
814 : }
815 :
816 303 : let mut azs: HashMap<&AvailabilityZone, AzScore> = HashMap::new();
817 1818 : for node in self.nodes.values() {
818 1818 : let az = azs.entry(&node.az).or_default();
819 1818 : az.home_shard_count += node.home_shard_count;
820 1818 : az.scheduleable |= matches!(node.may_schedule, MaySchedule::Yes(_));
821 : }
822 :
823 : // If any AZs are schedulable, then filter out the non-schedulable ones (i.e. AZs where
824 : // all nodes are overloaded or otherwise unschedulable).
825 303 : if azs.values().any(|i| i.scheduleable) {
826 906 : azs.retain(|_, i| i.scheduleable);
827 303 : }
828 :
829 : // Find the AZ with the lowest number of shards currently allocated
830 303 : Some(
831 303 : azs.into_iter()
832 906 : .min_by_key(|i| (i.1.home_shard_count, i.0))
833 303 : .unwrap()
834 303 : .0
835 303 : .clone(),
836 303 : )
837 303 : }
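    // Illustrative call (assumes a populated scheduler); the returned AZ would typically become
    // the new tenant's preferred AZ, so that subsequent scheduling keeps its shards homed there:
    //
    //     let preferred_az = scheduler.get_az_for_new_tenant();
    //     let intent = IntentState::new(preferred_az.clone());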
838 :
839 2 : pub(crate) fn get_node_az(&self, node_id: &NodeId) -> Option<AvailabilityZone> {
840 2 : self.nodes.get(node_id).map(|n| n.az.clone())
841 2 : }
842 :
843 : /// For use when choosing a preferred secondary location: filter out nodes that are not
844 : /// available, and gather their AZs.
845 12829 : pub(crate) fn filter_usable_nodes(
846 12829 : &self,
847 12829 : nodes: &[NodeId],
848 12829 : ) -> Vec<(NodeId, Option<AvailabilityZone>)> {
849 12829 : nodes
850 12829 : .iter()
851 12829 : .filter_map(|node_id| {
852 2 : let node = self
853 2 : .nodes
854 2 : .get(node_id)
855 2 : .expect("Referenced nodes always exist");
856 2 : if matches!(node.may_schedule, MaySchedule::Yes(_)) {
857 1 : Some((*node_id, Some(node.az.clone())))
858 : } else {
859 1 : None
860 : }
861 12829 : })
862 12829 : .collect()
863 12829 : }
864 :
865 : /// Unit test access to internal state
866 : #[cfg(test)]
867 19 : pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
868 19 : self.nodes.get(&node_id).unwrap().shard_count
869 19 : }
870 :
871 : #[cfg(test)]
872 19 : pub(crate) fn get_node_attached_shard_count(&self, node_id: NodeId) -> usize {
873 19 : self.nodes.get(&node_id).unwrap().attached_shard_count
874 19 : }
875 :
876 : /// Some metrics that we only calculate periodically: this is simpler than
877 : /// rigorously updating them on every change.
878 0 : pub(crate) fn update_metrics(&self) {
879 0 : for (node_id, node) in &self.nodes {
880 0 : let node_id_str = format!("{}", node_id);
881 0 : let label_group = NodeLabelGroup {
882 0 : az: &node.az.0,
883 0 : node_id: &node_id_str,
884 0 : };
885 0 :
886 0 : crate::metrics::METRICS_REGISTRY
887 0 : .metrics_group
888 0 : .storage_controller_node_shards
889 0 : .set(label_group.clone(), node.shard_count as i64);
890 0 :
891 0 : crate::metrics::METRICS_REGISTRY
892 0 : .metrics_group
893 0 : .storage_controller_node_attached_shards
894 0 : .set(label_group.clone(), node.attached_shard_count as i64);
895 0 :
896 0 : crate::metrics::METRICS_REGISTRY
897 0 : .metrics_group
898 0 : .storage_controller_node_home_shards
899 0 : .set(label_group.clone(), node.home_shard_count as i64);
900 0 : }
901 0 : }
902 : }
903 :
904 : #[cfg(test)]
905 : pub(crate) mod test_utils {
906 :
907 : use crate::node::Node;
908 : use pageserver_api::{
909 : controller_api::{AvailabilityZone, NodeAvailability},
910 : models::utilization::test_utilization,
911 : };
912 : use std::collections::HashMap;
913 : use utils::id::NodeId;
914 :
915 : /// Test helper: synthesize the requested number of nodes, all in active state.
916 : ///
917 : /// Node IDs start at one.
918 : ///
919 : /// The `azs` argument specifies the list of availability zones which will be assigned
920 : /// to nodes in round-robin fashion. If empty, a default AZ is assigned.
921 66 : pub(crate) fn make_test_nodes(n: u64, azs: &[AvailabilityZone]) -> HashMap<NodeId, Node> {
922 66 : let mut az_iter = azs.iter().cycle();
923 66 :
924 66 : (1..n + 1)
925 265 : .map(|i| {
926 265 : (NodeId(i), {
927 265 : let mut node = Node::new(
928 265 : NodeId(i),
929 265 : format!("httphost-{i}"),
930 265 : 80 + i as u16,
931 265 : format!("pghost-{i}"),
932 265 : 5432 + i as u16,
933 265 : az_iter
934 265 : .next()
935 265 : .cloned()
936 265 : .unwrap_or(AvailabilityZone("test-az".to_string())),
937 265 : );
938 265 : node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0)));
939 265 : assert!(node.is_available());
940 265 : node
941 265 : })
942 265 : })
943 66 : .collect()
944 66 : }
945 : }
946 :
947 : #[cfg(test)]
948 : mod tests {
949 : use pageserver_api::{
950 : controller_api::NodeAvailability, models::utilization::test_utilization,
951 : shard::ShardIdentity,
952 : };
953 : use utils::{
954 : id::TenantId,
955 : shard::{ShardCount, ShardNumber, TenantShardId},
956 : };
957 :
958 : use super::*;
959 :
960 : use crate::tenant_shard::IntentState;
961 : #[test]
962 1 : fn scheduler_basic() -> anyhow::Result<()> {
963 1 : let nodes = test_utils::make_test_nodes(2, &[]);
964 1 :
965 1 : let mut scheduler = Scheduler::new(nodes.values());
966 1 : let mut t1_intent = IntentState::new(None);
967 1 : let mut t2_intent = IntentState::new(None);
968 1 :
969 1 : let context = ScheduleContext::default();
970 :
971 1 : let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &None, &context)?;
972 1 : t1_intent.set_attached(&mut scheduler, Some(scheduled));
973 1 : let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &None, &context)?;
974 1 : t2_intent.set_attached(&mut scheduler, Some(scheduled));
975 1 :
976 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
977 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
978 :
979 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
980 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
981 :
982 1 : let scheduled = scheduler.schedule_shard::<AttachedShardTag>(
983 1 : &t1_intent.all_pageservers(),
984 1 : &None,
985 1 : &context,
986 1 : )?;
987 1 : t1_intent.push_secondary(&mut scheduler, scheduled);
988 1 :
989 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
990 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
991 :
992 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
993 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
994 :
995 1 : t1_intent.clear(&mut scheduler);
996 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
997 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
998 :
999 1 : let total_attached = scheduler.get_node_attached_shard_count(NodeId(1))
1000 1 : + scheduler.get_node_attached_shard_count(NodeId(2));
1001 1 : assert_eq!(total_attached, 1);
1002 :
1003 1 : if cfg!(debug_assertions) {
1004 : // Dropping an IntentState without clearing it causes a panic in debug mode,
1005 : // because we have failed to properly update scheduler shard counts.
1006 1 : let result = std::panic::catch_unwind(move || {
1007 1 : drop(t2_intent);
1008 1 : });
1009 1 : assert!(result.is_err());
1010 : } else {
1011 0 : t2_intent.clear(&mut scheduler);
1012 0 :
1013 0 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
1014 0 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 0);
1015 :
1016 0 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 0);
1017 0 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 0);
1018 : }
1019 :
1020 1 : Ok(())
1021 1 : }
1022 :
1023 : #[test]
1024 : /// Test the PageserverUtilization's contribution to scheduling algorithm
1025 1 : fn scheduler_utilization() {
1026 1 : let mut nodes = test_utils::make_test_nodes(3, &[]);
1027 1 : let mut scheduler = Scheduler::new(nodes.values());
1028 1 :
1029 1 : // Need to keep these alive because they contribute to shard counts via RAII
1030 1 : let mut scheduled_intents = Vec::new();
1031 1 :
1032 1 : let empty_context = ScheduleContext::default();
1033 :
1034 11 : fn assert_scheduler_chooses(
1035 11 : expect_node: NodeId,
1036 11 : scheduled_intents: &mut Vec<IntentState>,
1037 11 : scheduler: &mut Scheduler,
1038 11 : context: &ScheduleContext,
1039 11 : ) {
1040 11 : let scheduled = scheduler
1041 11 : .schedule_shard::<AttachedShardTag>(&[], &None, context)
1042 11 : .unwrap();
1043 11 : let mut intent = IntentState::new(None);
1044 11 : intent.set_attached(scheduler, Some(scheduled));
1045 11 : scheduled_intents.push(intent);
1046 11 : assert_eq!(scheduled, expect_node);
1047 11 : }
1048 :
1049 : // Independent schedule calls onto empty nodes should round-robin, because each node's
1050 : // utilization's shard count is updated inline. The order is deterministic because when all other factors are
1051 : // equal, we order by node ID.
1052 1 : assert_scheduler_chooses(
1053 1 : NodeId(1),
1054 1 : &mut scheduled_intents,
1055 1 : &mut scheduler,
1056 1 : &empty_context,
1057 1 : );
1058 1 : assert_scheduler_chooses(
1059 1 : NodeId(2),
1060 1 : &mut scheduled_intents,
1061 1 : &mut scheduler,
1062 1 : &empty_context,
1063 1 : );
1064 1 : assert_scheduler_chooses(
1065 1 : NodeId(3),
1066 1 : &mut scheduled_intents,
1067 1 : &mut scheduler,
1068 1 : &empty_context,
1069 1 : );
1070 1 :
1071 1 : // Manually setting utilization higher should cause schedule calls to round-robin the other nodes
1072 1 : // which have equal utilization.
1073 1 : nodes
1074 1 : .get_mut(&NodeId(1))
1075 1 : .unwrap()
1076 1 : .set_availability(NodeAvailability::Active(test_utilization::simple(
1077 1 : 10,
1078 1 : 1024 * 1024 * 1024,
1079 1 : )));
1080 1 : scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
1081 1 :
1082 1 : assert_scheduler_chooses(
1083 1 : NodeId(2),
1084 1 : &mut scheduled_intents,
1085 1 : &mut scheduler,
1086 1 : &empty_context,
1087 1 : );
1088 1 : assert_scheduler_chooses(
1089 1 : NodeId(3),
1090 1 : &mut scheduled_intents,
1091 1 : &mut scheduler,
1092 1 : &empty_context,
1093 1 : );
1094 1 : assert_scheduler_chooses(
1095 1 : NodeId(2),
1096 1 : &mut scheduled_intents,
1097 1 : &mut scheduler,
1098 1 : &empty_context,
1099 1 : );
1100 1 : assert_scheduler_chooses(
1101 1 : NodeId(3),
1102 1 : &mut scheduled_intents,
1103 1 : &mut scheduler,
1104 1 : &empty_context,
1105 1 : );
1106 1 :
1107 1 : // The scheduler should prefer nodes with lower affinity score,
1108 1 : // even if they have higher utilization (as long as they aren't utilized at >100%)
1109 1 : let mut context_prefer_node1 = ScheduleContext::default();
1110 1 : context_prefer_node1.avoid(&[NodeId(2), NodeId(3)]);
1111 1 : assert_scheduler_chooses(
1112 1 : NodeId(1),
1113 1 : &mut scheduled_intents,
1114 1 : &mut scheduler,
1115 1 : &context_prefer_node1,
1116 1 : );
1117 1 : assert_scheduler_chooses(
1118 1 : NodeId(1),
1119 1 : &mut scheduled_intents,
1120 1 : &mut scheduler,
1121 1 : &context_prefer_node1,
1122 1 : );
1123 1 :
1124 1 : // If a node is over-utilized, it will not be used even if affinity scores prefer it
1125 1 : nodes
1126 1 : .get_mut(&NodeId(1))
1127 1 : .unwrap()
1128 1 : .set_availability(NodeAvailability::Active(test_utilization::simple(
1129 1 : 20000,
1130 1 : 1024 * 1024 * 1024,
1131 1 : )));
1132 1 : scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
1133 1 : assert_scheduler_chooses(
1134 1 : NodeId(2),
1135 1 : &mut scheduled_intents,
1136 1 : &mut scheduler,
1137 1 : &context_prefer_node1,
1138 1 : );
1139 1 : assert_scheduler_chooses(
1140 1 : NodeId(3),
1141 1 : &mut scheduled_intents,
1142 1 : &mut scheduler,
1143 1 : &context_prefer_node1,
1144 1 : );
1145 :
1146 12 : for mut intent in scheduled_intents {
1147 11 : intent.clear(&mut scheduler);
1148 11 : }
1149 1 : }
1150 :
1151 : #[test]
1152 : /// A simple test that showcases AZ-aware scheduling and its interaction with
1153 : /// affinity scores.
1154 1 : fn az_scheduling() {
1155 1 : let az_a_tag = AvailabilityZone("az-a".to_string());
1156 1 : let az_b_tag = AvailabilityZone("az-b".to_string());
1157 1 :
1158 1 : let nodes = test_utils::make_test_nodes(3, &[az_a_tag.clone(), az_b_tag.clone()]);
1159 1 : let mut scheduler = Scheduler::new(nodes.values());
1160 1 :
1161 1 : // Need to keep these alive because they contribute to shard counts via RAII
1162 1 : let mut scheduled_intents = Vec::new();
1163 1 :
1164 1 : let mut context = ScheduleContext::default();
1165 :
1166 4 : fn assert_scheduler_chooses<Tag: ShardTag>(
1167 4 : expect_node: NodeId,
1168 4 : preferred_az: Option<AvailabilityZone>,
1169 4 : scheduled_intents: &mut Vec<IntentState>,
1170 4 : scheduler: &mut Scheduler,
1171 4 : context: &mut ScheduleContext,
1172 4 : ) {
1173 4 : let scheduled = scheduler
1174 4 : .schedule_shard::<Tag>(&[], &preferred_az, context)
1175 4 : .unwrap();
1176 4 : let mut intent = IntentState::new(preferred_az.clone());
1177 4 : intent.set_attached(scheduler, Some(scheduled));
1178 4 : scheduled_intents.push(intent);
1179 4 : assert_eq!(scheduled, expect_node);
1180 :
1181 4 : context.avoid(&[scheduled]);
1182 4 : }
1183 :
1184 1 : assert_scheduler_chooses::<AttachedShardTag>(
1185 1 : NodeId(1),
1186 1 : Some(az_a_tag.clone()),
1187 1 : &mut scheduled_intents,
1188 1 : &mut scheduler,
1189 1 : &mut context,
1190 1 : );
1191 1 :
1192 1 : // Nodes 2 and 3 have an affinity score of 0, but node 3
1193 1 : // is in "az-a" so we prefer that.
1194 1 : assert_scheduler_chooses::<AttachedShardTag>(
1195 1 : NodeId(3),
1196 1 : Some(az_a_tag.clone()),
1197 1 : &mut scheduled_intents,
1198 1 : &mut scheduler,
1199 1 : &mut context,
1200 1 : );
1201 1 :
1202 1 : // Nodes 1 and 3 (az-a) have the same affinity score, so prefer the lowest node id.
1203 1 : assert_scheduler_chooses::<AttachedShardTag>(
1204 1 : NodeId(1),
1205 1 : Some(az_a_tag.clone()),
1206 1 : &mut scheduled_intents,
1207 1 : &mut scheduler,
1208 1 : &mut context,
1209 1 : );
1210 1 :
1211 1 : // Avoid nodes in "az-a" for the secondary location.
1212 1 : assert_scheduler_chooses::<SecondaryShardTag>(
1213 1 : NodeId(2),
1214 1 : Some(az_a_tag.clone()),
1215 1 : &mut scheduled_intents,
1216 1 : &mut scheduler,
1217 1 : &mut context,
1218 1 : );
1219 :
1220 5 : for mut intent in scheduled_intents {
1221 4 : intent.clear(&mut scheduler);
1222 4 : }
1223 1 : }
1224 :
1225 : #[test]
1226 1 : fn az_scheduling_for_new_tenant() {
1227 1 : let az_a_tag = AvailabilityZone("az-a".to_string());
1228 1 : let az_b_tag = AvailabilityZone("az-b".to_string());
1229 1 : let nodes = test_utils::make_test_nodes(
1230 1 : 6,
1231 1 : &[
1232 1 : az_a_tag.clone(),
1233 1 : az_a_tag.clone(),
1234 1 : az_a_tag.clone(),
1235 1 : az_b_tag.clone(),
1236 1 : az_b_tag.clone(),
1237 1 : az_b_tag.clone(),
1238 1 : ],
1239 1 : );
1240 1 :
1241 1 : let mut scheduler = Scheduler::new(nodes.values());
1242 :
1243 : /// Force the `home_shard_count` of a node directly: this is the metric used
1244 : /// by the scheduler when picking AZs.
1245 3 : fn set_shard_count(scheduler: &mut Scheduler, node_id: NodeId, shard_count: usize) {
1246 3 : let node = scheduler.nodes.get_mut(&node_id).unwrap();
1247 3 : node.home_shard_count = shard_count;
1248 3 : }
1249 :
1250 : // Initial empty state. Scores are tied, scheduler prefers lower AZ ID.
1251 1 : assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_a_tag.clone()));
1252 :
1253 : // Home shard count is higher in AZ A, so AZ B will be preferred
1254 1 : set_shard_count(&mut scheduler, NodeId(1), 10);
1255 1 : assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_b_tag.clone()));
1256 :
1257 : // Total home shard count is higher in AZ B, so we revert to preferring AZ A
1258 1 : set_shard_count(&mut scheduler, NodeId(4), 6);
1259 1 : set_shard_count(&mut scheduler, NodeId(5), 6);
1260 1 : assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_a_tag.clone()));
1261 1 : }
1262 :
1263 : /// Test that when selecting AZs for many new tenants, we get the expected balance across nodes
1264 : #[test]
1265 1 : fn az_selection_many() {
1266 1 : let az_a_tag = AvailabilityZone("az-a".to_string());
1267 1 : let az_b_tag = AvailabilityZone("az-b".to_string());
1268 1 : let az_c_tag = AvailabilityZone("az-c".to_string());
1269 1 : let nodes = test_utils::make_test_nodes(
1270 1 : 6,
1271 1 : &[
1272 1 : az_a_tag.clone(),
1273 1 : az_b_tag.clone(),
1274 1 : az_c_tag.clone(),
1275 1 : az_a_tag.clone(),
1276 1 : az_b_tag.clone(),
1277 1 : az_c_tag.clone(),
1278 1 : ],
1279 1 : );
1280 1 :
1281 1 : let mut scheduler = Scheduler::new(nodes.values());
1282 1 :
1283 1 : // We should get 1/6th of these on each node, give or take a few...
1284 1 : let total_tenants = 300;
1285 1 :
1286 1 : // ...where the 'few' is the number of AZs, because the scheduling will sometimes overshoot
1287 1 : // on one AZ before correcting itself. This is because we select the 'home' AZ based on
1288 1 : // an AZ-wide metric, but we select the location for secondaries on a purely node-based
1289 1 : // metric (while excluding the home AZ).
1290 1 : let grace = 3;
1291 1 :
1292 1 : let mut scheduled_shards = Vec::new();
1293 300 : for _i in 0..total_tenants {
1294 300 : let preferred_az = scheduler.get_az_for_new_tenant().unwrap();
1295 300 :
1296 300 : let mut node_home_counts = scheduler
1297 300 : .nodes
1298 300 : .iter()
1299 1800 : .map(|(node_id, node)| (node_id, node.home_shard_count))
1300 300 : .collect::<Vec<_>>();
1301 7200 : node_home_counts.sort_by_key(|i| i.0);
1302 300 : eprintln!("Selected {}, vs nodes {:?}", preferred_az, node_home_counts);
1303 300 :
1304 300 : let tenant_shard_id = TenantShardId {
1305 300 : tenant_id: TenantId::generate(),
1306 300 : shard_number: ShardNumber(0),
1307 300 : shard_count: ShardCount(1),
1308 300 : };
1309 300 :
1310 300 : let shard_identity = ShardIdentity::new(
1311 300 : tenant_shard_id.shard_number,
1312 300 : tenant_shard_id.shard_count,
1313 300 : pageserver_api::shard::ShardStripeSize(1),
1314 300 : )
1315 300 : .unwrap();
1316 300 : let mut shard = TenantShard::new(
1317 300 : tenant_shard_id,
1318 300 : shard_identity,
1319 300 : pageserver_api::controller_api::PlacementPolicy::Attached(1),
1320 300 : Some(preferred_az),
1321 300 : );
1322 300 :
1323 300 : let mut context = ScheduleContext::default();
1324 300 : shard.schedule(&mut scheduler, &mut context).unwrap();
1325 300 : eprintln!("Scheduled shard at {:?}", shard.intent);
1326 300 :
1327 300 : scheduled_shards.push(shard);
1328 300 : }
1329 :
1330 7 : for (node_id, node) in &scheduler.nodes {
1331 6 : eprintln!(
1332 6 : "Node {}: {} {} {}",
1333 6 : node_id, node.shard_count, node.attached_shard_count, node.home_shard_count
1334 6 : );
1335 6 : }
1336 :
1337 6 : for node in scheduler.nodes.values() {
1338 6 : assert!((node.home_shard_count as i64 - total_tenants as i64 / 6).abs() < grace);
1339 : }
1340 :
1341 301 : for mut shard in scheduled_shards {
1342 300 : shard.intent.clear(&mut scheduler);
1343 300 : }
1344 1 : }
1345 :
1346 : #[test]
1347 : /// Make sure that when we have an odd number of nodes and an even number of shards, we still
1348 : /// get scheduling stability.
1349 1 : fn odd_nodes_stability() {
1350 1 : let az_a = AvailabilityZone("az-a".to_string());
1351 1 : let az_b = AvailabilityZone("az-b".to_string());
1352 1 :
1353 1 : let nodes = test_utils::make_test_nodes(
1354 1 : 10,
1355 1 : &[
1356 1 : az_a.clone(),
1357 1 : az_a.clone(),
1358 1 : az_a.clone(),
1359 1 : az_a.clone(),
1360 1 : az_a.clone(),
1361 1 : az_b.clone(),
1362 1 : az_b.clone(),
1363 1 : az_b.clone(),
1364 1 : az_b.clone(),
1365 1 : az_b.clone(),
1366 1 : ],
1367 1 : );
1368 1 : let mut scheduler = Scheduler::new(nodes.values());
1369 1 :
1370 1 : // Need to keep these alive because they contribute to shard counts via RAII
1371 1 : let mut scheduled_shards = Vec::new();
1372 1 :
1373 1 : let mut context = ScheduleContext::default();
1374 :
1375 8 : fn schedule_shard(
1376 8 : tenant_shard_id: TenantShardId,
1377 8 : expect_attached: NodeId,
1378 8 : expect_secondary: NodeId,
1379 8 : scheduled_shards: &mut Vec<TenantShard>,
1380 8 : scheduler: &mut Scheduler,
1381 8 : preferred_az: Option<AvailabilityZone>,
1382 8 : context: &mut ScheduleContext,
1383 8 : ) {
1384 8 : let shard_identity = ShardIdentity::new(
1385 8 : tenant_shard_id.shard_number,
1386 8 : tenant_shard_id.shard_count,
1387 8 : pageserver_api::shard::ShardStripeSize(1),
1388 8 : )
1389 8 : .unwrap();
1390 8 : let mut shard = TenantShard::new(
1391 8 : tenant_shard_id,
1392 8 : shard_identity,
1393 8 : pageserver_api::controller_api::PlacementPolicy::Attached(1),
1394 8 : preferred_az,
1395 8 : );
1396 8 :
1397 8 : shard.schedule(scheduler, context).unwrap();
1398 8 :
1399 8 : assert_eq!(shard.intent.get_attached().unwrap(), expect_attached);
1400 8 : assert_eq!(
1401 8 : shard.intent.get_secondary().first().unwrap(),
1402 8 : &expect_secondary
1403 8 : );
1404 :
1405 8 : scheduled_shards.push(shard);
1406 8 : }
1407 :
1408 1 : let tenant_id = TenantId::generate();
1409 1 :
1410 1 : schedule_shard(
1411 1 : TenantShardId {
1412 1 : tenant_id,
1413 1 : shard_number: ShardNumber(0),
1414 1 : shard_count: ShardCount(8),
1415 1 : },
1416 1 : NodeId(1),
1417 1 : NodeId(6),
1418 1 : &mut scheduled_shards,
1419 1 : &mut scheduler,
1420 1 : Some(az_a.clone()),
1421 1 : &mut context,
1422 1 : );
1423 1 :
1424 1 : schedule_shard(
1425 1 : TenantShardId {
1426 1 : tenant_id,
1427 1 : shard_number: ShardNumber(1),
1428 1 : shard_count: ShardCount(8),
1429 1 : },
1430 1 : NodeId(2),
1431 1 : NodeId(7),
1432 1 : &mut scheduled_shards,
1433 1 : &mut scheduler,
1434 1 : Some(az_a.clone()),
1435 1 : &mut context,
1436 1 : );
1437 1 :
1438 1 : schedule_shard(
1439 1 : TenantShardId {
1440 1 : tenant_id,
1441 1 : shard_number: ShardNumber(2),
1442 1 : shard_count: ShardCount(8),
1443 1 : },
1444 1 : NodeId(3),
1445 1 : NodeId(8),
1446 1 : &mut scheduled_shards,
1447 1 : &mut scheduler,
1448 1 : Some(az_a.clone()),
1449 1 : &mut context,
1450 1 : );
1451 1 :
1452 1 : schedule_shard(
1453 1 : TenantShardId {
1454 1 : tenant_id,
1455 1 : shard_number: ShardNumber(3),
1456 1 : shard_count: ShardCount(8),
1457 1 : },
1458 1 : NodeId(4),
1459 1 : NodeId(9),
1460 1 : &mut scheduled_shards,
1461 1 : &mut scheduler,
1462 1 : Some(az_a.clone()),
1463 1 : &mut context,
1464 1 : );
1465 1 :
1466 1 : schedule_shard(
1467 1 : TenantShardId {
1468 1 : tenant_id,
1469 1 : shard_number: ShardNumber(4),
1470 1 : shard_count: ShardCount(8),
1471 1 : },
1472 1 : NodeId(5),
1473 1 : NodeId(10),
1474 1 : &mut scheduled_shards,
1475 1 : &mut scheduler,
1476 1 : Some(az_a.clone()),
1477 1 : &mut context,
1478 1 : );
1479 1 :
1480 1 : schedule_shard(
1481 1 : TenantShardId {
1482 1 : tenant_id,
1483 1 : shard_number: ShardNumber(5),
1484 1 : shard_count: ShardCount(8),
1485 1 : },
1486 1 : NodeId(1),
1487 1 : NodeId(6),
1488 1 : &mut scheduled_shards,
1489 1 : &mut scheduler,
1490 1 : Some(az_a.clone()),
1491 1 : &mut context,
1492 1 : );
1493 1 :
1494 1 : schedule_shard(
1495 1 : TenantShardId {
1496 1 : tenant_id,
1497 1 : shard_number: ShardNumber(6),
1498 1 : shard_count: ShardCount(8),
1499 1 : },
1500 1 : NodeId(2),
1501 1 : NodeId(7),
1502 1 : &mut scheduled_shards,
1503 1 : &mut scheduler,
1504 1 : Some(az_a.clone()),
1505 1 : &mut context,
1506 1 : );
1507 1 :
1508 1 : schedule_shard(
1509 1 : TenantShardId {
1510 1 : tenant_id,
1511 1 : shard_number: ShardNumber(7),
1512 1 : shard_count: ShardCount(8),
1513 1 : },
1514 1 : NodeId(3),
1515 1 : NodeId(8),
1516 1 : &mut scheduled_shards,
1517 1 : &mut scheduler,
1518 1 : Some(az_a.clone()),
1519 1 : &mut context,
1520 1 : );
1521 :
1522 : // Assert that the optimizer suggests no changes, i.e. our initial scheduling was stable.
1523 9 : for shard in &scheduled_shards {
1524 8 : assert_eq!(shard.optimize_attachment(&mut scheduler, &context), None);
1525 : }
1526 :
1527 9 : for mut shard in scheduled_shards {
1528 8 : shard.intent.clear(&mut scheduler);
1529 8 : }
1530 1 : }
1531 : }
|