Line data Source code
1 : use std::{
2 : collections::{HashMap, HashSet},
3 : sync::Arc,
4 : time::Duration,
5 : };
6 :
7 : use crate::{
8 : metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
9 : persistence::TenantShardPersistence,
10 : reconciler::{ReconcileUnits, ReconcilerConfig},
11 : scheduler::{AffinityScore, MaySchedule, RefCountUpdate, ScheduleContext},
12 : service::ReconcileResultRequest,
13 : };
14 : use pageserver_api::controller_api::{
15 : NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
16 : };
17 : use pageserver_api::{
18 : models::{LocationConfig, LocationConfigMode, TenantConfig},
19 : shard::{ShardIdentity, TenantShardId},
20 : };
21 : use serde::{Deserialize, Serialize};
22 : use tokio::task::JoinHandle;
23 : use tokio_util::sync::CancellationToken;
24 : use tracing::{instrument, Instrument};
25 : use utils::{
26 : generation::Generation,
27 : id::NodeId,
28 : seqwait::{SeqWait, SeqWaitError},
29 : sync::gate::GateGuard,
30 : };
31 :
32 : use crate::{
33 : compute_hook::ComputeHook,
34 : node::Node,
35 : persistence::{split_state::SplitState, Persistence},
36 : reconciler::{
37 : attached_location_conf, secondary_location_conf, ReconcileError, Reconciler, TargetState,
38 : },
39 : scheduler::{ScheduleError, Scheduler},
40 : service, Sequence,
41 : };
42 :
43 : /// Serialization helper
44 0 : fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
45 0 : where
46 0 : S: serde::ser::Serializer,
47 0 : T: std::fmt::Display,
48 0 : {
49 0 : serializer.collect_str(
50 0 : &v.lock()
51 0 : .unwrap()
52 0 : .as_ref()
53 0 : .map(|e| format!("{e}"))
54 0 : .unwrap_or("".to_string()),
55 0 : )
56 0 : }
57 :
58 : /// In-memory state for a particular tenant shard.
59 : ///
60 : /// This struct implements Serialize for debugging purposes, but is _not_ persisted
61 : /// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
62 0 : #[derive(Serialize)]
63 : pub(crate) struct TenantShard {
64 : pub(crate) tenant_shard_id: TenantShardId,
65 :
66 : pub(crate) shard: ShardIdentity,
67 :
68 : // Runtime only: sequence used to coordinate updates to this object while
69 : // background reconcilers may be running. A reconciler runs to a particular
70 : // sequence.
71 : pub(crate) sequence: Sequence,
72 :
73 : // Latest generation number: next time we attach, increment this
74 : // and use the incremented number when attaching.
75 : //
76 : // None represents an incompletely onboarded tenant via the [`Service::location_config`]
77 : // API, where this tenant may only run in PlacementPolicy::Secondary.
78 : pub(crate) generation: Option<Generation>,
79 :
80 : // High level description of how the tenant should be set up. Provided
81 : // externally.
82 : pub(crate) policy: PlacementPolicy,
83 :
84 : // Low level description of exactly which pageservers should fulfil
85 : // which role. Generated by `Self::schedule`.
86 : pub(crate) intent: IntentState,
87 :
88 : // Low level description of how the tenant is configured on pageservers:
89 : // if this does not match `Self::intent` then the tenant needs reconciliation
90 : // with `Self::reconcile`.
91 : pub(crate) observed: ObservedState,
92 :
93 : // Tenant configuration, passed through opaquely to the pageserver. Identical
94 : // for all shards in a tenant.
95 : pub(crate) config: TenantConfig,
96 :
97 : /// If a reconcile task is currently in flight, it may be joined here (it is
98 : /// only safe to join if either the result has been received or the reconciler's
99 : /// cancellation token has been fired)
100 : #[serde(skip)]
101 : pub(crate) reconciler: Option<ReconcilerHandle>,
102 :
103 : /// If a tenant is being split, then all shards with that TenantId will have a
104 : /// SplitState set; this acts as a guard against other operations such as background
105 : /// reconciliation and timeline creation.
106 : pub(crate) splitting: SplitState,
107 :
108 : /// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag
109 : /// is set. This flag is cleared when the tenant is popped off the delay queue.
110 : pub(crate) delayed_reconcile: bool,
111 :
112 : /// Optionally wait for reconciliation to complete up to a particular
113 : /// sequence number.
114 : #[serde(skip)]
115 : pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
116 :
117 : /// Indicates sequence number for which we have encountered an error reconciling. If
118 : /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
119 : /// and callers should stop waiting for `waiter` and propagate the error.
120 : #[serde(skip)]
121 : pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
122 :
123 : /// The most recent error from a reconcile on this tenant. This is a nested Arc
124 : /// because:
125 : /// - ReconcileWaiters need to Arc-clone the overall object to read it later
126 : /// - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
127 : /// many waiters for one shard, and the underlying error types are not Clone.
128 : ///
129 : /// TODO: generalize to an array of recent events
130 : /// TODO: use an ArcSwap instead of a mutex for faster reads?
131 : #[serde(serialize_with = "read_last_error")]
132 : pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
133 :
134 : /// If we have a pending compute notification that for some reason we weren't able to send,
135 : /// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
136 : /// and trigger a Reconciler run. This is the mechanism by which compute notifications are included in the scope
137 : /// of state that we publish externally in an eventually consistent way.
138 : pub(crate) pending_compute_notification: bool,
139 :
140 : // Support/debug tool: if something is going wrong or flapping with scheduling, this may
141 : // be set to a non-active state to avoid making changes while the issue is fixed.
142 : scheduling_policy: ShardSchedulingPolicy,
143 :
144 : // We should attempt to schedule this shard in the provided AZ to
145 : // decrease chances of cross-AZ compute.
146 : preferred_az_id: Option<String>,
147 : }
148 :
149 : #[derive(Default, Clone, Debug, Serialize)]
150 : pub(crate) struct IntentState {
151 : attached: Option<NodeId>,
152 : secondary: Vec<NodeId>,
153 : }
154 :
155 : impl IntentState {
156 13 : pub(crate) fn new() -> Self {
157 13 : Self {
158 13 : attached: None,
159 13 : secondary: vec![],
160 13 : }
161 13 : }
162 0 : pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option<NodeId>) -> Self {
163 0 : if let Some(node_id) = node_id {
164 0 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::Attach);
165 0 : }
166 0 : Self {
167 0 : attached: node_id,
168 0 : secondary: vec![],
169 0 : }
170 0 : }
171 :
172 24 : pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
173 24 : if self.attached != new_attached {
174 24 : if let Some(old_attached) = self.attached.take() {
175 0 : scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
176 24 : }
177 24 : if let Some(new_attached) = &new_attached {
178 24 : scheduler.update_node_ref_counts(*new_attached, RefCountUpdate::Attach);
179 24 : }
180 24 : self.attached = new_attached;
181 0 : }
182 24 : }
183 :
184 : /// Like set_attached, but the node is from [`Self::secondary`]. This swaps the node from
185 : /// secondary to attached while maintaining the scheduler's reference counts.
186 5 : pub(crate) fn promote_attached(
187 5 : &mut self,
188 5 : scheduler: &mut Scheduler,
189 5 : promote_secondary: NodeId,
190 5 : ) {
191 5 : // If we call this with a node that isn't in secondary, it would cause incorrect
192 5 : // scheduler reference counting, since we assume the node is already referenced as a secondary.
193 5 : debug_assert!(self.secondary.contains(&promote_secondary));
194 :
195 10 : self.secondary.retain(|n| n != &promote_secondary);
196 5 :
197 5 : let demoted = self.attached;
198 5 : self.attached = Some(promote_secondary);
199 5 :
200 5 : scheduler.update_node_ref_counts(promote_secondary, RefCountUpdate::PromoteSecondary);
201 5 : if let Some(demoted) = demoted {
202 0 : scheduler.update_node_ref_counts(demoted, RefCountUpdate::DemoteAttached);
203 5 : }
204 5 : }
205 :
206 17 : pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
207 17 : debug_assert!(!self.secondary.contains(&new_secondary));
208 17 : scheduler.update_node_ref_counts(new_secondary, RefCountUpdate::AddSecondary);
209 17 : self.secondary.push(new_secondary);
210 17 : }
211 :
212 : /// It is legal to call this with a node that is not currently a secondary: that is a no-op
213 5 : pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
214 5 : let index = self.secondary.iter().position(|n| *n == node_id);
215 5 : if let Some(index) = index {
216 5 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
217 5 : self.secondary.remove(index);
218 5 : }
219 5 : }
220 :
221 23 : pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
222 23 : for secondary in self.secondary.drain(..) {
223 12 : scheduler.update_node_ref_counts(secondary, RefCountUpdate::RemoveSecondary);
224 12 : }
225 23 : }
226 :
227 : /// Remove the last secondary node from the list of secondaries
228 0 : pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
229 0 : if let Some(node_id) = self.secondary.pop() {
230 0 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
231 0 : }
232 0 : }
233 :
234 23 : pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
235 23 : if let Some(old_attached) = self.attached.take() {
236 23 : scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
237 23 : }
238 :
239 23 : self.clear_secondary(scheduler);
240 23 : }
241 :
242 70 : pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
243 70 : let mut result = Vec::new();
244 70 : if let Some(p) = self.attached {
245 68 : result.push(p)
246 2 : }
247 :
248 70 : result.extend(self.secondary.iter().copied());
249 70 :
250 70 : result
251 70 : }
252 :
253 59 : pub(crate) fn get_attached(&self) -> &Option<NodeId> {
254 59 : &self.attached
255 59 : }
256 :
257 16 : pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
258 16 : &self.secondary
259 16 : }
260 :
261 : /// If the node is in use as the attached location, demote it into
262 : /// the list of secondary locations. This is used when a node goes offline,
263 : /// and we want to use a different node for attachment, but not permanently
264 : /// forget the location on the offline node.
265 : ///
266 : /// Returns true if a change was made
267 5 : pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
268 5 : if self.attached == Some(node_id) {
269 5 : self.attached = None;
270 5 : self.secondary.push(node_id);
271 5 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::DemoteAttached);
272 5 : true
273 : } else {
274 0 : false
275 : }
276 5 : }
277 : }
278 :
279 : impl Drop for IntentState {
280 24 : fn drop(&mut self) {
281 24 : // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
282 24 : // We do not check this while panicking, to avoid polluting unit test failures or
283 24 : // other assertions with this assertion's output. It's still wrong to leak these,
284 24 : // but if we already have a panic then we don't need to independently flag this case.
285 24 : if !(std::thread::panicking()) {
286 24 : debug_assert!(self.attached.is_none() && self.secondary.is_empty());
287 0 : }
288 23 : }
289 : }
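// A minimal sketch of the ref-count discipline enforced above: every
// IntentState mutation goes through the Scheduler, and `clear` must run
// before drop. Test-style, assuming the `make_test_nodes` helper used by
// the unit tests at the bottom of this file (node IDs are illustrative):
//
// let nodes = crate::scheduler::test_utils::make_test_nodes(2);
// let mut scheduler = Scheduler::new(nodes.values());
// let mut intent = IntentState::new();
// intent.set_attached(&mut scheduler, Some(NodeId(1)));
// intent.push_secondary(&mut scheduler, NodeId(2));
// assert_eq!(intent.all_pageservers(), vec![NodeId(1), NodeId(2)]);
// // Dropping here would trip the debug_assert in Drop; release the
// // scheduler references first:
// intent.clear(&mut scheduler);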
290 :
291 0 : #[derive(Default, Clone, Serialize, Deserialize, Debug)]
292 : pub(crate) struct ObservedState {
293 : pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
294 : }
295 :
296 : /// Our latest knowledge of how this tenant is configured in the outside world.
297 : ///
298 : /// Meaning:
299 : /// * No instance of this type exists for a node: we are certain that we have nothing configured on that
300 : /// node for this shard.
301 : /// * Instance exists with conf==None: we *might* have some state on that node, but we don't know
302 : /// what it is (e.g. we failed partway through configuring it)
303 : /// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
304 : /// and that configuration will still be present unless something external interfered.
305 0 : #[derive(Clone, Serialize, Deserialize, Debug)]
306 : pub(crate) struct ObservedStateLocation {
307 : /// If None, it means we do not know the status of this shard's location on this node, but
308 : /// we know that we might have some state on this node.
309 : pub(crate) conf: Option<LocationConfig>,
310 : }
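// A minimal sketch mapping the three meanings described above onto values
// (`node` and `conf` are illustrative placeholders):
//
// observed.locations.remove(&node);
//     // no entry: we are certain nothing is configured on `node`
// observed.locations.insert(node, ObservedStateLocation { conf: None });
//     // conf == None: state on `node` is unknown (e.g. failed mid-configure)
// observed.locations.insert(node, ObservedStateLocation { conf: Some(conf) });
//     // conf == Some: the LocationConfig we last successfully applied there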
311 : pub(crate) struct ReconcilerWaiter {
312 : // For observability purposes, remember the ID of the shard we're
313 : // waiting for.
314 : pub(crate) tenant_shard_id: TenantShardId,
315 :
316 : seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
317 : error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
318 : error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
319 : seq: Sequence,
320 : }
321 :
322 : pub(crate) enum ReconcilerStatus {
323 : Done,
324 : Failed,
325 : InProgress,
326 : }
327 :
328 0 : #[derive(thiserror::Error, Debug)]
329 : pub(crate) enum ReconcileWaitError {
330 : #[error("Timeout waiting for shard {0}")]
331 : Timeout(TenantShardId),
332 : #[error("shutting down")]
333 : Shutdown,
334 : #[error("Reconcile error on shard {0}: {1}")]
335 : Failed(TenantShardId, Arc<ReconcileError>),
336 : }
337 :
338 : #[derive(Eq, PartialEq, Debug)]
339 : pub(crate) struct ReplaceSecondary {
340 : old_node_id: NodeId,
341 : new_node_id: NodeId,
342 : }
343 :
344 : #[derive(Eq, PartialEq, Debug)]
345 : pub(crate) struct MigrateAttachment {
346 : pub(crate) old_attached_node_id: NodeId,
347 : pub(crate) new_attached_node_id: NodeId,
348 : }
349 :
350 : #[derive(Eq, PartialEq, Debug)]
351 : pub(crate) enum ScheduleOptimizationAction {
352 : // Replace one of our secondary locations with a different node
353 : ReplaceSecondary(ReplaceSecondary),
354 : // Migrate attachment to an existing secondary location
355 : MigrateAttachment(MigrateAttachment),
356 : }
357 :
358 : #[derive(Eq, PartialEq, Debug)]
359 : pub(crate) struct ScheduleOptimization {
360 : // What was the reconcile sequence when we generated this optimization? The optimization
361 : // should only be applied if the shard's sequence is still at this value, in case other changes
362 : // happened between planning the optimization and applying it.
363 : sequence: Sequence,
364 :
365 : pub(crate) action: ScheduleOptimizationAction,
366 : }
367 :
368 : impl ReconcilerWaiter {
369 0 : pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
370 0 : tokio::select! {
371 0 : result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
372 0 : result.map_err(|e| match e {
373 0 : SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
374 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
375 0 : })?;
376 : },
377 0 : result = self.error_seq_wait.wait_for(self.seq) => {
378 0 : result.map_err(|e| match e {
379 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
380 0 : SeqWaitError::Timeout => unreachable!()
381 0 : })?;
382 :
383 0 : return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
384 0 : self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
385 : }
386 : }
387 :
388 0 : Ok(())
389 0 : }
390 :
391 0 : pub(crate) fn get_status(&self) -> ReconcilerStatus {
392 0 : if self.seq_wait.would_wait_for(self.seq).is_ok() {
393 0 : ReconcilerStatus::Done
394 0 : } else if self.error_seq_wait.would_wait_for(self.seq).is_ok() {
395 0 : ReconcilerStatus::Failed
396 : } else {
397 0 : ReconcilerStatus::InProgress
398 : }
399 0 : }
400 : }
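// A minimal sketch of how a caller consumes a ReconcilerWaiter (here via
// `get_waiter`, defined further down in this file; the timeout value is
// illustrative):
//
// if let Some(waiter) = shard.get_waiter() {
//     match waiter.wait_timeout(Duration::from_secs(30)).await {
//         Ok(()) => {} // reconcile for this waiter's sequence completed
//         Err(ReconcileWaitError::Timeout(id)) => tracing::warn!("timed out waiting for {id}"),
//         Err(e) => tracing::warn!("reconcile failed: {e}"),
//     }
// }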
401 :
402 : /// Having spawned a reconciler task, the tenant shard's state will carry enough
403 : /// information to optionally cancel & await it later.
404 : pub(crate) struct ReconcilerHandle {
405 : sequence: Sequence,
406 : handle: JoinHandle<()>,
407 : cancel: CancellationToken,
408 : }
409 :
410 : pub(crate) enum ReconcileNeeded {
411 : /// shard either doesn't need reconciliation, or is forbidden from spawning a reconciler
412 : /// in its current state (e.g. shard split in progress, or ShardSchedulingPolicy forbids it)
413 : No,
414 : /// shard has a reconciler running, and its intent hasn't changed since that one was
415 : /// spawned: wait for the existing reconciler rather than spawning a new one.
416 : WaitExisting(ReconcilerWaiter),
417 : /// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
418 : Yes,
419 : }
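// A minimal sketch of the caller-side flow implied by these variants
// (the surrounding service plumbing is elided):
//
// match shard.get_reconcile_needed(&pageservers) {
//     ReconcileNeeded::No => {}                 // nothing to do, or forbidden
//     ReconcileNeeded::WaitExisting(waiter) => {
//         // a reconciler for the current intent is already running: await it
//     }
//     ReconcileNeeded::Yes => {
//         // spawn a new one via TenantShard::spawn_reconciler(...)
//     }
// }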
420 :
421 : /// When a reconcile task completes, it sends this result object
422 : /// to be applied to the primary TenantShard.
423 : pub(crate) struct ReconcileResult {
424 : pub(crate) sequence: Sequence,
425 : /// On errors, `observed` should be treated as an incomplete description
426 : /// of state (i.e. any nodes present in the result should override nodes
427 : /// present in the parent tenant state, but any unmentioned nodes should
428 : /// not be removed from parent tenant state)
429 : pub(crate) result: Result<(), ReconcileError>,
430 :
431 : pub(crate) tenant_shard_id: TenantShardId,
432 : pub(crate) generation: Option<Generation>,
433 : pub(crate) observed: ObservedState,
434 :
435 : /// Set [`TenantShard::pending_compute_notification`] from this flag
436 : pub(crate) pending_compute_notification: bool,
437 : }
438 :
439 : impl ObservedState {
440 0 : pub(crate) fn new() -> Self {
441 0 : Self {
442 0 : locations: HashMap::new(),
443 0 : }
444 0 : }
445 : }
446 :
447 : impl TenantShard {
448 11 : pub(crate) fn new(
449 11 : tenant_shard_id: TenantShardId,
450 11 : shard: ShardIdentity,
451 11 : policy: PlacementPolicy,
452 11 : ) -> Self {
453 11 : Self {
454 11 : tenant_shard_id,
455 11 : policy,
456 11 : intent: IntentState::default(),
457 11 : generation: Some(Generation::new(0)),
458 11 : shard,
459 11 : observed: ObservedState::default(),
460 11 : config: TenantConfig::default(),
461 11 : reconciler: None,
462 11 : splitting: SplitState::Idle,
463 11 : sequence: Sequence(1),
464 11 : delayed_reconcile: false,
465 11 : waiter: Arc::new(SeqWait::new(Sequence(0))),
466 11 : error_waiter: Arc::new(SeqWait::new(Sequence(0))),
467 11 : last_error: Arc::default(),
468 11 : pending_compute_notification: false,
469 11 : scheduling_policy: ShardSchedulingPolicy::default(),
470 11 : preferred_az_id: None,
471 11 : }
472 11 : }
473 :
474 : /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
475 : /// [`ObservedState`], even if it violates my [`PlacementPolicy`]. Call [`Self::schedule`] next,
476 : /// to get an intent state that complies with placement policy. The overall goal is to do scheduling
477 : /// in a way that makes use of any configured locations that already exist in the outside world.
478 1 : pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
479 1 : // Choose an attached location by filtering observed locations, and then sorting to get the highest
480 1 : // generation
481 1 : let mut attached_locs = self
482 1 : .observed
483 1 : .locations
484 1 : .iter()
485 2 : .filter_map(|(node_id, l)| {
486 2 : if let Some(conf) = &l.conf {
487 2 : if conf.mode == LocationConfigMode::AttachedMulti
488 1 : || conf.mode == LocationConfigMode::AttachedSingle
489 1 : || conf.mode == LocationConfigMode::AttachedStale
490 : {
491 2 : Some((node_id, conf.generation))
492 : } else {
493 0 : None
494 : }
495 : } else {
496 0 : None
497 : }
498 2 : })
499 1 : .collect::<Vec<_>>();
500 1 :
501 2 : attached_locs.sort_by_key(|i| i.1);
502 1 : if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
503 1 : self.intent.set_attached(scheduler, Some(*node_id));
504 1 : }
505 :
506 : // All remaining observed locations generate secondary intents. This includes None
507 : // observations, as these may well have some local content on disk that is usable (this
508 : // is an edge case that might occur if we restarted during a migration or other change)
509 : //
510 : // We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
511 : // will take care of promoting one of these secondaries to be attached.
512 2 : self.observed.locations.keys().for_each(|node_id| {
513 2 : if Some(*node_id) != self.intent.attached {
514 1 : self.intent.push_secondary(scheduler, *node_id);
515 1 : }
516 2 : });
517 1 : }
518 :
519 : /// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
520 : /// attached pageserver for a shard.
521 : ///
522 : /// Returns whether the intent was modified, and the NodeId selected.
523 7 : fn schedule_attached(
524 7 : &mut self,
525 7 : scheduler: &mut Scheduler,
526 7 : context: &ScheduleContext,
527 7 : ) -> Result<(bool, NodeId), ScheduleError> {
528 : // No work to do if we already have an attached tenant
529 7 : if let Some(node_id) = self.intent.attached {
530 0 : return Ok((false, node_id));
531 7 : }
532 :
533 7 : if let Some(promote_secondary) = scheduler.node_preferred(&self.intent.secondary) {
534 : // Promote a secondary
535 1 : tracing::debug!("Promoted secondary {} to attached", promote_secondary);
536 1 : self.intent.promote_attached(scheduler, promote_secondary);
537 1 : Ok((true, promote_secondary))
538 : } else {
539 : // Pick a fresh node: either we had no secondaries or none were schedulable
540 6 : let node_id = scheduler.schedule_shard(&self.intent.secondary, context)?;
541 6 : tracing::debug!("Selected {} as attached", node_id);
542 6 : self.intent.set_attached(scheduler, Some(node_id));
543 6 : Ok((true, node_id))
544 : }
545 7 : }
546 :
547 8 : pub(crate) fn schedule(
548 8 : &mut self,
549 8 : scheduler: &mut Scheduler,
550 8 : context: &mut ScheduleContext,
551 8 : ) -> Result<(), ScheduleError> {
552 8 : let r = self.do_schedule(scheduler, context);
553 8 :
554 8 : context.avoid(&self.intent.all_pageservers());
555 8 : if let Some(attached) = self.intent.get_attached() {
556 7 : context.push_attached(*attached);
557 7 : }
558 :
559 8 : r
560 8 : }
561 :
562 8 : pub(crate) fn do_schedule(
563 8 : &mut self,
564 8 : scheduler: &mut Scheduler,
565 8 : context: &ScheduleContext,
566 8 : ) -> Result<(), ScheduleError> {
567 8 : // TODO: before scheduling new nodes, check if any existing content in
568 8 : // self.intent refers to pageservers that are offline, and pick other
569 8 : // pageservers if so.
570 8 :
571 8 : // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
572 8 : // change their attach location.
573 8 :
574 8 : match self.scheduling_policy {
575 7 : ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
576 : ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
577 : // Warn to make it obvious why other things aren't happening/working, if we skip scheduling
578 1 : tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
579 0 : "Scheduling is disabled by policy {:?}", self.scheduling_policy);
580 1 : return Ok(());
581 : }
582 : }
583 :
584 : // Build the set of pageservers already in use by this tenant, to avoid scheduling
585 : // more work on the same pageservers we're already using.
586 7 : let mut modified = false;
587 :
588 : // Add/remove nodes to fulfil policy
589 : use PlacementPolicy::*;
590 7 : match self.policy {
591 7 : Attached(secondary_count) => {
592 7 : let retain_secondaries = if self.intent.attached.is_none()
593 7 : && scheduler.node_preferred(&self.intent.secondary).is_some()
594 : {
595 : // If we have no attached, and one of the secondaries is eligible to be promoted, retain
596 : // one more secondary than we usually would, as one of them will become attached further down this function.
597 1 : secondary_count + 1
598 : } else {
599 6 : secondary_count
600 : };
601 :
602 7 : while self.intent.secondary.len() > retain_secondaries {
603 0 : // We have no particular preference for one secondary location over another: just
604 0 : // arbitrarily drop from the end
605 0 : self.intent.pop_secondary(scheduler);
606 0 : modified = true;
607 0 : }
608 :
609 : // Should have exactly one attached, and N secondaries
610 7 : let (modified_attached, attached_node_id) =
611 7 : self.schedule_attached(scheduler, context)?;
612 7 : modified |= modified_attached;
613 7 :
614 7 : let mut used_pageservers = vec![attached_node_id];
615 13 : while self.intent.secondary.len() < secondary_count {
616 6 : let node_id = scheduler.schedule_shard(&used_pageservers, context)?;
617 6 : self.intent.push_secondary(scheduler, node_id);
618 6 : used_pageservers.push(node_id);
619 6 : modified = true;
620 : }
621 : }
622 : Secondary => {
623 0 : if let Some(node_id) = self.intent.get_attached() {
624 0 : // Populate secondary by demoting the attached node
625 0 : self.intent.demote_attached(scheduler, *node_id);
626 0 : modified = true;
627 0 : } else if self.intent.secondary.is_empty() {
628 : // Populate secondary by scheduling a fresh node
629 0 : let node_id = scheduler.schedule_shard(&[], context)?;
630 0 : self.intent.push_secondary(scheduler, node_id);
631 0 : modified = true;
632 0 : }
633 0 : while self.intent.secondary.len() > 1 {
634 0 : // We have no particular preference for one secondary location over another: just
635 0 : // arbitrarily drop from the end
636 0 : self.intent.pop_secondary(scheduler);
637 0 : modified = true;
638 0 : }
639 : }
640 : Detached => {
641 : // Never add locations in this mode
642 0 : if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
643 0 : self.intent.clear(scheduler);
644 0 : modified = true;
645 0 : }
646 : }
647 : }
648 :
649 7 : if modified {
650 7 : self.sequence.0 += 1;
651 7 : }
652 :
653 7 : Ok(())
654 8 : }
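// A minimal summary sketch of the intent shapes do_schedule converges on,
// per the PlacementPolicy match above:
//
//   Attached(n) => one attached location plus n secondaries
//   Secondary   => no attached location, exactly one secondary
//   Detached    => no locations at all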
655 :
656 : /// Reschedule this tenant shard to one of its secondary locations. Returns a scheduling error
657 : /// if the swap is not possible and leaves the intent state in its original state.
658 : ///
659 : /// Arguments:
660 : /// `attached_to`: the currently attached location matching the intent state (may be None if the
661 : /// shard is not attached)
662 : /// `promote_to`: an optional secondary location of this tenant shard. If set to None, we ask
663 : /// the scheduler to recommend a node
664 0 : pub(crate) fn reschedule_to_secondary(
665 0 : &mut self,
666 0 : promote_to: Option<NodeId>,
667 0 : scheduler: &mut Scheduler,
668 0 : ) -> Result<(), ScheduleError> {
669 0 : let promote_to = match promote_to {
670 0 : Some(node) => node,
671 0 : None => match scheduler.node_preferred(self.intent.get_secondary()) {
672 0 : Some(node) => node,
673 : None => {
674 0 : return Err(ScheduleError::ImpossibleConstraint);
675 : }
676 : },
677 : };
678 :
679 0 : assert!(self.intent.get_secondary().contains(&promote_to));
680 :
681 0 : if let Some(node) = self.intent.get_attached() {
682 0 : let demoted = self.intent.demote_attached(scheduler, *node);
683 0 : if !demoted {
684 0 : return Err(ScheduleError::ImpossibleConstraint);
685 0 : }
686 0 : }
687 :
688 0 : self.intent.promote_attached(scheduler, promote_to);
689 0 :
690 0 : // Increment the sequence number for the edge case where a
691 0 : // reconciler is already running to avoid waiting on the
692 0 : // current reconcile instead of spawning a new one.
693 0 : self.sequence = self.sequence.next();
694 0 :
695 0 : Ok(())
696 0 : }
697 :
698 : /// Optimize attachments: if a shard has a secondary location that is preferable to
699 : /// its primary location based on soft constraints, switch that secondary location
700 : /// to be attached.
701 15 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
702 : pub(crate) fn optimize_attachment(
703 : &self,
704 : nodes: &HashMap<NodeId, Node>,
705 : schedule_context: &ScheduleContext,
706 : ) -> Option<ScheduleOptimization> {
707 : let attached = (*self.intent.get_attached())?;
708 : if self.intent.secondary.is_empty() {
709 : // We can only do useful work if we have both attached and secondary locations: this
710 : // function doesn't schedule new locations, only swaps between attached and secondaries.
711 : return None;
712 : }
713 :
714 : let current_affinity_score = schedule_context.get_node_affinity(attached);
715 : let current_attachment_count = schedule_context.get_node_attachments(attached);
716 :
717 : // Generate score for each node, dropping any un-schedulable nodes.
718 : let all_pageservers = self.intent.all_pageservers();
719 : let mut scores = all_pageservers
720 : .iter()
721 30 : .flat_map(|node_id| {
722 30 : let node = nodes.get(node_id);
723 30 : if node.is_none() {
724 0 : None
725 30 : } else if matches!(
726 30 : node.unwrap().get_scheduling(),
727 : NodeSchedulingPolicy::Filling
728 : ) {
729 : // If the node is currently filling, don't count it as a candidate to avoid,
730 : // racing with the background fill.
731 0 : None
732 30 : } else if matches!(node.unwrap().may_schedule(), MaySchedule::No) {
733 0 : None
734 : } else {
735 30 : let affinity_score = schedule_context.get_node_affinity(*node_id);
736 30 : let attachment_count = schedule_context.get_node_attachments(*node_id);
737 30 : Some((*node_id, affinity_score, attachment_count))
738 : }
739 30 : })
740 : .collect::<Vec<_>>();
741 :
742 : // Sort precedence:
743 : // 1st - prefer nodes with the lowest total affinity score
744 : // 2nd - prefer nodes with the lowest number of attachments in this context
745 : // 3rd - if all else is equal, sort by node ID for determinism in tests.
746 30 : scores.sort_by_key(|i| (i.1, i.2, i.0));
747 :
748 : if let Some((preferred_node, preferred_affinity_score, preferred_attachment_count)) =
749 : scores.first()
750 : {
751 : if attached != *preferred_node {
752 : // The best alternative must be more than 1 better than us, otherwise we could end
753 : // up flapping back next time we're called (e.g. there's no point migrating from
754 : // a location with score 1 to a score of zero, because on the next call the situation
755 : // would be the same, but in reverse).
756 : if current_affinity_score > *preferred_affinity_score + AffinityScore(1)
757 : || current_attachment_count > *preferred_attachment_count + 1
758 : {
759 : tracing::info!(
760 : "Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})",
761 : self.intent.get_secondary()
762 : );
763 : return Some(ScheduleOptimization {
764 : sequence: self.sequence,
765 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
766 : old_attached_node_id: attached,
767 : new_attached_node_id: *preferred_node,
768 : }),
769 : });
770 : }
771 : } else {
772 : tracing::debug!(
773 : "Node {} is already preferred (score {:?})",
774 : preferred_node,
775 : preferred_affinity_score
776 : );
777 : }
778 : }
779 :
780 : // Fall-through: we didn't find an optimization
781 : None
782 : }
783 :
784 12 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
785 : pub(crate) fn optimize_secondary(
786 : &self,
787 : scheduler: &mut Scheduler,
788 : schedule_context: &ScheduleContext,
789 : ) -> Option<ScheduleOptimization> {
790 : if self.intent.secondary.is_empty() {
791 : // We can only do useful work if we have both attached and secondary locations: this
792 : // function doesn't schedule new locations, only swaps between attached and secondaries.
793 : return None;
794 : }
795 :
796 : for secondary in self.intent.get_secondary() {
797 : let Some(affinity_score) = schedule_context.nodes.get(secondary) else {
798 : // We're already on a node unaffected by any affinity constraints,
799 : // so we won't change it.
800 : continue;
801 : };
802 :
803 : // Let the scheduler suggest a node, where it would put us if we were scheduling afresh
804 : // This implicitly limits the choice to nodes that are available, and prefers nodes
805 : // with lower utilization.
806 : let Ok(candidate_node) =
807 : scheduler.schedule_shard(&self.intent.all_pageservers(), schedule_context)
808 : else {
809 : // A scheduling error means we have no possible candidate replacements
810 : continue;
811 : };
812 :
813 : let candidate_affinity_score = schedule_context
814 : .nodes
815 : .get(&candidate_node)
816 : .unwrap_or(&AffinityScore::FREE);
817 :
818 : // The best alternative must be more than 1 better than us, otherwise we could end
819 : // up flapping back next time we're called.
820 : if *candidate_affinity_score + AffinityScore(1) < *affinity_score {
821 : // If some other node is available and has a lower score than this node, then
822 : // that other node is a good place to migrate to.
823 : tracing::info!(
824 : "Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})",
825 : self.intent.get_secondary()
826 : );
827 : return Some(ScheduleOptimization {
828 : sequence: self.sequence,
829 : action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
830 : old_node_id: *secondary,
831 : new_node_id: candidate_node,
832 : }),
833 : });
834 : }
835 : }
836 :
837 : None
838 : }
839 :
840 : /// Return true if the optimization was really applied: it will not be applied if the optimization's
841 : /// sequence is behind this tenant shard's current sequence.
842 9 : pub(crate) fn apply_optimization(
843 9 : &mut self,
844 9 : scheduler: &mut Scheduler,
845 9 : optimization: ScheduleOptimization,
846 9 : ) -> bool {
847 9 : if optimization.sequence != self.sequence {
848 0 : return false;
849 9 : }
850 9 :
851 9 : metrics::METRICS_REGISTRY
852 9 : .metrics_group
853 9 : .storage_controller_schedule_optimization
854 9 : .inc();
855 9 :
856 9 : match optimization.action {
857 : ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
858 4 : old_attached_node_id,
859 4 : new_attached_node_id,
860 4 : }) => {
861 4 : self.intent.demote_attached(scheduler, old_attached_node_id);
862 4 : self.intent
863 4 : .promote_attached(scheduler, new_attached_node_id);
864 4 : }
865 : ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
866 5 : old_node_id,
867 5 : new_node_id,
868 5 : }) => {
869 5 : self.intent.remove_secondary(scheduler, old_node_id);
870 5 : self.intent.push_secondary(scheduler, new_node_id);
871 5 : }
872 : }
873 :
874 9 : true
875 9 : }
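// A minimal sketch of the plan-then-apply flow these two steps form:
// optimizations are planned under the shard's current sequence, and
// apply_optimization refuses stale plans (local names are illustrative):
//
// if let Some(opt) = shard.optimize_attachment(&nodes, &schedule_context) {
//     // If anything bumped the shard's sequence since planning,
//     // this returns false and the plan is dropped.
//     let applied = shard.apply_optimization(&mut scheduler, opt);
// }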
876 :
877 : /// Query whether the tenant's observed state for the attached node matches its intent state, and if so,
878 : /// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that
879 : /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
880 : ///
881 : /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
882 : /// function should not be used to decide whether to reconcile.
883 0 : pub(crate) fn stably_attached(&self) -> Option<NodeId> {
884 0 : if let Some(attach_intent) = self.intent.attached {
885 0 : match self.observed.locations.get(&attach_intent) {
886 0 : Some(loc) => match &loc.conf {
887 0 : Some(conf) => match conf.mode {
888 : LocationConfigMode::AttachedMulti
889 : | LocationConfigMode::AttachedSingle
890 : | LocationConfigMode::AttachedStale => {
891 : // Our intent and observed state agree that this node is in an attached state.
892 0 : Some(attach_intent)
893 : }
894 : // Our observed config is not an attached state
895 0 : _ => None,
896 : },
897 : // Our observed state is None, i.e. in flux
898 0 : None => None,
899 : },
900 : // We have no observed state for this node
901 0 : None => None,
902 : }
903 : } else {
904 : // Our intent is not to attach
905 0 : None
906 : }
907 0 : }
908 :
909 0 : fn dirty(&self, nodes: &Arc<HashMap<NodeId, Node>>) -> bool {
910 0 : let mut dirty_nodes = HashSet::new();
911 :
912 0 : if let Some(node_id) = self.intent.attached {
913 : // Maybe panic: it is a severe bug if we try to attach while generation is null.
914 0 : let generation = self
915 0 : .generation
916 0 : .expect("Attempted to enter attached state without a generation");
917 0 :
918 0 : let wanted_conf =
919 0 : attached_location_conf(generation, &self.shard, &self.config, &self.policy);
920 0 : match self.observed.locations.get(&node_id) {
921 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
922 0 : Some(_) | None => {
923 0 : dirty_nodes.insert(node_id);
924 0 : }
925 : }
926 0 : }
927 :
928 0 : for node_id in &self.intent.secondary {
929 0 : let wanted_conf = secondary_location_conf(&self.shard, &self.config);
930 0 : match self.observed.locations.get(node_id) {
931 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
932 0 : Some(_) | None => {
933 0 : dirty_nodes.insert(*node_id);
934 0 : }
935 : }
936 : }
937 :
938 0 : for node_id in self.observed.locations.keys() {
939 0 : if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
940 0 : // We have observed state that isn't part of our intent: need to clean it up.
941 0 : dirty_nodes.insert(*node_id);
942 0 : }
943 : }
944 :
945 0 : dirty_nodes.retain(|node_id| {
946 0 : nodes
947 0 : .get(node_id)
948 0 : .map(|n| n.is_available())
949 0 : .unwrap_or(false)
950 0 : });
951 0 :
952 0 : !dirty_nodes.is_empty()
953 0 : }
954 :
955 : #[allow(clippy::too_many_arguments)]
956 0 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
957 : pub(crate) fn get_reconcile_needed(
958 : &mut self,
959 : pageservers: &Arc<HashMap<NodeId, Node>>,
960 : ) -> ReconcileNeeded {
961 : // If there are any ambiguous observed states, and the nodes they refer to are available,
962 : // we should reconcile to clean them up.
963 : let mut dirty_observed = false;
964 : for (node_id, observed_loc) in &self.observed.locations {
965 : let node = pageservers
966 : .get(node_id)
967 : .expect("Nodes may not be removed while referenced");
968 : if observed_loc.conf.is_none() && node.is_available() {
969 : dirty_observed = true;
970 : break;
971 : }
972 : }
973 :
974 : let active_nodes_dirty = self.dirty(pageservers);
975 :
976 : // Even if there is no pageserver work to be done, if we have a pending notification to computes,
977 : // wake up a reconciler to send it.
978 : let do_reconcile =
979 : active_nodes_dirty || dirty_observed || self.pending_compute_notification;
980 :
981 : if !do_reconcile {
982 : tracing::debug!("Not dirty, no reconciliation needed.");
983 : return ReconcileNeeded::No;
984 : }
985 :
986 : // If we are currently splitting, then never start a reconciler task: the splitting logic
987 : // requires that shards are not interfered with while it runs. Do this check here rather than
988 : // up top, so that we only log this message if we would otherwise have done a reconciliation.
989 : if !matches!(self.splitting, SplitState::Idle) {
990 : tracing::info!("Refusing to reconcile, splitting in progress");
991 : return ReconcileNeeded::No;
992 : }
993 :
994 : // Reconcile already in flight for the current sequence?
995 : if let Some(handle) = &self.reconciler {
996 : if handle.sequence == self.sequence {
997 : tracing::info!(
998 : "Reconciliation already in progress for sequence {:?}",
999 : self.sequence,
1000 : );
1001 : return ReconcileNeeded::WaitExisting(ReconcilerWaiter {
1002 : tenant_shard_id: self.tenant_shard_id,
1003 : seq_wait: self.waiter.clone(),
1004 : error_seq_wait: self.error_waiter.clone(),
1005 : error: self.last_error.clone(),
1006 : seq: self.sequence,
1007 : });
1008 : }
1009 : }
1010 :
1011 : // Pre-checks done: finally check whether we may actually do the work
1012 : match self.scheduling_policy {
1013 : ShardSchedulingPolicy::Active
1014 : | ShardSchedulingPolicy::Essential
1015 : | ShardSchedulingPolicy::Pause => {}
1016 : ShardSchedulingPolicy::Stop => {
1017 : // We only reach this point if there is work to do and we're going to skip
1018 : // doing it: warn to make it obvious why this tenant isn't doing what it ought to.
1019 : tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
1020 : return ReconcileNeeded::No;
1021 : }
1022 : }
1023 :
1024 : ReconcileNeeded::Yes
1025 : }
1026 :
1027 : /// Ensure the sequence number is set to a value where waiting for this value will make us wait
1028 : /// for the next reconcile: i.e. it is ahead of all completed or running reconcilers.
1029 : ///
1030 : /// Constructing a ReconcilerWaiter with the resulting sequence number gives the property
1031 : /// that the waiter will not complete until some future Reconciler is constructed and run.
1032 0 : fn ensure_sequence_ahead(&mut self) {
1033 0 : // Find the highest sequence for which a Reconciler has previously run or is currently
1034 0 : // running
1035 0 : let max_seen = std::cmp::max(
1036 0 : self.reconciler
1037 0 : .as_ref()
1038 0 : .map(|r| r.sequence)
1039 0 : .unwrap_or(Sequence(0)),
1040 0 : std::cmp::max(self.waiter.load(), self.error_waiter.load()),
1041 0 : );
1042 0 :
1043 0 : if self.sequence <= max_seen {
1044 0 : self.sequence = max_seen.next();
1045 0 : }
1046 0 : }
1047 :
1048 : /// Create a waiter that will wait for some future Reconciler that hasn't been spawned yet.
1049 : ///
1050 : /// This is appropriate when you can't spawn a reconciler (e.g. due to resource limits), but
1051 : /// you would like to wait on the next reconciler that gets spawned in the background.
1052 0 : pub(crate) fn future_reconcile_waiter(&mut self) -> ReconcilerWaiter {
1053 0 : self.ensure_sequence_ahead();
1054 0 :
1055 0 : ReconcilerWaiter {
1056 0 : tenant_shard_id: self.tenant_shard_id,
1057 0 : seq_wait: self.waiter.clone(),
1058 0 : error_seq_wait: self.error_waiter.clone(),
1059 0 : error: self.last_error.clone(),
1060 0 : seq: self.sequence,
1061 0 : }
1062 0 : }
1063 :
1064 : #[allow(clippy::too_many_arguments)]
1065 0 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1066 : pub(crate) fn spawn_reconciler(
1067 : &mut self,
1068 : result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
1069 : pageservers: &Arc<HashMap<NodeId, Node>>,
1070 : compute_hook: &Arc<ComputeHook>,
1071 : reconciler_config: ReconcilerConfig,
1072 : service_config: &service::Config,
1073 : persistence: &Arc<Persistence>,
1074 : units: ReconcileUnits,
1075 : gate_guard: GateGuard,
1076 : cancel: &CancellationToken,
1077 : ) -> Option<ReconcilerWaiter> {
1078 : // Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
1079 : // doing our sequence's work.
1080 : let old_handle = self.reconciler.take();
1081 :
1082 : // Build list of nodes from which the reconciler should detach
1083 : let mut detach = Vec::new();
1084 : for node_id in self.observed.locations.keys() {
1085 : if self.intent.get_attached() != &Some(*node_id)
1086 : && !self.intent.secondary.contains(node_id)
1087 : {
1088 : detach.push(
1089 : pageservers
1090 : .get(node_id)
1091 : .expect("Intent references non-existent pageserver")
1092 : .clone(),
1093 : )
1094 : }
1095 : }
1096 :
1097 : // Advance the sequence before spawning a reconciler, so that sequence waiters
1098 : // can distinguish between before and after the reconcile completes.
1099 : self.ensure_sequence_ahead();
1100 :
1101 : let reconciler_cancel = cancel.child_token();
1102 : let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
1103 : let mut reconciler = Reconciler {
1104 : tenant_shard_id: self.tenant_shard_id,
1105 : shard: self.shard,
1106 : placement_policy: self.policy.clone(),
1107 : generation: self.generation,
1108 : intent: reconciler_intent,
1109 : detach,
1110 : reconciler_config,
1111 : config: self.config.clone(),
1112 : observed: self.observed.clone(),
1113 : compute_hook: compute_hook.clone(),
1114 : service_config: service_config.clone(),
1115 : _gate_guard: gate_guard,
1116 : _resource_units: units,
1117 : cancel: reconciler_cancel.clone(),
1118 : persistence: persistence.clone(),
1119 : compute_notify_failure: false,
1120 : };
1121 :
1122 : let reconcile_seq = self.sequence;
1123 :
1124 : tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
1125 : let must_notify = self.pending_compute_notification;
1126 : let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
1127 : tenant_id=%reconciler.tenant_shard_id.tenant_id,
1128 : shard_id=%reconciler.tenant_shard_id.shard_slug());
1129 : metrics::METRICS_REGISTRY
1130 : .metrics_group
1131 : .storage_controller_reconcile_spawn
1132 : .inc();
1133 : let result_tx = result_tx.clone();
1134 : let join_handle = tokio::task::spawn(
1135 0 : async move {
1136 : // Wait for any previous reconcile task to complete before we start
1137 0 : if let Some(old_handle) = old_handle {
1138 0 : old_handle.cancel.cancel();
1139 0 : if let Err(e) = old_handle.handle.await {
1140 : // We can't do much with this other than log it: the task is done, so
1141 : // we may proceed with our work.
1142 0 : tracing::error!("Unexpected join error waiting for reconcile task: {e}");
1143 0 : }
1144 0 : }
1145 :
1146 : // Early check for cancellation before doing any work
1147 : // TODO: wrap all remote API operations in cancellation check
1148 : // as well.
1149 0 : if reconciler.cancel.is_cancelled() {
1150 0 : metrics::METRICS_REGISTRY
1151 0 : .metrics_group
1152 0 : .storage_controller_reconcile_complete
1153 0 : .inc(ReconcileCompleteLabelGroup {
1154 0 : status: ReconcileOutcome::Cancel,
1155 0 : });
1156 0 : return;
1157 0 : }
1158 :
1159 : // Attempt to make observed state match intent state
1160 0 : let result = reconciler.reconcile().await;
1161 :
1162 : // If we know we had a pending compute notification from some previous action, send a notification irrespective
1163 : // of whether the above reconcile() did any work
1164 0 : if result.is_ok() && must_notify {
1165 : // If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
1166 0 : reconciler.compute_notify().await.ok();
1167 0 : }
1168 :
1169 : // Update result counter
1170 0 : let outcome_label = match &result {
1171 0 : Ok(_) => ReconcileOutcome::Success,
1172 0 : Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
1173 0 : Err(_) => ReconcileOutcome::Error,
1174 : };
1175 :
1176 0 : metrics::METRICS_REGISTRY
1177 0 : .metrics_group
1178 0 : .storage_controller_reconcile_complete
1179 0 : .inc(ReconcileCompleteLabelGroup {
1180 0 : status: outcome_label,
1181 0 : });
1182 0 :
1183 0 : // Constructing result implicitly drops Reconciler, freeing any ReconcileUnits before the Service might
1184 0 : // try and schedule more work in response to our result.
1185 0 : let result = ReconcileResult {
1186 0 : sequence: reconcile_seq,
1187 0 : result,
1188 0 : tenant_shard_id: reconciler.tenant_shard_id,
1189 0 : generation: reconciler.generation,
1190 0 : observed: reconciler.observed,
1191 0 : pending_compute_notification: reconciler.compute_notify_failure,
1192 0 : };
1193 0 :
1194 0 : result_tx
1195 0 : .send(ReconcileResultRequest::ReconcileResult(result))
1196 0 : .ok();
1197 0 : }
1198 : .instrument(reconciler_span),
1199 : );
1200 :
1201 : self.reconciler = Some(ReconcilerHandle {
1202 : sequence: self.sequence,
1203 : handle: join_handle,
1204 : cancel: reconciler_cancel,
1205 : });
1206 :
1207 : Some(ReconcilerWaiter {
1208 : tenant_shard_id: self.tenant_shard_id,
1209 : seq_wait: self.waiter.clone(),
1210 : error_seq_wait: self.error_waiter.clone(),
1211 : error: self.last_error.clone(),
1212 : seq: self.sequence,
1213 : })
1214 : }
1215 :
1216 : /// Get a waiter for any reconciliation in flight, but do not start reconciliation
1217 : /// if it is not already running
1218 0 : pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
1219 0 : if self.reconciler.is_some() {
1220 0 : Some(ReconcilerWaiter {
1221 0 : tenant_shard_id: self.tenant_shard_id,
1222 0 : seq_wait: self.waiter.clone(),
1223 0 : error_seq_wait: self.error_waiter.clone(),
1224 0 : error: self.last_error.clone(),
1225 0 : seq: self.sequence,
1226 0 : })
1227 : } else {
1228 0 : None
1229 : }
1230 0 : }
1231 :
1232 : /// Called when a ReconcileResult has been emitted and the service is updating
1233 : /// our state: if the result is from a sequence >= my ReconcilerHandle's sequence, then drop
1234 : /// the handle to indicate there is no longer a reconciliation in progress.
1235 0 : pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
1236 0 : if let Some(reconcile_handle) = &self.reconciler {
1237 0 : if reconcile_handle.sequence <= sequence {
1238 0 : self.reconciler = None;
1239 0 : }
1240 0 : }
1241 0 : }
1242 :
1243 : /// If we had any state at all referring to this node ID, drop it. Does not
1244 : /// attempt to reschedule.
1245 : ///
1246 : /// Returns true if we modified the node's intent state.
1247 0 : pub(crate) fn deref_node(&mut self, node_id: NodeId) -> bool {
1248 0 : let mut intent_modified = false;
1249 0 :
1250 0 : // Drop if this node was our attached intent
1251 0 : if self.intent.attached == Some(node_id) {
1252 0 : self.intent.attached = None;
1253 0 : intent_modified = true;
1254 0 : }
1255 :
1256 : // Drop from the list of secondaries, and check if we modified it
1257 0 : let had_secondaries = self.intent.secondary.len();
1258 0 : self.intent.secondary.retain(|n| n != &node_id);
1259 0 : intent_modified |= self.intent.secondary.len() != had_secondaries;
1260 0 :
1261 0 : debug_assert!(!self.intent.all_pageservers().contains(&node_id));
1262 :
1263 0 : intent_modified
1264 0 : }
1265 :
1266 0 : pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
1267 0 : self.scheduling_policy = p;
1268 0 : }
1269 :
1270 0 : pub(crate) fn get_scheduling_policy(&self) -> &ShardSchedulingPolicy {
1271 0 : &self.scheduling_policy
1272 0 : }
1273 :
1274 0 : pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
1275 0 : // Ordering: always set last_error before advancing sequence, so that sequence
1276 0 : // waiters are guaranteed to see a Some value when they see an error.
1277 0 : *(self.last_error.lock().unwrap()) = Some(Arc::new(error));
1278 0 : self.error_waiter.advance(sequence);
1279 0 : }
1280 :
1281 0 : pub(crate) fn from_persistent(
1282 0 : tsp: TenantShardPersistence,
1283 0 : intent: IntentState,
1284 0 : ) -> anyhow::Result<Self> {
1285 0 : let tenant_shard_id = tsp.get_tenant_shard_id()?;
1286 0 : let shard_identity = tsp.get_shard_identity()?;
1287 :
1288 0 : Ok(Self {
1289 0 : tenant_shard_id,
1290 0 : shard: shard_identity,
1291 0 : sequence: Sequence::initial(),
1292 0 : generation: tsp.generation.map(|g| Generation::new(g as u32)),
1293 0 : policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
1294 0 : intent,
1295 0 : observed: ObservedState::new(),
1296 0 : config: serde_json::from_str(&tsp.config).unwrap(),
1297 0 : reconciler: None,
1298 0 : splitting: tsp.splitting,
1299 0 : waiter: Arc::new(SeqWait::new(Sequence::initial())),
1300 0 : error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
1301 0 : last_error: Arc::default(),
1302 0 : pending_compute_notification: false,
1303 0 : delayed_reconcile: false,
1304 0 : scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
1305 0 : preferred_az_id: tsp.preferred_az_id,
1306 0 : })
1307 0 : }
1308 :
1309 0 : pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
1310 0 : TenantShardPersistence {
1311 0 : tenant_id: self.tenant_shard_id.tenant_id.to_string(),
1312 0 : shard_number: self.tenant_shard_id.shard_number.0 as i32,
1313 0 : shard_count: self.tenant_shard_id.shard_count.literal() as i32,
1314 0 : shard_stripe_size: self.shard.stripe_size.0 as i32,
1315 0 : generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
1316 0 : generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
1317 0 : placement_policy: serde_json::to_string(&self.policy).unwrap(),
1318 0 : config: serde_json::to_string(&self.config).unwrap(),
1319 0 : splitting: SplitState::default(),
1320 0 : scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
1321 0 : preferred_az_id: self.preferred_az_id.clone(),
1322 0 : }
1323 0 : }
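// A minimal sketch of the round-trip between these two functions: runtime
// state flattens into a TenantShardPersistence row, and startup rebuilds a
// TenantShard from that row (the empty IntentState here is illustrative;
// real startup derives intent separately, e.g. via intent_from_observed):
//
// let tsp = shard.to_persistent();
// // ... persist and later reload via crate::persistence ...
// let restored = TenantShard::from_persistent(tsp, IntentState::new())?;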
1324 :
1325 0 : pub(crate) fn preferred_az(&self) -> Option<&str> {
1326 0 : self.preferred_az_id.as_deref()
1327 0 : }
1328 :
1329 0 : pub(crate) fn set_preferred_az(&mut self, preferred_az_id: String) {
1330 0 : self.preferred_az_id = Some(preferred_az_id);
1331 0 : }
1332 : }
1333 :
1334 : #[cfg(test)]
1335 : pub(crate) mod tests {
1336 : use pageserver_api::{
1337 : controller_api::NodeAvailability,
1338 : shard::{ShardCount, ShardNumber},
1339 : };
1340 : use utils::id::TenantId;
1341 :
1342 : use crate::scheduler::test_utils::make_test_nodes;
1343 :
1344 : use super::*;
1345 :
1346 7 : fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard {
1347 7 : let tenant_id = TenantId::generate();
1348 7 : let shard_number = ShardNumber(0);
1349 7 : let shard_count = ShardCount::new(1);
1350 7 :
1351 7 : let tenant_shard_id = TenantShardId {
1352 7 : tenant_id,
1353 7 : shard_number,
1354 7 : shard_count,
1355 7 : };
1356 7 : TenantShard::new(
1357 7 : tenant_shard_id,
1358 7 : ShardIdentity::new(
1359 7 : shard_number,
1360 7 : shard_count,
1361 7 : pageserver_api::shard::ShardStripeSize(32768),
1362 7 : )
1363 7 : .unwrap(),
1364 7 : policy,
1365 7 : )
1366 7 : }
1367 :
1368 1 : fn make_test_tenant(policy: PlacementPolicy, shard_count: ShardCount) -> Vec<TenantShard> {
1369 1 : let tenant_id = TenantId::generate();
1370 1 :
1371 1 : (0..shard_count.count())
1372 4 : .map(|i| {
1373 4 : let shard_number = ShardNumber(i);
1374 4 :
1375 4 : let tenant_shard_id = TenantShardId {
1376 4 : tenant_id,
1377 4 : shard_number,
1378 4 : shard_count,
1379 4 : };
1380 4 : TenantShard::new(
1381 4 : tenant_shard_id,
1382 4 : ShardIdentity::new(
1383 4 : shard_number,
1384 4 : shard_count,
1385 4 : pageserver_api::shard::ShardStripeSize(32768),
1386 4 : )
1387 4 : .unwrap(),
1388 4 : policy.clone(),
1389 4 : )
1390 4 : })
1391 1 : .collect()
1392 1 : }
1393 :
1394 : /// Test the scheduling behaviors used when a tenant configured for HA is subject
1395 : /// to nodes being marked offline.
1396 : #[test]
1397 1 : fn tenant_ha_scheduling() -> anyhow::Result<()> {
1398 1 : // Start with three nodes. Our tenant will only use two. The third one is
1399 1 : // expected to remain unused.
1400 1 : let mut nodes = make_test_nodes(3);
1401 1 :
1402 1 : let mut scheduler = Scheduler::new(nodes.values());
1403 1 : let mut context = ScheduleContext::default();
1404 1 :
1405 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
1406 1 : tenant_shard
1407 1 : .schedule(&mut scheduler, &mut context)
1408 1 : .expect("we have enough nodes, scheduling should work");
1409 1 :
1410 1 : // Expect the locations to initially be scheduled onto different nodes
1411 1 : assert_eq!(tenant_shard.intent.secondary.len(), 1);
1412 1 : assert!(tenant_shard.intent.attached.is_some());
1413 :
1414 1 : let attached_node_id = tenant_shard.intent.attached.unwrap();
1415 1 : let secondary_node_id = *tenant_shard.intent.secondary.iter().last().unwrap();
1416 1 : assert_ne!(attached_node_id, secondary_node_id);
1417 :
1418 : // Notifying that the attached node is offline should demote it to a secondary
1419 1 : let changed = tenant_shard
1420 1 : .intent
1421 1 : .demote_attached(&mut scheduler, attached_node_id);
1422 1 : assert!(changed);
1423 1 : assert!(tenant_shard.intent.attached.is_none());
1424 1 : assert_eq!(tenant_shard.intent.secondary.len(), 2);
1425 :
1426 : // Update the scheduler state to indicate the node is offline
1427 1 : nodes
1428 1 : .get_mut(&attached_node_id)
1429 1 : .unwrap()
1430 1 : .set_availability(NodeAvailability::Offline);
1431 1 : scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
1432 1 :
1433 1 : // Scheduling the shard should promote the still-available secondary node to attached
1434 1 : tenant_shard
1435 1 : .schedule(&mut scheduler, &mut context)
1436 1 : .expect("active nodes are available");
1437 1 : assert_eq!(tenant_shard.intent.attached.unwrap(), secondary_node_id);
1438 :
1439 : // The original attached node should have been retained as a secondary
1440 1 : assert_eq!(
1441 1 : *tenant_shard.intent.secondary.iter().last().unwrap(),
1442 1 : attached_node_id
1443 1 : );
1444 :
1445 1 : tenant_shard.intent.clear(&mut scheduler);
1446 1 :
1447 1 : Ok(())
1448 1 : }
1449 :
1450 : #[test]
1451 1 : fn intent_from_observed() -> anyhow::Result<()> {
1452 1 : let nodes = make_test_nodes(3);
1453 1 : let mut scheduler = Scheduler::new(nodes.values());
1454 1 :
1455 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
1456 1 :
1457 1 : tenant_shard.observed.locations.insert(
1458 1 : NodeId(3),
1459 1 : ObservedStateLocation {
1460 1 : conf: Some(LocationConfig {
1461 1 : mode: LocationConfigMode::AttachedMulti,
1462 1 : generation: Some(2),
1463 1 : secondary_conf: None,
1464 1 : shard_number: tenant_shard.shard.number.0,
1465 1 : shard_count: tenant_shard.shard.count.literal(),
1466 1 : shard_stripe_size: tenant_shard.shard.stripe_size.0,
1467 1 : tenant_conf: TenantConfig::default(),
1468 1 : }),
1469 1 : },
1470 1 : );
1471 1 :
1472 1 : tenant_shard.observed.locations.insert(
1473 1 : NodeId(2),
1474 1 : ObservedStateLocation {
1475 1 : conf: Some(LocationConfig {
1476 1 : mode: LocationConfigMode::AttachedStale,
1477 1 : generation: Some(1),
1478 1 : secondary_conf: None,
1479 1 : shard_number: tenant_shard.shard.number.0,
1480 1 : shard_count: tenant_shard.shard.count.literal(),
1481 1 : shard_stripe_size: tenant_shard.shard.stripe_size.0,
1482 1 : tenant_conf: TenantConfig::default(),
1483 1 : }),
1484 1 : },
1485 1 : );
1486 1 :
1487 1 : tenant_shard.intent_from_observed(&mut scheduler);
1488 1 :
1489 1 : // The attached location with the highest generation is used as the attached location
1490 1 : assert_eq!(tenant_shard.intent.attached, Some(NodeId(3)));
1491 : // Other locations get used as secondary
1492 1 : assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]);
1493 :
1494 1 : scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?;
1495 :
1496 1 : tenant_shard.intent.clear(&mut scheduler);
1497 1 : Ok(())
1498 1 : }
1499 :
1500 : #[test]
1501 1 : fn scheduling_mode() -> anyhow::Result<()> {
1502 1 : let nodes = make_test_nodes(3);
1503 1 : let mut scheduler = Scheduler::new(nodes.values());
1504 1 :
1505 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
1506 1 :
1507 1 : // In pause mode, schedule() shouldn't do anything
1508 1 : tenant_shard.scheduling_policy = ShardSchedulingPolicy::Pause;
1509 1 : assert!(tenant_shard
1510 1 : .schedule(&mut scheduler, &mut ScheduleContext::default())
1511 1 : .is_ok());
1512 1 : assert!(tenant_shard.intent.all_pageservers().is_empty());
1513 :
1514 : // In active mode, schedule() works
1515 1 : tenant_shard.scheduling_policy = ShardSchedulingPolicy::Active;
1516 1 : assert!(tenant_shard
1517 1 : .schedule(&mut scheduler, &mut ScheduleContext::default())
1518 1 : .is_ok());
1519 1 : assert!(!tenant_shard.intent.all_pageservers().is_empty());
1520 :
1521 1 : tenant_shard.intent.clear(&mut scheduler);
1522 1 : Ok(())
1523 1 : }
1524 :
1525 : #[test]
1526 1 : fn optimize_attachment() -> anyhow::Result<()> {
1527 1 : let nodes = make_test_nodes(3);
1528 1 : let mut scheduler = Scheduler::new(nodes.values());
1529 1 :
1530 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
1531 1 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
1532 1 :
1533 1 : // Initially: both shards are attached on node 1, and each has its secondary
1534 1 : // location on a different node.
1535 1 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
1536 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(2));
1537 1 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
1538 1 : shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
1539 1 :
1540 1 : let mut schedule_context = ScheduleContext::default();
1541 1 : schedule_context.avoid(&shard_a.intent.all_pageservers());
1542 1 : schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
1543 1 : schedule_context.avoid(&shard_b.intent.all_pageservers());
1544 1 : schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
1545 1 :
1546 1 : let optimization_a = shard_a.optimize_attachment(&nodes, &schedule_context);
1547 1 :
1548 1 : // Either shard should recognize that it has the option to switch to a secondary location where there
1549 1 : // would be no other shards from the same tenant, and request to do so.
1550 1 : assert_eq!(
1551 1 : optimization_a,
1552 1 : Some(ScheduleOptimization {
1553 1 : sequence: shard_a.sequence,
1554 1 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
1555 1 : old_attached_node_id: NodeId(1),
1556 1 : new_attached_node_id: NodeId(2)
1557 1 : })
1558 1 : })
1559 1 : );
1560 :
1561 : // Note that optimizing two shards in the same tenant against the same ScheduleContext
1562 : // is not valid in general: applying one optimization invalidates the statistics the
1563 : // context was built from. It is the responsibility of [`Service::optimize_all`] to avoid
1564 : // attempting optimizations for multiple shards in the same tenant at the same time.
1565 : // Generating both optimizations here is done purely for test purposes.
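// (See `optimize_til_idle` below for the safe pattern: apply at most one
// optimization per pass, then rebuild the ScheduleContext from scratch.)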
1566 1 : let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context);
1567 1 : assert_eq!(
1568 1 : optimization_b,
1569 1 : Some(ScheduleOptimization {
1570 1 : sequence: shard_b.sequence,
1571 1 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
1572 1 : old_attached_node_id: NodeId(1),
1573 1 : new_attached_node_id: NodeId(3)
1574 1 : })
1575 1 : })
1576 1 : );
1577 :
1578 : // Applying these optimizations should result in the end state proposed
1579 1 : shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
1580 1 : assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(2)));
1581 1 : assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
1582 1 : shard_b.apply_optimization(&mut scheduler, optimization_b.unwrap());
1583 1 : assert_eq!(shard_b.intent.get_attached(), &Some(NodeId(3)));
1584 1 : assert_eq!(shard_b.intent.get_secondary(), &vec![NodeId(1)]);
1585 :
1586 1 : shard_a.intent.clear(&mut scheduler);
1587 1 : shard_b.intent.clear(&mut scheduler);
1588 1 :
1589 1 : Ok(())
1590 1 : }
1591 :
1592 : #[test]
1593 1 : fn optimize_secondary() -> anyhow::Result<()> {
1594 1 : let nodes = make_test_nodes(4);
1595 1 : let mut scheduler = Scheduler::new(nodes.values());
1596 1 :
1597 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
1598 1 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
1599 1 :
1600 1 : // Initially: the shards are attached on different nodes, but both have their
1601 1 : // secondary location on node 3.
1602 1 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
1603 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
1604 1 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
1605 1 : shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
1606 1 :
1607 1 : let mut schedule_context = ScheduleContext::default();
1608 1 : schedule_context.avoid(&shard_a.intent.all_pageservers());
1609 1 : schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
1610 1 : schedule_context.avoid(&shard_b.intent.all_pageservers());
1611 1 : schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
1612 1 :
1613 1 : let optimization_a = shard_a.optimize_secondary(&mut scheduler, &schedule_context);
1614 1 :
1615 1 : // Since a node with no locations (node 4) is available, a shard whose secondary shares
1616 1 : // node 3 with the other shard of the tenant should propose moving its secondary away
1617 1 : assert_eq!(
1618 1 : optimization_a,
1619 1 : Some(ScheduleOptimization {
1620 1 : sequence: shard_a.sequence,
1621 1 : action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
1622 1 : old_node_id: NodeId(3),
1623 1 : new_node_id: NodeId(4)
1624 1 : })
1625 1 : })
1626 1 : );
1627 :
1628 1 : shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
1629 1 : assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
1630 1 : assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);
1631 :
1632 1 : shard_a.intent.clear(&mut scheduler);
1633 1 : shard_b.intent.clear(&mut scheduler);
1634 1 :
1635 1 : Ok(())
1636 1 : }
1637 :
1638 : // Optimize until quiescent: this emulates what [`Service::optimize_all`] does when
1639 : // called repeatedly in the background.
1640 1 : fn optimize_til_idle(
1641 1 : nodes: &HashMap<NodeId, Node>,
1642 1 : scheduler: &mut Scheduler,
1643 1 : shards: &mut [TenantShard],
1644 1 : ) {
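// Applying an optimization mutates the intents from which the ScheduleContext
// statistics were computed, so each pass applies at most one optimization
// (hence the `break`s below) before rebuilding the context from scratch.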
1645 1 : let mut loop_n = 0;
1646 : loop {
1647 7 : let mut schedule_context = ScheduleContext::default();
1648 7 : let mut any_changed = false;
1649 :
1650 28 : for shard in shards.iter() {
1651 28 : schedule_context.avoid(&shard.intent.all_pageservers());
1652 28 : if let Some(attached) = shard.intent.get_attached() {
1653 28 : schedule_context.push_attached(*attached);
1654 28 : }
1655 : }
1656 :
1657 13 : for shard in shards.iter_mut() {
1658 13 : let optimization = shard.optimize_attachment(nodes, &schedule_context);
1659 13 : if let Some(optimization) = optimization {
1660 2 : shard.apply_optimization(scheduler, optimization);
1661 2 : any_changed = true;
1662 2 : break;
1663 11 : }
1664 11 :
1665 11 : let optimization = shard.optimize_secondary(scheduler, &schedule_context);
1666 11 : if let Some(optimization) = optimization {
1667 4 : shard.apply_optimization(scheduler, optimization);
1668 4 : any_changed = true;
1669 4 : break;
1670 7 : }
1671 : }
1672 :
1673 7 : if !any_changed {
1674 1 : break;
1675 6 : }
1676 6 :
1677 6 : // Assert no infinite loop
1678 6 : loop_n += 1;
1679 6 : assert!(loop_n < 1000);
1680 : }
1681 1 : }
1682 :
1683 : /// Test the balancing behavior of shard scheduling: that it achieves a balance, and
1684 : /// that it converges.
1685 : #[test]
1686 1 : fn optimize_add_nodes() -> anyhow::Result<()> {
1687 1 : let nodes = make_test_nodes(4);
1688 1 :
1689 1 : // Only show the scheduler two of the four nodes to begin with
1690 1 : let mut scheduler = Scheduler::new([].iter());
1691 1 : scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
1692 1 : scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
1693 1 :
1694 1 : let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
1695 1 : let mut schedule_context = ScheduleContext::default();
1696 5 : for shard in &mut shards {
1697 4 : assert!(shard
1698 4 : .schedule(&mut scheduler, &mut schedule_context)
1699 4 : .is_ok());
1700 : }
1701 :
1702 : // We should see an equal number of locations on the two nodes.
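// (4 shards in PlacementPolicy::Attached(1) means one attached plus one secondary
// location each: 8 locations across 2 nodes is 4 per node, 2 of them attached.)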
1703 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4);
1704 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2);
1705 :
1706 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4);
1707 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
1708 :
1709 : // Add another two nodes: we should see the shards spread out when their optimize
1710 : // methods are called
1711 1 : scheduler.node_upsert(nodes.get(&NodeId(3)).unwrap());
1712 1 : scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
1713 1 : optimize_til_idle(&nodes, &mut scheduler, &mut shards);
1714 1 :
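// After rebalancing, the same 8 locations are spread across 4 nodes: 2 locations
// per node, with each node holding exactly 1 attachment.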
1715 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
1716 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
1717 :
1718 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
1719 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
1720 :
1721 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2);
1722 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 1);
1723 :
1724 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2);
1725 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 1);
1726 :
1727 4 : for shard in shards.iter_mut() {
1728 4 : shard.intent.clear(&mut scheduler);
1729 4 : }
1730 :
1731 1 : Ok(())
1732 1 : }
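
// A minimal additional sketch (an assumption, not a test from the original
// suite) combining the helpers above: schedule a sharded tenant, let
// optimization converge via `optimize_til_idle`, then verify that the
// scheduler's internal reference counts still agree with the shards'
// intents. Every API used here already appears elsewhere in this module.
#[test]
fn optimize_then_consistency_check() -> anyhow::Result<()> {
    let nodes = make_test_nodes(4);
    let mut scheduler = Scheduler::new(nodes.values());

    let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(2));
    let mut schedule_context = ScheduleContext::default();
    for shard in &mut shards {
        shard
            .schedule(&mut scheduler, &mut schedule_context)
            .expect("we have enough nodes, scheduling should work");
    }

    optimize_til_idle(&nodes, &mut scheduler, &mut shards);

    // The scheduler's refcounts must match what the shards' intents reference.
    scheduler.consistency_check(nodes.values(), shards.iter())?;

    for shard in shards.iter_mut() {
        shard.intent.clear(&mut scheduler);
    }
    Ok(())
}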
1733 : }