Line data Source code
1 : use crate::{node::Node, tenant_shard::TenantShard};
2 : use pageserver_api::controller_api::UtilizationScore;
3 : use serde::Serialize;
4 : use std::collections::HashMap;
5 : use utils::{http::error::ApiError, id::NodeId};
6 :
7 : /// Scenarios in which we cannot find a suitable location for a tenant shard
8 0 : #[derive(thiserror::Error, Debug)]
9 : pub enum ScheduleError {
10 : #[error("No pageservers found")]
11 : NoPageservers,
12 : #[error("No pageserver found matching constraint")]
13 : ImpossibleConstraint,
14 : }
15 :
16 : impl From<ScheduleError> for ApiError {
17 0 : fn from(value: ScheduleError) -> Self {
18 0 : ApiError::Conflict(format!("Scheduling error: {}", value))
19 0 : }
20 : }
21 :
22 : #[derive(Serialize, Eq, PartialEq)]
23 : pub enum MaySchedule {
24 : Yes(UtilizationScore),
25 : No,
26 : }
27 :
28 : #[derive(Serialize)]
29 : struct SchedulerNode {
30 : /// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
31 : shard_count: usize,
32 :
34 : /// Whether this node is currently eligible to have new shards scheduled (this is derived
34 : /// from a node's availability state and scheduling policy).
35 : may_schedule: MaySchedule,
36 : }
37 :
38 : impl PartialEq for SchedulerNode {
39 6 : fn eq(&self, other: &Self) -> bool {
40 6 : let may_schedule_matches = matches!(
41 6 : (&self.may_schedule, &other.may_schedule),
42 : (MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No)
43 : );
44 :
45 6 : may_schedule_matches && self.shard_count == other.shard_count
46 6 : }
47 : }
48 :
49 : impl Eq for SchedulerNode {}
50 :
51 : /// This type is responsible for selecting which node is used when a tenant shard needs to choose a pageserver
52 : /// on which to run.
53 : ///
54 : /// The type has no persistent state of its own: this is all populated at startup. The Serialize
55 : /// impl is only for debug dumps.
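/// A minimal usage sketch (not compiled as a doctest): build a scheduler from the currently
/// known nodes and ask it to place a shard, much like the unit tests at the bottom of this file.
///
/// ```ignore
/// let nodes = test_utils::make_test_nodes(2);
/// let mut scheduler = Scheduler::new(nodes.values());
/// let context = ScheduleContext::default();
/// let node_id = scheduler.schedule_shard(&[], &context)?;
/// ```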
56 : #[derive(Serialize)]
57 : pub(crate) struct Scheduler {
58 : nodes: HashMap<NodeId, SchedulerNode>,
59 : }
60 :
61 : /// Score for soft constraint scheduling: lower scores are preferred to higher scores.
62 : ///
63 : /// For example, we may set an affinity score based on the number of shards from the same
64 : /// tenant already on a node, to implicitly prefer to balance out shards.
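/// A minimal sketch (not compiled as a doctest) of how scores behave: they start at
/// [`AffinityScore::FREE`], grow via [`AffinityScore::inc`], and lower values sort first.
///
/// ```ignore
/// let mut score = AffinityScore::FREE;
/// score.inc();
/// assert!(AffinityScore::FREE < score);
/// assert_eq!(score + AffinityScore(1), AffinityScore(2));
/// ```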
65 : #[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
66 : pub(crate) struct AffinityScore(pub(crate) usize);
67 :
68 : impl AffinityScore {
69 : /// If we have no anti-affinity at all toward a node, this is its score. It means
70 : /// the scheduler has a free choice amongst nodes with this score, and may pick a node
71 : /// based on other information such as total utilization.
72 : pub(crate) const FREE: Self = Self(0);
73 :
74 188 : pub(crate) fn inc(&mut self) {
75 188 : self.0 += 1;
76 188 : }
77 : }
78 :
79 : impl std::ops::Add for AffinityScore {
80 : type Output = Self;
81 :
82 54 : fn add(self, rhs: Self) -> Self::Output {
83 54 : Self(self.0 + rhs.0)
84 54 : }
85 : }
86 :
87 : // For carrying state between multiple calls to [`TenantShard::schedule`], e.g. when calling
88 : // it for many shards in the same tenant.
89 : #[derive(Debug, Default)]
90 : pub(crate) struct ScheduleContext {
91 : /// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`]
92 : pub(crate) nodes: HashMap<NodeId, AffinityScore>,
93 :
94 : /// Specifically how many _attached_ locations are on each node
95 : pub(crate) attached_nodes: HashMap<NodeId, usize>,
96 : }
97 :
98 : impl ScheduleContext {
99 : /// Input is a list of nodes we would like to avoid using again within this context. The more
100 : /// times a node is passed into this call, the less inclined we are to use it.
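/// A minimal sketch (not compiled as a doctest): repeated calls accumulate, so a node that
/// already holds several of this tenant's shards ends up with a higher (worse) score.
///
/// ```ignore
/// let mut context = ScheduleContext::default();
/// context.avoid(&[NodeId(1), NodeId(2)]);
/// context.avoid(&[NodeId(1)]);
/// assert_eq!(context.get_node_affinity(NodeId(1)), AffinityScore(2));
/// assert_eq!(context.get_node_affinity(NodeId(2)), AffinityScore(1));
/// assert_eq!(context.get_node_affinity(NodeId(3)), AffinityScore::FREE);
/// ```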
101 96 : pub(crate) fn avoid(&mut self, nodes: &[NodeId]) {
102 284 : for node_id in nodes {
103 188 : let entry = self.nodes.entry(*node_id).or_insert(AffinityScore::FREE);
104 188 : entry.inc()
105 : }
106 96 : }
107 :
108 94 : pub(crate) fn push_attached(&mut self, node_id: NodeId) {
109 94 : let entry = self.attached_nodes.entry(node_id).or_default();
110 94 : *entry += 1;
111 94 : }
112 :
113 120 : pub(crate) fn get_node_affinity(&self, node_id: NodeId) -> AffinityScore {
114 120 : self.nodes
115 120 : .get(&node_id)
116 120 : .copied()
117 120 : .unwrap_or(AffinityScore::FREE)
118 120 : }
119 :
120 120 : pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize {
121 120 : self.attached_nodes.get(&node_id).copied().unwrap_or(0)
122 120 : }
123 : }
124 :
125 : impl Scheduler {
126 14 : pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
127 14 : let mut scheduler_nodes = HashMap::new();
128 50 : for node in nodes {
129 36 : scheduler_nodes.insert(
130 36 : node.get_id(),
131 36 : SchedulerNode {
132 36 : shard_count: 0,
133 36 : may_schedule: node.may_schedule(),
134 36 : },
135 36 : );
136 36 : }
137 :
138 14 : Self {
139 14 : nodes: scheduler_nodes,
140 14 : }
141 14 : }
142 :
143 : /// For debug/support: check that our internal statistics are in sync with the state of
144 : /// the nodes & tenant shards.
145 : ///
146 : /// If anything is inconsistent, log details and return an error.
147 2 : pub(crate) fn consistency_check<'a>(
148 2 : &self,
149 2 : nodes: impl Iterator<Item = &'a Node>,
150 2 : shards: impl Iterator<Item = &'a TenantShard>,
151 2 : ) -> anyhow::Result<()> {
152 2 : let mut expect_nodes: HashMap<NodeId, SchedulerNode> = HashMap::new();
153 8 : for node in nodes {
154 6 : expect_nodes.insert(
155 6 : node.get_id(),
156 6 : SchedulerNode {
157 6 : shard_count: 0,
158 6 : may_schedule: node.may_schedule(),
159 6 : },
160 6 : );
161 6 : }
162 :
163 4 : for shard in shards {
164 2 : if let Some(node_id) = shard.intent.get_attached() {
165 2 : match expect_nodes.get_mut(node_id) {
166 2 : Some(node) => node.shard_count += 1,
167 0 : None => anyhow::bail!(
168 0 : "Tenant {} references nonexistent node {}",
169 0 : shard.tenant_shard_id,
170 0 : node_id
171 0 : ),
172 : }
173 0 : }
174 :
175 2 : for node_id in shard.intent.get_secondary() {
176 2 : match expect_nodes.get_mut(node_id) {
177 2 : Some(node) => node.shard_count += 1,
178 0 : None => anyhow::bail!(
179 0 : "Tenant {} references nonexistent node {}",
180 0 : shard.tenant_shard_id,
181 0 : node_id
182 0 : ),
183 : }
184 : }
185 : }
186 :
187 8 : for (node_id, expect_node) in &expect_nodes {
188 6 : let Some(self_node) = self.nodes.get(node_id) else {
189 0 : anyhow::bail!("Node {node_id} not found in Self")
190 : };
191 :
192 6 : if self_node != expect_node {
193 0 : tracing::error!("Inconsistency detected in scheduling state for node {node_id}");
194 0 : tracing::error!("Expected state: {}", serde_json::to_string(expect_node)?);
195 0 : tracing::error!("Self state: {}", serde_json::to_string(self_node)?);
196 :
197 0 : anyhow::bail!("Inconsistent state on {node_id}");
198 6 : }
199 : }
200 :
201 2 : if expect_nodes.len() != self.nodes.len() {
202 : // We just checked that all the expected nodes are present. If the lengths don't match,
203 : // it means that we have nodes in Self that are unexpected.
204 0 : for node_id in self.nodes.keys() {
205 0 : if !expect_nodes.contains_key(node_id) {
206 0 : anyhow::bail!("Node {node_id} found in Self but not in expected nodes");
207 0 : }
208 : }
209 2 : }
210 :
211 2 : Ok(())
212 2 : }
213 :
214 : /// Increment the reference count of a node. This reference count is used to guide scheduling
215 : /// decisions, not for memory management: it represents one tenant shard whose IntentState targets
216 : /// this node.
217 : ///
218 : /// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
219 : /// [`Self::new`] or [`Self::node_upsert`])
220 60 : pub(crate) fn node_inc_ref(&mut self, node_id: NodeId) {
221 60 : let Some(node) = self.nodes.get_mut(&node_id) else {
222 0 : tracing::error!("Scheduler missing node {node_id}");
223 0 : debug_assert!(false);
224 0 : return;
225 : };
226 :
227 60 : node.shard_count += 1;
228 60 : }
229 :
230 : /// Decrement a node's reference count. Inverse of [`Self::node_inc_ref`].
231 58 : pub(crate) fn node_dec_ref(&mut self, node_id: NodeId) {
232 58 : let Some(node) = self.nodes.get_mut(&node_id) else {
233 0 : debug_assert!(false);
234 0 : tracing::error!("Scheduler missing node {node_id}");
235 0 : return;
236 : };
237 :
238 58 : node.shard_count -= 1;
239 58 : }
240 :
241 10 : pub(crate) fn node_upsert(&mut self, node: &Node) {
242 10 : use std::collections::hash_map::Entry::*;
243 10 : match self.nodes.entry(node.get_id()) {
244 2 : Occupied(mut entry) => {
245 2 : entry.get_mut().may_schedule = node.may_schedule();
246 2 : }
247 8 : Vacant(entry) => {
248 8 : entry.insert(SchedulerNode {
249 8 : shard_count: 0,
250 8 : may_schedule: node.may_schedule(),
251 8 : });
252 8 : }
253 : }
254 10 : }
255 :
256 0 : pub(crate) fn node_remove(&mut self, node_id: NodeId) {
257 0 : if self.nodes.remove(&node_id).is_none() {
258 0 : tracing::warn!(node_id=%node_id, "Removed non-existent node from scheduler");
259 0 : }
260 0 : }
261 :
262 : /// Where we have several nodes to choose from, for example when picking a secondary location
263 : /// to promote to an attached location, this method may be used to pick the best choice based
264 : /// on the scheduler's knowledge of utilization and availability.
265 : ///
266 : /// If the input is empty, or none of the nodes are eligible for scheduling, return None: the
267 : /// caller can pick a node some other way.
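/// A minimal sketch (not compiled as a doctest), assuming a `scheduler` built via
/// [`Self::new`] and a list of candidate locations:
///
/// ```ignore
/// let candidates = [NodeId(1), NodeId(2)];
/// if let Some(node_id) = scheduler.node_preferred(&candidates) {
///     // `node_id` is a candidate that is currently eligible for scheduling.
/// }
/// ```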
268 28 : pub(crate) fn node_preferred(&self, nodes: &[NodeId]) -> Option<NodeId> {
269 28 : if nodes.is_empty() {
270 24 : return None;
271 4 : }
272 4 :
273 4 : // TODO: When the utilization score returned by the pageserver becomes meaningful,
274 4 : // schedule based on that instead of the shard count.
275 4 : let node = nodes
276 4 : .iter()
277 8 : .map(|node_id| {
278 8 : let may_schedule = self
279 8 : .nodes
280 8 : .get(node_id)
281 8 : .map(|n| n.may_schedule != MaySchedule::No)
282 8 : .unwrap_or(false);
283 8 : (*node_id, may_schedule)
284 8 : })
285 8 : .max_by_key(|(_n, may_schedule)| *may_schedule);
286 4 :
287 4 : // If even the preferred node has may_schedule==false, return None
288 4 : node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
289 28 : }
290 :
291 : /// hard_exclude: it is forbidden to use nodes in this list, typically because they
292 : /// are already in use by this shard -- we use this to avoid picking the same node
293 : /// as both attached and secondary location. This is a hard constraint: if we cannot
294 : /// find any nodes that aren't in this list, then we will return a [`ScheduleError::ImpossibleConstraint`].
295 : ///
296 : /// context: we prefer to avoid using nodes identified in the context, according
297 : /// to their anti-affinity score. We use this to prefer to avoid placing shards from
298 : /// the same tenant on the same node. This is a soft constraint: the context will never
299 : /// cause us to fail to schedule a shard.
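/// A minimal sketch (not compiled as a doctest), assuming a `scheduler` built via
/// [`Self::new`]: place an attached location, then a secondary that is hard-excluded from
/// the attached node and soft-repelled from it via the shared context.
///
/// ```ignore
/// let mut context = ScheduleContext::default();
/// let attached = scheduler.schedule_shard(&[], &context)?;
/// context.avoid(&[attached]);
/// let secondary = scheduler.schedule_shard(&[attached], &context)?;
/// assert_ne!(attached, secondary);
/// ```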
300 60 : pub(crate) fn schedule_shard(
301 60 : &self,
302 60 : hard_exclude: &[NodeId],
303 60 : context: &ScheduleContext,
304 60 : ) -> Result<NodeId, ScheduleError> {
305 60 : if self.nodes.is_empty() {
306 0 : return Err(ScheduleError::NoPageservers);
307 60 : }
308 60 :
309 60 : let mut scores: Vec<(NodeId, AffinityScore, usize)> = self
310 60 : .nodes
311 60 : .iter()
312 188 : .filter_map(|(k, v)| {
313 188 : if hard_exclude.contains(k) || v.may_schedule == MaySchedule::No {
314 74 : None
315 : } else {
316 114 : Some((
317 114 : *k,
318 114 : context.nodes.get(k).copied().unwrap_or(AffinityScore::FREE),
319 114 : v.shard_count,
320 114 : ))
321 : }
322 188 : })
323 60 : .collect();
324 60 :
325 60 : // Sort by, in order of precedence:
326 60 : // 1st: Affinity score. We should never pick a higher-score node if a lower-score node is available
327 60 : // 2nd: Utilization. Within nodes with the same affinity, use the least loaded nodes.
328 60 : // 3rd: Node ID. This is a convenience to make selection deterministic in tests and empty systems.
329 114 : scores.sort_by_key(|i| (i.1, i.2, i.0));
330 60 :
331 60 : if scores.is_empty() {
332 : // After applying constraints, no pageservers were left. We log some detail about
333 : // the state of nodes to help understand why this happened. This is not logged as an error because
334 : // it is legitimately possible for enough nodes to be Offline to prevent scheduling a shard.
335 0 : tracing::info!("Scheduling failure, while excluding {hard_exclude:?}, node states:");
336 0 : for (node_id, node) in &self.nodes {
337 0 : tracing::info!(
338 0 : "Node {node_id}: may_schedule={} shards={}",
339 0 : node.may_schedule != MaySchedule::No,
340 0 : node.shard_count
341 0 : );
342 : }
343 :
344 0 : return Err(ScheduleError::ImpossibleConstraint);
345 60 : }
346 60 :
347 60 : // Lowest score wins
348 60 : let node_id = scores.first().unwrap().0;
349 60 : tracing::info!(
350 0 : "scheduler selected node {node_id} (elegible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?})",
351 0 : scores.iter().map(|i| i.0 .0).collect::<Vec<_>>()
352 0 : );
353 :
354 : // Note that we do not update shard count here to reflect the scheduling: that
355 : // is IntentState's job when the scheduled location is used.
356 :
357 60 : Ok(node_id)
358 60 : }
359 :
360 : /// Unit test access to internal state
361 : #[cfg(test)]
362 12 : pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
363 12 : self.nodes.get(&node_id).unwrap().shard_count
364 12 : }
365 : }
366 :
367 : #[cfg(test)]
368 : pub(crate) mod test_utils {
369 :
370 : use crate::node::Node;
371 : use pageserver_api::controller_api::{NodeAvailability, UtilizationScore};
372 : use std::collections::HashMap;
373 : use utils::id::NodeId;
374 : /// Test helper: synthesize the requested number of nodes, all in active state.
375 : ///
376 : /// Node IDs start at one.
377 14 : pub(crate) fn make_test_nodes(n: u64) -> HashMap<NodeId, Node> {
378 14 : (1..n + 1)
379 44 : .map(|i| {
380 44 : (NodeId(i), {
381 44 : let mut node = Node::new(
382 44 : NodeId(i),
383 44 : format!("httphost-{i}"),
384 44 : 80 + i as u16,
385 44 : format!("pghost-{i}"),
386 44 : 5432 + i as u16,
387 44 : );
388 44 : node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
389 44 : assert!(node.is_available());
390 44 : node
391 44 : })
392 44 : })
393 14 : .collect()
394 14 : }
395 : }
396 :
397 : #[cfg(test)]
398 : mod tests {
399 : use super::*;
400 :
401 : use crate::tenant_shard::IntentState;
402 : #[test]
403 2 : fn scheduler_basic() -> anyhow::Result<()> {
404 2 : let nodes = test_utils::make_test_nodes(2);
405 2 :
406 2 : let mut scheduler = Scheduler::new(nodes.values());
407 2 : let mut t1_intent = IntentState::new();
408 2 : let mut t2_intent = IntentState::new();
409 2 :
410 2 : let context = ScheduleContext::default();
411 :
412 2 : let scheduled = scheduler.schedule_shard(&[], &context)?;
413 2 : t1_intent.set_attached(&mut scheduler, Some(scheduled));
414 2 : let scheduled = scheduler.schedule_shard(&[], &context)?;
415 2 : t2_intent.set_attached(&mut scheduler, Some(scheduled));
416 2 :
417 2 : assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);
418 2 : assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 1);
419 :
420 2 : let scheduled = scheduler.schedule_shard(&t1_intent.all_pageservers(), &context)?;
421 2 : t1_intent.push_secondary(&mut scheduler, scheduled);
422 2 :
423 2 : assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);
424 2 : assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 2);
425 :
426 2 : t1_intent.clear(&mut scheduler);
427 2 : assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 0);
428 2 : assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 1);
429 :
430 2 : if cfg!(debug_assertions) {
431 : // Dropping an IntentState without clearing it causes a panic in debug mode,
432 : // because we have failed to properly update scheduler shard counts.
433 2 : let result = std::panic::catch_unwind(move || {
434 2 : drop(t2_intent);
435 2 : });
436 2 : assert!(result.is_err());
437 : } else {
438 0 : t2_intent.clear(&mut scheduler);
439 0 : assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 0);
440 0 : assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 0);
441 : }
442 :
443 2 : Ok(())
444 2 : }
445 : }