use crate::{node::Node, tenant_shard::TenantShard};
use itertools::Itertools;
use pageserver_api::models::PageserverUtilization;
use serde::Serialize;
use std::collections::HashMap;
use utils::{http::error::ApiError, id::NodeId};

/// Scenarios in which we cannot find a suitable location for a tenant shard
#[derive(thiserror::Error, Debug)]
pub enum ScheduleError {
    #[error("No pageservers found")]
    NoPageservers,
    #[error("No pageserver found matching constraint")]
    ImpossibleConstraint,
}

impl From<ScheduleError> for ApiError {
    fn from(value: ScheduleError) -> Self {
        ApiError::Conflict(format!("Scheduling error: {}", value))
    }
}

#[derive(Serialize)]
pub enum MaySchedule {
    Yes(PageserverUtilization),
    No,
}

#[derive(Serialize)]
struct SchedulerNode {
    /// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
    shard_count: usize,
    /// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`].
    attached_shard_count: usize,

    /// Whether this node is currently eligible to have new shards scheduled (this is derived
    /// from a node's availability state and scheduling policy).
    may_schedule: MaySchedule,
}

impl PartialEq for SchedulerNode {
    fn eq(&self, other: &Self) -> bool {
        let may_schedule_matches = matches!(
            (&self.may_schedule, &other.may_schedule),
            (MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No)
        );

        may_schedule_matches
            && self.shard_count == other.shard_count
            && self.attached_shard_count == other.attached_shard_count
    }
}

impl Eq for SchedulerNode {}

/// This type is responsible for selecting which node is used when a tenant shard needs to choose a pageserver
/// on which to run.
///
/// The type has no persistent state of its own: this is all populated at startup. The Serialize
/// impl is only for debug dumps.
#[derive(Serialize)]
pub(crate) struct Scheduler {
    nodes: HashMap<NodeId, SchedulerNode>,
}
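// A minimal sketch of the lifecycle, assuming the caller holds the map of
// `Node`s (this mirrors the unit tests at the bottom of this file; error
// handling elided):
//
//     let mut scheduler = Scheduler::new(nodes.values());
//     let context = ScheduleContext::default();
//     let node_id = scheduler.schedule_shard(&[], &context)?;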

/// Score for soft constraint scheduling: lower scores are preferred to higher scores.
///
/// For example, we may set an affinity score based on the number of shards from the same
/// tenant already on a node, to implicitly prefer to balance out shards.
#[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub(crate) struct AffinityScore(pub(crate) usize);

impl AffinityScore {
    /// If we have no anti-affinity at all toward a node, this is its score. It means
    /// the scheduler has a free choice amongst nodes with this score, and may pick a node
    /// based on other information such as total utilization.
    pub(crate) const FREE: Self = Self(0);

    pub(crate) fn inc(&mut self) {
        self.0 += 1;
    }
}

impl std::ops::Add for AffinityScore {
    type Output = Self;

    fn add(self, rhs: Self) -> Self::Output {
        Self(self.0 + rhs.0)
    }
}
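// Illustrative only: a score starts at FREE and accumulates one increment per
// co-located shard, so the derived ordering directly prefers emptier nodes:
//
//     let mut score = AffinityScore::FREE;
//     score.inc(); // one shard from this tenant already lives here
//     assert!(AffinityScore::FREE < score);
//     assert_eq!(score + AffinityScore(2), AffinityScore(3));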

/// Hint for whether this is a sincere attempt to schedule, or a speculative
/// check for where we _would_ schedule (done during optimization).
#[derive(Debug)]
pub(crate) enum ScheduleMode {
    Normal,
    Speculative,
}

impl Default for ScheduleMode {
    fn default() -> Self {
        Self::Normal
    }
}

// For carrying state between multiple calls to [`TenantShard::schedule`], e.g. when calling
// it for many shards in the same tenant.
#[derive(Debug, Default)]
pub(crate) struct ScheduleContext {
    /// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`]
    pub(crate) nodes: HashMap<NodeId, AffinityScore>,

    /// Specifically how many _attached_ locations are on each node
    pub(crate) attached_nodes: HashMap<NodeId, usize>,

    pub(crate) mode: ScheduleMode,
}

impl ScheduleContext {
    /// Input is a list of nodes we would like to avoid using again within this context. The more
    /// times a node is passed into this call, the less inclined we are to use it.
    pub(crate) fn avoid(&mut self, nodes: &[NodeId]) {
        for node_id in nodes {
            let entry = self.nodes.entry(*node_id).or_insert(AffinityScore::FREE);
            entry.inc()
        }
    }

    pub(crate) fn push_attached(&mut self, node_id: NodeId) {
        let entry = self.attached_nodes.entry(node_id).or_default();
        *entry += 1;
    }

    pub(crate) fn get_node_affinity(&self, node_id: NodeId) -> AffinityScore {
        self.nodes
            .get(&node_id)
            .copied()
            .unwrap_or(AffinityScore::FREE)
    }

    pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize {
        self.attached_nodes.get(&node_id).copied().unwrap_or(0)
    }
}
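// Sketch of how a context accumulates anti-affinity as a tenant's shards are
// placed one by one (node IDs here are arbitrary):
//
//     let mut context = ScheduleContext::default();
//     context.avoid(&[NodeId(1), NodeId(1), NodeId(2)]);
//     assert_eq!(context.get_node_affinity(NodeId(1)), AffinityScore(2));
//     assert_eq!(context.get_node_affinity(NodeId(2)), AffinityScore(1));
//     assert_eq!(context.get_node_affinity(NodeId(3)), AffinityScore::FREE);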

pub(crate) enum RefCountUpdate {
    PromoteSecondary,
    Attach,
    Detach,
    DemoteAttached,
    AddSecondary,
    RemoveSecondary,
}
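// Net effect of each update on a node's (shard_count, attached_shard_count),
// as applied by [`Scheduler::update_node_ref_counts`] below:
//
//     PromoteSecondary => ( 0, +1)
//     Attach           => (+1, +1)
//     Detach           => (-1, -1)
//     DemoteAttached   => ( 0, -1)
//     AddSecondary     => (+1,  0)
//     RemoveSecondary  => (-1,  0)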

impl Scheduler {
    pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
        let mut scheduler_nodes = HashMap::new();
        for node in nodes {
            scheduler_nodes.insert(
                node.get_id(),
                SchedulerNode {
                    shard_count: 0,
                    attached_shard_count: 0,
                    may_schedule: node.may_schedule(),
                },
            );
        }

        Self {
            nodes: scheduler_nodes,
        }
    }

    /// For debug/support: check that our internal statistics are in sync with the state of
    /// the nodes & tenant shards.
    ///
    /// If anything is inconsistent, log details and return an error.
    pub(crate) fn consistency_check<'a>(
        &self,
        nodes: impl Iterator<Item = &'a Node>,
        shards: impl Iterator<Item = &'a TenantShard>,
    ) -> anyhow::Result<()> {
        let mut expect_nodes: HashMap<NodeId, SchedulerNode> = HashMap::new();
        for node in nodes {
            expect_nodes.insert(
                node.get_id(),
                SchedulerNode {
                    shard_count: 0,
                    attached_shard_count: 0,
                    may_schedule: node.may_schedule(),
                },
            );
        }

        for shard in shards {
            if let Some(node_id) = shard.intent.get_attached() {
                match expect_nodes.get_mut(node_id) {
                    Some(node) => {
                        node.shard_count += 1;
                        node.attached_shard_count += 1;
                    }
                    None => anyhow::bail!(
                        "Tenant {} references nonexistent node {}",
                        shard.tenant_shard_id,
                        node_id
                    ),
                }
            }

            for node_id in shard.intent.get_secondary() {
                match expect_nodes.get_mut(node_id) {
                    Some(node) => node.shard_count += 1,
                    None => anyhow::bail!(
                        "Tenant {} references nonexistent node {}",
                        shard.tenant_shard_id,
                        node_id
                    ),
                }
            }
        }

        for (node_id, expect_node) in &expect_nodes {
            let Some(self_node) = self.nodes.get(node_id) else {
                anyhow::bail!("Node {node_id} not found in Self")
            };

            if self_node != expect_node {
                tracing::error!("Inconsistency detected in scheduling state for node {node_id}");
                tracing::error!("Expected state: {}", serde_json::to_string(expect_node)?);
                tracing::error!("Self state: {}", serde_json::to_string(self_node)?);

                anyhow::bail!("Inconsistent state on {node_id}");
            }
        }

        if expect_nodes.len() != self.nodes.len() {
            // We just checked that all the expected nodes are present. If the lengths don't match,
            // it means that we have nodes in Self that are unexpected.
            for node_id in self.nodes.keys() {
                if !expect_nodes.contains_key(node_id) {
                    anyhow::bail!("Node {node_id} found in Self but not in expected nodes");
                }
            }
        }

        Ok(())
    }
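    // Hypothetical support-tooling call (`nodes` and `tenants` are assumed to
    // be the caller's authoritative maps):
    //
    //     scheduler.consistency_check(nodes.values(), tenants.values())?;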

    /// Update the reference counts of a node. These reference counts are used to guide scheduling
    /// decisions, not for memory management: they represent the number of tenant shards whose
    /// IntentState targets this node and the number of tenant shards whose IntentState is
    /// attached to this node.
    ///
    /// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
    /// [`Self::new`] or [`Self::node_upsert`])
    pub(crate) fn update_node_ref_counts(&mut self, node_id: NodeId, update: RefCountUpdate) {
        let Some(node) = self.nodes.get_mut(&node_id) else {
            debug_assert!(false);
            tracing::error!("Scheduler missing node {node_id}");
            return;
        };

        match update {
            RefCountUpdate::PromoteSecondary => {
                node.attached_shard_count += 1;
            }
            RefCountUpdate::Attach => {
                node.shard_count += 1;
                node.attached_shard_count += 1;
            }
            RefCountUpdate::Detach => {
                node.shard_count -= 1;
                node.attached_shard_count -= 1;
            }
            RefCountUpdate::DemoteAttached => {
                node.attached_shard_count -= 1;
            }
            RefCountUpdate::AddSecondary => {
                node.shard_count += 1;
            }
            RefCountUpdate::RemoveSecondary => {
                node.shard_count -= 1;
            }
        }

        // Maybe update PageserverUtilization
        match update {
            RefCountUpdate::AddSecondary | RefCountUpdate::Attach => {
                // Referencing the node: if this takes our shard_count above the utilization structure's
                // shard count, then artificially bump it: this ensures that the scheduler immediately
                // recognizes that this node has more work on it, without waiting for the next heartbeat
                // to update the utilization.
                if let MaySchedule::Yes(utilization) = &mut node.may_schedule {
                    utilization.adjust_shard_count_max(node.shard_count as u32);
                }
            }
            RefCountUpdate::PromoteSecondary
            | RefCountUpdate::Detach
            | RefCountUpdate::RemoveSecondary
            | RefCountUpdate::DemoteAttached => {
                // De-referencing the node: leave the utilization's shard_count at a stale higher
                // value until some future heartbeat after we have physically removed this shard
                // from the node: this prevents the scheduler over-optimistically trying to schedule
                // more work onto the node before earlier detaches are done.
            }
        }
    }
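    // Sketch of a live migration as seen by the ref counts (illustrative only;
    // in real code these calls are driven by IntentState changes):
    //
    //     scheduler.update_node_ref_counts(new_node, RefCountUpdate::AddSecondary);
    //     scheduler.update_node_ref_counts(old_node, RefCountUpdate::DemoteAttached);
    //     scheduler.update_node_ref_counts(new_node, RefCountUpdate::PromoteSecondary);
    //     scheduler.update_node_ref_counts(old_node, RefCountUpdate::RemoveSecondary);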

    // Check if the number of shards attached to a given node is lagging below
    // the cluster average. If that's the case, the node should be filled.
    pub(crate) fn compute_fill_requirement(&self, node_id: NodeId) -> usize {
        let Some(node) = self.nodes.get(&node_id) else {
            debug_assert!(false);
            tracing::error!("Scheduler missing node {node_id}");
            return 0;
        };
        assert!(!self.nodes.is_empty());
        let expected_attached_shards_per_node = self.expected_attached_shard_count();

        for (node_id, node) in self.nodes.iter() {
            tracing::trace!(%node_id, "attached_shard_count={} shard_count={} expected={}", node.attached_shard_count, node.shard_count, expected_attached_shards_per_node);
        }

        if node.attached_shard_count < expected_attached_shards_per_node {
            expected_attached_shards_per_node - node.attached_shard_count
        } else {
            0
        }
    }
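    // Worked example: with attached shard counts [4, 1, 1] across three nodes,
    // the cluster average is (4 + 1 + 1) / 3 = 2, so each lightly loaded node
    // has a fill requirement of 2 - 1 = 1 and the busy node's requirement is 0.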

    pub(crate) fn expected_attached_shard_count(&self) -> usize {
        let total_attached_shards: usize =
            self.nodes.values().map(|n| n.attached_shard_count).sum();

        assert!(!self.nodes.is_empty());
        total_attached_shards / self.nodes.len()
    }
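    // Note that the integer division truncates: 5 attached shards across 3
    // nodes give an expected per-node count of 1, not 2.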

    pub(crate) fn nodes_by_attached_shard_count(&self) -> Vec<(NodeId, usize)> {
        self.nodes
            .iter()
            .map(|(node_id, stats)| (*node_id, stats.attached_shard_count))
            .sorted_by(|lhs, rhs| Ord::cmp(&lhs.1, &rhs.1).reverse())
            .collect()
    }

    pub(crate) fn node_upsert(&mut self, node: &Node) {
        use std::collections::hash_map::Entry::*;
        match self.nodes.entry(node.get_id()) {
            Occupied(mut entry) => {
                // Updates to MaySchedule are how we receive updated PageserverUtilization: adjust these values
                // to account for any shards scheduled on the controller but not yet visible to the pageserver.
                let mut may_schedule = node.may_schedule();
                match &mut may_schedule {
                    MaySchedule::Yes(utilization) => {
                        utilization.adjust_shard_count_max(entry.get().shard_count as u32);
                    }
                    MaySchedule::No => { // Nothing to tweak
                    }
                }

                entry.get_mut().may_schedule = may_schedule;
            }
            Vacant(entry) => {
                entry.insert(SchedulerNode {
                    shard_count: 0,
                    attached_shard_count: 0,
                    may_schedule: node.may_schedule(),
                });
            }
        }
    }

    pub(crate) fn node_remove(&mut self, node_id: NodeId) {
        if self.nodes.remove(&node_id).is_none() {
            tracing::warn!(node_id=%node_id, "Removed non-existent node from scheduler");
        }
    }

    /// Where we have several nodes to choose from, for example when picking a secondary location
    /// to promote to an attached location, this method may be used to pick the best choice based
    /// on the scheduler's knowledge of utilization and availability.
    ///
    /// If the input is empty, or none of the nodes are eligible for scheduling, return None: the
    /// caller can pick a node some other way.
    pub(crate) fn node_preferred(&self, nodes: &[NodeId]) -> Option<NodeId> {
        if nodes.is_empty() {
            return None;
        }

        // TODO: When the utilization score returned by the pageserver becomes meaningful,
        // schedule based on that instead of the shard count.
        let node = nodes
            .iter()
            .map(|node_id| {
                let may_schedule = self
                    .nodes
                    .get(node_id)
                    .map(|n| !matches!(n.may_schedule, MaySchedule::No))
                    .unwrap_or(false);
                (*node_id, may_schedule)
            })
            .max_by_key(|(_n, may_schedule)| *may_schedule);

        // If even the preferred node has may_schedule==false, return None
        node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
    }
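    // Hypothetical call site (the flow is assumed, the types are from this
    // file): pick the best secondary to promote when the attached location
    // must move.
    //
    //     if let Some(preferred) = scheduler.node_preferred(intent.get_secondary()) {
    //         // promote `preferred` to become the attached location
    //     }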

    /// hard_exclude: it is forbidden to use nodes in this list, typically because they
    /// are already in use by this shard -- we use this to avoid picking the same node
    /// as both attached and secondary location. This is a hard constraint: if we cannot
    /// find any nodes that aren't in this list, then we will return a [`ScheduleError::ImpossibleConstraint`].
    ///
    /// context: we prefer to avoid using nodes identified in the context, according
    /// to their anti-affinity score. We use this to prefer to avoid placing shards from
    /// the same tenant on the same node. This is a soft constraint: the context will never
    /// cause us to fail to schedule a shard.
    pub(crate) fn schedule_shard(
        &mut self,
        hard_exclude: &[NodeId],
        context: &ScheduleContext,
    ) -> Result<NodeId, ScheduleError> {
        if self.nodes.is_empty() {
            return Err(ScheduleError::NoPageservers);
        }

        let mut scores: Vec<(NodeId, AffinityScore, u64, usize)> = self
            .nodes
            .iter_mut()
            .filter_map(|(k, v)| match &mut v.may_schedule {
                MaySchedule::No => None,
                MaySchedule::Yes(_) if hard_exclude.contains(k) => None,
                MaySchedule::Yes(utilization) => Some((
                    *k,
                    context.nodes.get(k).copied().unwrap_or(AffinityScore::FREE),
                    utilization.cached_score(),
                    v.attached_shard_count,
                )),
            })
            .collect();

        // Exclude nodes whose utilization is critically high, if there are alternatives available. This will
        // cause us to violate affinity rules if it is necessary to avoid critically overloading nodes: for example
        // we may place shards from the same tenant together on the same pageserver if all other pageservers are
        // overloaded.
        let non_overloaded_scores = scores
            .iter()
            .filter(|i| !PageserverUtilization::is_overloaded(i.2))
            .copied()
            .collect::<Vec<_>>();
        if !non_overloaded_scores.is_empty() {
            scores = non_overloaded_scores;
        }

        // Sort by, in order of precedence:
        //  1st: Affinity score. We should never pick a higher-score node if a lower-score node is available
        //  2nd: Utilization score (this combines shard count and disk utilization)
        //  3rd: Attached shard count. When nodes have identical utilization (e.g. when populating some
        //       empty nodes), this acts as an anti-affinity between attached shards.
        //  4th: Node ID. This is a convenience to make selection deterministic in tests and empty systems.
        scores.sort_by_key(|i| (i.1, i.2, i.3, i.0));

        if scores.is_empty() {
            // After applying constraints, no pageservers were left.
            if !matches!(context.mode, ScheduleMode::Speculative) {
                // If this was not a speculative attempt, log details to understand why we couldn't
                // schedule: this may help an engineer understand if some nodes are marked offline
                // in a way that's preventing progress.
                tracing::info!(
                    "Scheduling failure, while excluding {hard_exclude:?}, node states:"
                );
                for (node_id, node) in &self.nodes {
                    tracing::info!(
                        "Node {node_id}: may_schedule={} shards={}",
                        !matches!(node.may_schedule, MaySchedule::No),
                        node.shard_count
                    );
                }
            }
            return Err(ScheduleError::ImpossibleConstraint);
        }

        // Lowest score wins
        let node_id = scores.first().unwrap().0;

        if !matches!(context.mode, ScheduleMode::Speculative) {
            tracing::info!(
                "scheduler selected node {node_id} (eligible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?})",
                scores.iter().map(|i| i.0 .0).collect::<Vec<_>>()
            );
        }

        // Note that we do not update shard count here to reflect the scheduling: that
        // is IntentState's job when the scheduled location is used.

        Ok(node_id)
    }
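    // Worked example of the sort key (affinity, utilization score, attached
    // shard count, node id): given two eligible candidates
    //
    //     (NodeId(1), AffinityScore(1), utilization 10, 0 attached)
    //     (NodeId(2), AffinityScore(0), utilization 80, 3 attached)
    //
    // node 2 wins despite being busier, because affinity score has the highest
    // precedence; utilization only breaks ties between equally-affine nodes.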

    /// Unit test access to internal state
    #[cfg(test)]
    pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
        self.nodes.get(&node_id).unwrap().shard_count
    }

    #[cfg(test)]
    pub(crate) fn get_node_attached_shard_count(&self, node_id: NodeId) -> usize {
        self.nodes.get(&node_id).unwrap().attached_shard_count
    }
}

#[cfg(test)]
pub(crate) mod test_utils {

    use crate::node::Node;
    use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization};
    use std::collections::HashMap;
    use utils::id::NodeId;

    /// Test helper: synthesize the requested number of nodes, all in active state.
    ///
    /// Node IDs start at one.
    pub(crate) fn make_test_nodes(n: u64) -> HashMap<NodeId, Node> {
        (1..n + 1)
            .map(|i| {
                (NodeId(i), {
                    let mut node = Node::new(
                        NodeId(i),
                        format!("httphost-{i}"),
                        80 + i as u16,
                        format!("pghost-{i}"),
                        5432 + i as u16,
                        "test-az".to_string(),
                    );
                    node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0)));
                    assert!(node.is_available());
                    node
                })
            })
            .collect()
    }
}

#[cfg(test)]
mod tests {
    use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization};

    use super::*;

    use crate::tenant_shard::IntentState;

    #[test]
    fn scheduler_basic() -> anyhow::Result<()> {
        let nodes = test_utils::make_test_nodes(2);

        let mut scheduler = Scheduler::new(nodes.values());
        let mut t1_intent = IntentState::new();
        let mut t2_intent = IntentState::new();

        let context = ScheduleContext::default();

        let scheduled = scheduler.schedule_shard(&[], &context)?;
        t1_intent.set_attached(&mut scheduler, Some(scheduled));
        let scheduled = scheduler.schedule_shard(&[], &context)?;
        t2_intent.set_attached(&mut scheduler, Some(scheduled));

        assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);

        assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);

        let scheduled = scheduler.schedule_shard(&t1_intent.all_pageservers(), &context)?;
        t1_intent.push_secondary(&mut scheduler, scheduled);

        assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);

        assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);

        t1_intent.clear(&mut scheduler);
        assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
        assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);

        let total_attached = scheduler.get_node_attached_shard_count(NodeId(1))
            + scheduler.get_node_attached_shard_count(NodeId(2));
        assert_eq!(total_attached, 1);

        if cfg!(debug_assertions) {
            // Dropping an IntentState without clearing it causes a panic in debug mode,
            // because we have failed to properly update scheduler shard counts.
            let result = std::panic::catch_unwind(move || {
                drop(t2_intent);
            });
            assert!(result.is_err());
        } else {
            t2_intent.clear(&mut scheduler);

            assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
            assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 0);

            assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 0);
            assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 0);
        }

        Ok(())
    }

    /// Test the PageserverUtilization's contribution to the scheduling algorithm
    #[test]
    fn scheduler_utilization() {
        let mut nodes = test_utils::make_test_nodes(3);
        let mut scheduler = Scheduler::new(nodes.values());

        // Need to keep these alive because they contribute to shard counts via RAII
        let mut scheduled_intents = Vec::new();

        let empty_context = ScheduleContext::default();

        fn assert_scheduler_chooses(
            expect_node: NodeId,
            scheduled_intents: &mut Vec<IntentState>,
            scheduler: &mut Scheduler,
            context: &ScheduleContext,
        ) {
            let scheduled = scheduler.schedule_shard(&[], context).unwrap();
            let mut intent = IntentState::new();
            intent.set_attached(scheduler, Some(scheduled));
            scheduled_intents.push(intent);
            assert_eq!(scheduled, expect_node);
        }

        // Independent schedule calls onto empty nodes should round-robin, because each node's
        // utilization's shard count is updated inline. The order is deterministic because when
        // all other factors are equal, we order by node ID.
        assert_scheduler_chooses(
            NodeId(1),
            &mut scheduled_intents,
            &mut scheduler,
            &empty_context,
        );
        assert_scheduler_chooses(
            NodeId(2),
            &mut scheduled_intents,
            &mut scheduler,
            &empty_context,
        );
        assert_scheduler_chooses(
            NodeId(3),
            &mut scheduled_intents,
            &mut scheduler,
            &empty_context,
        );

        // Manually setting utilization higher should cause schedule calls to round-robin the other nodes
        // which have equal utilization.
        nodes
            .get_mut(&NodeId(1))
            .unwrap()
            .set_availability(NodeAvailability::Active(test_utilization::simple(
                10,
                1024 * 1024 * 1024,
            )));
        scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());

        assert_scheduler_chooses(
            NodeId(2),
            &mut scheduled_intents,
            &mut scheduler,
            &empty_context,
        );
        assert_scheduler_chooses(
            NodeId(3),
            &mut scheduled_intents,
            &mut scheduler,
            &empty_context,
        );
        assert_scheduler_chooses(
            NodeId(2),
            &mut scheduled_intents,
            &mut scheduler,
            &empty_context,
        );
        assert_scheduler_chooses(
            NodeId(3),
            &mut scheduled_intents,
            &mut scheduler,
            &empty_context,
        );

        // The scheduler should prefer nodes with lower affinity score,
        // even if they have higher utilization (as long as they aren't utilized at >100%)
        let mut context_prefer_node1 = ScheduleContext::default();
        context_prefer_node1.avoid(&[NodeId(2), NodeId(3)]);
        assert_scheduler_chooses(
            NodeId(1),
            &mut scheduled_intents,
            &mut scheduler,
            &context_prefer_node1,
        );
        assert_scheduler_chooses(
            NodeId(1),
            &mut scheduled_intents,
            &mut scheduler,
            &context_prefer_node1,
        );

        // If a node is over-utilized, it will not be used even if affinity scores prefer it
        nodes
            .get_mut(&NodeId(1))
            .unwrap()
            .set_availability(NodeAvailability::Active(test_utilization::simple(
                20000,
                1024 * 1024 * 1024,
            )));
        scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
        assert_scheduler_chooses(
            NodeId(2),
            &mut scheduled_intents,
            &mut scheduler,
            &context_prefer_node1,
        );
        assert_scheduler_chooses(
            NodeId(3),
            &mut scheduled_intents,
            &mut scheduler,
            &context_prefer_node1,
        );

        for mut intent in scheduled_intents {
            intent.clear(&mut scheduler);
        }
    }
}