Line data Source code
1 : use std::{
2 : collections::{HashMap, HashSet},
3 : sync::Arc,
4 : time::Duration,
5 : };
6 :
7 : use crate::{
8 : metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
9 : persistence::TenantShardPersistence,
10 : reconciler::ReconcileUnits,
11 : scheduler::{AffinityScore, MaySchedule, RefCountUpdate, ScheduleContext},
12 : };
13 : use pageserver_api::controller_api::{
14 : NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
15 : };
16 : use pageserver_api::{
17 : models::{LocationConfig, LocationConfigMode, TenantConfig},
18 : shard::{ShardIdentity, TenantShardId},
19 : };
20 : use serde::Serialize;
21 : use tokio::task::JoinHandle;
22 : use tokio_util::sync::CancellationToken;
23 : use tracing::{instrument, Instrument};
24 : use utils::{
25 : generation::Generation,
26 : id::NodeId,
27 : seqwait::{SeqWait, SeqWaitError},
28 : sync::gate::GateGuard,
29 : };
30 :
31 : use crate::{
32 : compute_hook::ComputeHook,
33 : node::Node,
34 : persistence::{split_state::SplitState, Persistence},
35 : reconciler::{
36 : attached_location_conf, secondary_location_conf, ReconcileError, Reconciler, TargetState,
37 : },
38 : scheduler::{ScheduleError, Scheduler},
39 : service, Sequence,
40 : };
41 :
42 : /// Serialization helper
43 0 : fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
44 0 : where
45 0 : S: serde::ser::Serializer,
46 0 : T: std::fmt::Display,
47 0 : {
48 0 : serializer.collect_str(
49 0 : &v.lock()
50 0 : .unwrap()
51 0 : .as_ref()
52 0 : .map(|e| format!("{e}"))
53 0 : .unwrap_or("".to_string()),
54 0 : )
55 0 : }
56 :
57 : /// In-memory state for a particular tenant shard.
58 : ///
59 : /// This struct implements Serialize for debugging purposes, but is _not_ persisted
60 : /// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
61 0 : #[derive(Serialize)]
62 : pub(crate) struct TenantShard {
63 : pub(crate) tenant_shard_id: TenantShardId,
64 :
65 : pub(crate) shard: ShardIdentity,
66 :
67 : // Runtime only: sequence used to coordinate updates to this object while
68 : // background reconcilers may be running. A reconciler runs to a particular
69 : // sequence.
70 : pub(crate) sequence: Sequence,
71 :
72 : // Latest generation number: next time we attach, increment this
73 : // and use the incremented number when attaching.
74 : //
75 : // None represents an incompletely onboarded tenant via the [`Service::location_config`]
76 : // API, where this tenant may only run in PlacementPolicy::Secondary.
77 : pub(crate) generation: Option<Generation>,
78 :
79 : // High level description of how the tenant should be set up. Provided
80 : // externally.
81 : pub(crate) policy: PlacementPolicy,
82 :
83 : // Low level description of exactly which pageservers should fulfil
84 : // which role. Generated by `Self::schedule`.
85 : pub(crate) intent: IntentState,
86 :
87 : // Low level description of how the tenant is configured on pageservers:
88 : // if this does not match `Self::intent` then the tenant needs reconciliation
89 : // with `Self::reconcile`.
90 : pub(crate) observed: ObservedState,
91 :
92 : // Tenant configuration, passed through opaquely to the pageserver. Identical
93 : // for all shards in a tenant.
94 : pub(crate) config: TenantConfig,
95 :
96 : /// If a reconcile task is currently in flight, it may be joined here (it is
97 : /// only safe to join if either the result has been received or the reconciler's
98 : /// cancellation token has been fired)
99 : #[serde(skip)]
100 : pub(crate) reconciler: Option<ReconcilerHandle>,
101 :
102 : /// If a tenant is being split, then all shards with that TenantId will have a
103 : /// SplitState set; this acts as a guard against other operations such as background
104 : /// reconciliation and timeline creation.
105 : pub(crate) splitting: SplitState,
106 :
107 : /// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag
108 : /// is set. This flag is cleared when the tenant is popped off the delay queue.
109 : pub(crate) delayed_reconcile: bool,
110 :
111 : /// Optionally wait for reconciliation to complete up to a particular
112 : /// sequence number.
113 : #[serde(skip)]
114 : pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
115 :
116 : /// Indicates the sequence number for which we have encountered an error reconciling. If
117 : /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
118 : /// and callers should stop waiting for `waiter` and propagate the error.
119 : #[serde(skip)]
120 : pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
121 :
122 : /// The most recent error from a reconcile on this tenant. This is a nested Arc
123 : /// because:
124 : /// - ReconcileWaiters need to Arc-clone the overall object to read it later
125 : /// - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
126 : /// many waiters for one shard, and the underlying error types are not Clone.
127 : ///
128 : /// TODO: generalize to an array of recent events
129 : /// TODO: use an ArcSwap instead of a mutex for faster reads?
130 : #[serde(serialize_with = "read_last_error")]
131 : pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
132 :
133 : /// If we have a pending compute notification that for some reason we weren't able to send,
134 : /// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
135 : /// and trigger a Reconciler run. This is the mechanism by which compute notifications are included in the scope
136 : /// of state that we publish externally in an eventually consistent way.
137 : pub(crate) pending_compute_notification: bool,
138 :
139 : // Support/debug tool: if something is going wrong or flapping with scheduling, this may
140 : // be set to a non-active state to avoid making changes while the issue is fixed.
141 : scheduling_policy: ShardSchedulingPolicy,
142 : }
143 :
144 : #[derive(Default, Clone, Debug, Serialize)]
145 : pub(crate) struct IntentState {
146 : attached: Option<NodeId>,
147 : secondary: Vec<NodeId>,
148 : }
149 :
150 : impl IntentState {
151 4 : pub(crate) fn new() -> Self {
152 4 : Self {
153 4 : attached: None,
154 4 : secondary: vec![],
155 4 : }
156 4 : }
157 0 : pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option<NodeId>) -> Self {
158 0 : if let Some(node_id) = node_id {
159 0 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::Attach);
160 0 : }
161 0 : Self {
162 0 : attached: node_id,
163 0 : secondary: vec![],
164 0 : }
165 0 : }
166 :
167 26 : pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
168 26 : if self.attached != new_attached {
169 26 : if let Some(old_attached) = self.attached.take() {
170 0 : scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
171 26 : }
172 26 : if let Some(new_attached) = &new_attached {
173 26 : scheduler.update_node_ref_counts(*new_attached, RefCountUpdate::Attach);
174 26 : }
175 26 : self.attached = new_attached;
176 0 : }
177 26 : }
178 :
179 : /// Like set_attached, but the node is from [`Self::secondary`]. This swaps the node from
180 : /// secondary to attached while maintaining the scheduler's reference counts.
181 10 : pub(crate) fn promote_attached(
182 10 : &mut self,
183 10 : scheduler: &mut Scheduler,
184 10 : promote_secondary: NodeId,
185 10 : ) {
186 10 : // If we call this with a node that isn't in secondary, it would cause incorrect
187 10 : // scheduler reference counting, since we assume the node is already referenced as a secondary.
188 10 : debug_assert!(self.secondary.contains(&promote_secondary));
189 :
190 20 : self.secondary.retain(|n| n != &promote_secondary);
191 10 :
192 10 : let demoted = self.attached;
193 10 : self.attached = Some(promote_secondary);
194 10 :
195 10 : scheduler.update_node_ref_counts(promote_secondary, RefCountUpdate::PromoteSecondary);
196 10 : if let Some(demoted) = demoted {
197 0 : scheduler.update_node_ref_counts(demoted, RefCountUpdate::DemoteAttached);
198 10 : }
199 10 : }
200 :
201 34 : pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
202 34 : debug_assert!(!self.secondary.contains(&new_secondary));
203 34 : scheduler.update_node_ref_counts(new_secondary, RefCountUpdate::AddSecondary);
204 34 : self.secondary.push(new_secondary);
205 34 : }
206 :
207 : /// It is legal to call this with a node that is not currently a secondary: that is a no-op
208 10 : pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
209 10 : let index = self.secondary.iter().position(|n| *n == node_id);
210 10 : if let Some(index) = index {
211 10 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
212 10 : self.secondary.remove(index);
213 10 : }
214 10 : }
215 :
216 24 : pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
217 24 : for secondary in self.secondary.drain(..) {
218 24 : scheduler.update_node_ref_counts(secondary, RefCountUpdate::RemoveSecondary);
219 24 : }
220 24 : }
221 :
222 : /// Remove the last secondary node from the list of secondaries
223 0 : pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
224 0 : if let Some(node_id) = self.secondary.pop() {
225 0 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
226 0 : }
227 0 : }
228 :
229 24 : pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
230 24 : if let Some(old_attached) = self.attached.take() {
231 24 : scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
232 24 : }
233 :
234 24 : self.clear_secondary(scheduler);
235 24 : }
236 :
237 140 : pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
238 140 : let mut result = Vec::new();
239 140 : if let Some(p) = self.attached {
240 136 : result.push(p)
241 4 : }
242 :
243 140 : result.extend(self.secondary.iter().copied());
244 140 :
245 140 : result
246 140 : }
247 :
248 118 : pub(crate) fn get_attached(&self) -> &Option<NodeId> {
249 118 : &self.attached
250 118 : }
251 :
252 32 : pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
253 32 : &self.secondary
254 32 : }
255 :
256 : /// If the node is in use as the attached location, demote it into
257 : /// the list of secondary locations. This is used when a node goes offline,
258 : /// and we want to use a different node for attachment, but not permanently
259 : /// forget the location on the offline node.
260 : ///
261 : /// Returns true if a change was made
262 10 : pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
263 10 : if self.attached == Some(node_id) {
264 10 : self.attached = None;
265 10 : self.secondary.push(node_id);
266 10 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::DemoteAttached);
267 10 : true
268 : } else {
269 0 : false
270 : }
271 10 : }
272 : }
273 :
274 : impl Drop for IntentState {
275 26 : fn drop(&mut self) {
276 26 : // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
277 26 : // We do not check this while panicking, to avoid polluting unit test failures or
278 26 : // other assertions with this assertion's output. It's still wrong to leak these,
279 26 : // but if we already have a panic then we don't need to independently flag this case.
280 26 : if !(std::thread::panicking()) {
281 26 : debug_assert!(self.attached.is_none() && self.secondary.is_empty());
282 0 : }
283 24 : }
284 : }
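// Illustrative sketch: the intended lifecycle of an `IntentState`. Every mutation goes
// through the scheduler so that node reference counts stay consistent, and `clear` must be
// called before the value is dropped (the `Drop` impl above asserts this). The helper name
// and the node IDs below are hypothetical, and the nodes are assumed to already be
// registered with the scheduler.
#[cfg(test)]
#[allow(dead_code)]
fn intent_lifecycle_sketch(scheduler: &mut Scheduler) {
    let mut intent = IntentState::new();
    intent.set_attached(scheduler, Some(NodeId(1))); // refcount: attach on node 1
    intent.push_secondary(scheduler, NodeId(2)); // refcount: secondary on node 2
    intent.demote_attached(scheduler, NodeId(1)); // node 1 becomes a secondary
    intent.promote_attached(scheduler, NodeId(2)); // node 2 becomes attached
    intent.clear(scheduler); // release all refcounts before drop
}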
285 :
286 : #[derive(Default, Clone, Serialize)]
287 : pub(crate) struct ObservedState {
288 : pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
289 : }
290 :
291 : /// Our latest knowledge of how this tenant is configured in the outside world.
292 : ///
293 : /// Meaning:
294 : /// * No instance of this type exists for a node: we are certain that we have nothing configured on that
295 : /// node for this shard.
296 : /// * Instance exists with conf==None: we *might* have some state on that node, but we don't know
297 : /// what it is (e.g. we failed partway through configuring it)
298 : /// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
299 : /// and that configuration will still be present unless something external interfered.
300 : #[derive(Clone, Serialize)]
301 : pub(crate) struct ObservedStateLocation {
302 : /// If None, it means we do not know the status of this shard's location on this node, but
303 : /// we know that we might have some state on this node.
304 : pub(crate) conf: Option<LocationConfig>,
305 : }
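// Illustrative sketch of the three observed-state cases described above: no entry for a
// node, an entry with an unknown configuration, and an entry with a known configuration.
// `describe_observed` is a hypothetical helper used only for illustration.
#[cfg(test)]
#[allow(dead_code)]
fn describe_observed(observed: &ObservedState, node: NodeId) -> &'static str {
    match observed.locations.get(&node) {
        // No entry: we are certain nothing is configured on this node for this shard.
        None => "nothing configured",
        // Entry with conf == None: some state may exist, but we don't know what it is.
        Some(ObservedStateLocation { conf: None }) => "unknown state",
        // Entry with conf == Some: the last configuration we successfully applied.
        Some(ObservedStateLocation { conf: Some(_) }) => "known configuration",
    }
}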
306 : pub(crate) struct ReconcilerWaiter {
307 : // For observability purposes, remember the ID of the shard we're
308 : // waiting for.
309 : pub(crate) tenant_shard_id: TenantShardId,
310 :
311 : seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
312 : error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
313 : error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
314 : seq: Sequence,
315 : }
316 :
317 : pub(crate) enum ReconcilerStatus {
318 : Done,
319 : Failed,
320 : InProgress,
321 : }
322 :
323 0 : #[derive(thiserror::Error, Debug)]
324 : pub(crate) enum ReconcileWaitError {
325 : #[error("Timeout waiting for shard {0}")]
326 : Timeout(TenantShardId),
327 : #[error("shutting down")]
328 : Shutdown,
329 : #[error("Reconcile error on shard {0}: {1}")]
330 : Failed(TenantShardId, Arc<ReconcileError>),
331 : }
332 :
333 : #[derive(Eq, PartialEq, Debug)]
334 : pub(crate) struct ReplaceSecondary {
335 : old_node_id: NodeId,
336 : new_node_id: NodeId,
337 : }
338 :
339 : #[derive(Eq, PartialEq, Debug)]
340 : pub(crate) struct MigrateAttachment {
341 : pub(crate) old_attached_node_id: NodeId,
342 : pub(crate) new_attached_node_id: NodeId,
343 : }
344 :
345 : #[derive(Eq, PartialEq, Debug)]
346 : pub(crate) enum ScheduleOptimizationAction {
347 : // Replace one of our secondary locations with a different node
348 : ReplaceSecondary(ReplaceSecondary),
349 : // Migrate attachment to an existing secondary location
350 : MigrateAttachment(MigrateAttachment),
351 : }
352 :
353 : #[derive(Eq, PartialEq, Debug)]
354 : pub(crate) struct ScheduleOptimization {
355 : // What was the reconcile sequence when we generated this optimization? The optimization
356 : // should only be applied if the shard's sequence is still at this value, in case other changes
357 : // happened between planning the optimization and applying it.
358 : sequence: Sequence,
359 :
360 : pub(crate) action: ScheduleOptimizationAction,
361 : }
362 :
363 : impl ReconcilerWaiter {
364 0 : pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
365 : tokio::select! {
366 : result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
367 0 : result.map_err(|e| match e {
368 0 : SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
369 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
370 0 : })?;
371 : },
372 : result = self.error_seq_wait.wait_for(self.seq) => {
373 0 : result.map_err(|e| match e {
374 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
375 0 : SeqWaitError::Timeout => unreachable!()
376 0 : })?;
377 :
378 : return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
379 : self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
380 : }
381 : }
382 :
383 0 : Ok(())
384 0 : }
385 :
386 0 : pub(crate) fn get_status(&self) -> ReconcilerStatus {
387 0 : if self.seq_wait.would_wait_for(self.seq).is_ok() {
388 0 : ReconcilerStatus::Done
389 0 : } else if self.error_seq_wait.would_wait_for(self.seq).is_ok() {
390 0 : ReconcilerStatus::Failed
391 : } else {
392 0 : ReconcilerStatus::InProgress
393 : }
394 0 : }
395 : }
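// Illustrative sketch of driving a `ReconcilerWaiter`: wait with a timeout and map the
// outcome onto `ReconcilerStatus` for reporting. The helper name and the ten-second
// timeout are arbitrary choices for illustration.
#[cfg(test)]
#[allow(dead_code)]
async fn await_reconcile_sketch(waiter: ReconcilerWaiter) -> ReconcilerStatus {
    match waiter.wait_timeout(Duration::from_secs(10)).await {
        Ok(()) => ReconcilerStatus::Done,
        // A timeout does not imply failure: the reconcile may still be running.
        Err(ReconcileWaitError::Timeout(_)) => ReconcilerStatus::InProgress,
        Err(ReconcileWaitError::Shutdown) | Err(ReconcileWaitError::Failed(_, _)) => {
            ReconcilerStatus::Failed
        }
    }
}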
396 :
397 : /// Having spawned a reconciler task, the tenant shard's state will carry enough
398 : /// information to optionally cancel & await it later.
399 : pub(crate) struct ReconcilerHandle {
400 : sequence: Sequence,
401 : handle: JoinHandle<()>,
402 : cancel: CancellationToken,
403 : }
404 :
405 : pub(crate) enum ReconcileNeeded {
406 : /// shard either doesn't need reconciliation, or is forbidden from spawning a reconciler
407 : /// in its current state (e.g. shard split in progress, or ShardSchedulingPolicy forbids it)
408 : No,
409 : /// shard has a reconciler running, and its intent hasn't changed since that one was
410 : /// spawned: wait for the existing reconciler rather than spawning a new one.
411 : WaitExisting(ReconcilerWaiter),
412 : /// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
413 : Yes,
414 : }
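// Illustrative sketch of how a caller might dispatch on `ReconcileNeeded`.
// `dispatch_reconcile_needed` is a hypothetical helper; in the real service the `Yes` arm
// would call `TenantShard::spawn_reconciler` (its argument list is elided here) and use
// the waiter that it returns.
#[cfg(test)]
#[allow(dead_code)]
fn dispatch_reconcile_needed(needed: ReconcileNeeded) -> Option<ReconcilerWaiter> {
    match needed {
        // Nothing to do, or the shard's current state forbids spawning a reconciler.
        ReconcileNeeded::No => None,
        // A reconciler for the current sequence is already running: wait on that one.
        ReconcileNeeded::WaitExisting(waiter) => Some(waiter),
        // Work to do: the caller spawns a new reconciler and waits on its waiter.
        ReconcileNeeded::Yes => None,
    }
}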
415 :
416 : /// When a reconcile task completes, it sends this result object
417 : /// to be applied to the primary TenantShard.
418 : pub(crate) struct ReconcileResult {
419 : pub(crate) sequence: Sequence,
420 : /// On errors, `observed` should be treated as an incomplete description
421 : /// of state (i.e. any nodes present in the result should override nodes
422 : /// present in the parent tenant state, but any unmentioned nodes should
423 : /// not be removed from parent tenant state)
424 : pub(crate) result: Result<(), ReconcileError>,
425 :
426 : pub(crate) tenant_shard_id: TenantShardId,
427 : pub(crate) generation: Option<Generation>,
428 : pub(crate) observed: ObservedState,
429 :
430 : /// Set [`TenantShard::pending_compute_notification`] from this flag
431 : pub(crate) pending_compute_notification: bool,
432 : }
433 :
434 : impl ObservedState {
435 0 : pub(crate) fn new() -> Self {
436 0 : Self {
437 0 : locations: HashMap::new(),
438 0 : }
439 0 : }
440 : }
441 :
442 : impl TenantShard {
443 22 : pub(crate) fn new(
444 22 : tenant_shard_id: TenantShardId,
445 22 : shard: ShardIdentity,
446 22 : policy: PlacementPolicy,
447 22 : ) -> Self {
448 22 : Self {
449 22 : tenant_shard_id,
450 22 : policy,
451 22 : intent: IntentState::default(),
452 22 : generation: Some(Generation::new(0)),
453 22 : shard,
454 22 : observed: ObservedState::default(),
455 22 : config: TenantConfig::default(),
456 22 : reconciler: None,
457 22 : splitting: SplitState::Idle,
458 22 : sequence: Sequence(1),
459 22 : delayed_reconcile: false,
460 22 : waiter: Arc::new(SeqWait::new(Sequence(0))),
461 22 : error_waiter: Arc::new(SeqWait::new(Sequence(0))),
462 22 : last_error: Arc::default(),
463 22 : pending_compute_notification: false,
464 22 : scheduling_policy: ShardSchedulingPolicy::default(),
465 22 : }
466 22 : }
467 :
468 : /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
469 : /// [`ObservedState`], even if it violates my [`PlacementPolicy`]. Call [`Self::schedule`] next,
470 : /// to get an intent state that complies with placement policy. The overall goal is to do scheduling
471 : /// in a way that makes use of any configured locations that already exist in the outside world.
472 2 : pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
473 2 : // Choose an attached location by filtering observed locations, and then sorting to get the highest
474 2 : // generation
475 2 : let mut attached_locs = self
476 2 : .observed
477 2 : .locations
478 2 : .iter()
479 4 : .filter_map(|(node_id, l)| {
480 4 : if let Some(conf) = &l.conf {
481 4 : if conf.mode == LocationConfigMode::AttachedMulti
482 2 : || conf.mode == LocationConfigMode::AttachedSingle
483 2 : || conf.mode == LocationConfigMode::AttachedStale
484 : {
485 4 : Some((node_id, conf.generation))
486 : } else {
487 0 : None
488 : }
489 : } else {
490 0 : None
491 : }
492 4 : })
493 2 : .collect::<Vec<_>>();
494 2 :
495 4 : attached_locs.sort_by_key(|i| i.1);
496 2 : if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
497 2 : self.intent.set_attached(scheduler, Some(*node_id));
498 2 : }
499 :
500 : // All remaining observed locations generate secondary intents. This includes None
501 : // observations, as these may well have some local content on disk that is usable (this
502 : // is an edge case that might occur if we restarted during a migration or other change)
503 : //
504 : // We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
505 : // will take care of promoting one of these secondaries to be attached.
506 4 : self.observed.locations.keys().for_each(|node_id| {
507 4 : if Some(*node_id) != self.intent.attached {
508 2 : self.intent.push_secondary(scheduler, *node_id);
509 2 : }
510 4 : });
511 2 : }
512 :
513 : /// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
514 : /// attached pageserver for a shard.
515 : ///
516 : /// Returns whether we modified it, and the NodeId selected.
517 14 : fn schedule_attached(
518 14 : &mut self,
519 14 : scheduler: &mut Scheduler,
520 14 : context: &ScheduleContext,
521 14 : ) -> Result<(bool, NodeId), ScheduleError> {
522 : // No work to do if we already have an attached tenant
523 14 : if let Some(node_id) = self.intent.attached {
524 0 : return Ok((false, node_id));
525 14 : }
526 :
527 14 : if let Some(promote_secondary) = scheduler.node_preferred(&self.intent.secondary) {
528 : // Promote a secondary
529 2 : tracing::debug!("Promoted secondary {} to attached", promote_secondary);
530 2 : self.intent.promote_attached(scheduler, promote_secondary);
531 2 : Ok((true, promote_secondary))
532 : } else {
533 : // Pick a fresh node: either we had no secondaries or none were schedulable
534 12 : let node_id = scheduler.schedule_shard(&self.intent.secondary, context)?;
535 12 : tracing::debug!("Selected {} as attached", node_id);
536 12 : self.intent.set_attached(scheduler, Some(node_id));
537 12 : Ok((true, node_id))
538 : }
539 14 : }
540 :
541 16 : pub(crate) fn schedule(
542 16 : &mut self,
543 16 : scheduler: &mut Scheduler,
544 16 : context: &mut ScheduleContext,
545 16 : ) -> Result<(), ScheduleError> {
546 16 : let r = self.do_schedule(scheduler, context);
547 16 :
548 16 : context.avoid(&self.intent.all_pageservers());
549 16 : if let Some(attached) = self.intent.get_attached() {
550 14 : context.push_attached(*attached);
551 14 : }
552 :
553 16 : r
554 16 : }
555 :
556 16 : pub(crate) fn do_schedule(
557 16 : &mut self,
558 16 : scheduler: &mut Scheduler,
559 16 : context: &ScheduleContext,
560 16 : ) -> Result<(), ScheduleError> {
561 16 : // TODO: before scheduling new nodes, check if any existing content in
562 16 : // self.intent refers to pageservers that are offline, and pick other
563 16 : // pageservers if so.
564 16 :
565 16 : // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
566 16 : // change their attach location.
567 16 :
568 16 : match self.scheduling_policy {
569 14 : ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
570 : ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
571 : // Warn to make it obvious why other things aren't happening/working, if we skip scheduling
572 2 : tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
573 0 : "Scheduling is disabled by policy {:?}", self.scheduling_policy);
574 2 : return Ok(());
575 : }
576 : }
577 :
578 : // Build the set of pageservers already in use by this tenant, to avoid scheduling
579 : // more work on the same pageservers we're already using.
580 14 : let mut modified = false;
581 14 :
582 14 : // Add/remove nodes to fulfil policy
583 14 : use PlacementPolicy::*;
584 14 : match self.policy {
585 14 : Attached(secondary_count) => {
586 14 : let retain_secondaries = if self.intent.attached.is_none()
587 14 : && scheduler.node_preferred(&self.intent.secondary).is_some()
588 : {
589 : // If we have no attached, and one of the secondaries is eligible to be promoted, retain
590 : // one more secondary than we usually would, as one of them will become attached further down this function.
591 2 : secondary_count + 1
592 : } else {
593 12 : secondary_count
594 : };
595 :
596 14 : while self.intent.secondary.len() > retain_secondaries {
597 0 : // We have no particular preference for one secondary location over another: just
598 0 : // arbitrarily drop from the end
599 0 : self.intent.pop_secondary(scheduler);
600 0 : modified = true;
601 0 : }
602 :
603 : // Should have exactly one attached, and N secondaries
604 14 : let (modified_attached, attached_node_id) =
605 14 : self.schedule_attached(scheduler, context)?;
606 14 : modified |= modified_attached;
607 14 :
608 14 : let mut used_pageservers = vec![attached_node_id];
609 26 : while self.intent.secondary.len() < secondary_count {
610 12 : let node_id = scheduler.schedule_shard(&used_pageservers, context)?;
611 12 : self.intent.push_secondary(scheduler, node_id);
612 12 : used_pageservers.push(node_id);
613 12 : modified = true;
614 : }
615 : }
616 : Secondary => {
617 0 : if let Some(node_id) = self.intent.get_attached() {
618 0 : // Populate secondary by demoting the attached node
619 0 : self.intent.demote_attached(scheduler, *node_id);
620 0 : modified = true;
621 0 : } else if self.intent.secondary.is_empty() {
622 0 : // Populate secondary by scheduling a fresh node
623 0 : let node_id = scheduler.schedule_shard(&[], context)?;
624 0 : self.intent.push_secondary(scheduler, node_id);
625 0 : modified = true;
626 0 : }
627 0 : while self.intent.secondary.len() > 1 {
628 0 : // We have no particular preference for one secondary location over another: just
629 0 : // arbitrarily drop from the end
630 0 : self.intent.pop_secondary(scheduler);
631 0 : modified = true;
632 0 : }
633 : }
634 : Detached => {
635 : // Never add locations in this mode
636 0 : if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
637 0 : self.intent.clear(scheduler);
638 0 : modified = true;
639 0 : }
640 : }
641 : }
642 :
643 14 : if modified {
644 14 : self.sequence.0 += 1;
645 14 : }
646 :
647 14 : Ok(())
648 16 : }
649 :
650 : /// Reschedule this tenant shard to one of its secondary locations. Returns a scheduling error
651 : /// if the swap is not possible and leaves the intent state in its original state.
652 : ///
653 : /// Arguments:
654 : /// `attached_to`: the currently attached location matching the intent state (may be None if the
655 : /// shard is not attached)
656 : /// `promote_to`: an optional secondary location of this tenant shard. If set to None, we ask
657 : /// the scheduler to recommend a node
658 0 : pub(crate) fn reschedule_to_secondary(
659 0 : &mut self,
660 0 : promote_to: Option<NodeId>,
661 0 : scheduler: &mut Scheduler,
662 0 : ) -> Result<(), ScheduleError> {
663 0 : let promote_to = match promote_to {
664 0 : Some(node) => node,
665 0 : None => match scheduler.node_preferred(self.intent.get_secondary()) {
666 0 : Some(node) => node,
667 : None => {
668 0 : return Err(ScheduleError::ImpossibleConstraint);
669 : }
670 : },
671 : };
672 :
673 0 : assert!(self.intent.get_secondary().contains(&promote_to));
674 :
675 0 : if let Some(node) = self.intent.get_attached() {
676 0 : let demoted = self.intent.demote_attached(scheduler, *node);
677 0 : if !demoted {
678 0 : return Err(ScheduleError::ImpossibleConstraint);
679 0 : }
680 0 : }
681 :
682 0 : self.intent.promote_attached(scheduler, promote_to);
683 0 :
684 0 : // Increment the sequence number for the edge case where a
685 0 : // reconciler is already running to avoid waiting on the
686 0 : // current reconcile instead of spawning a new one.
687 0 : self.sequence = self.sequence.next();
688 0 :
689 0 : Ok(())
690 0 : }
691 :
692 : /// Optimize attachments: if a shard has a secondary location that is preferable to
693 : /// its primary location based on soft constraints, switch that secondary location
694 : /// to be attached.
695 30 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
696 : pub(crate) fn optimize_attachment(
697 : &self,
698 : nodes: &HashMap<NodeId, Node>,
699 : schedule_context: &ScheduleContext,
700 : ) -> Option<ScheduleOptimization> {
701 : let attached = (*self.intent.get_attached())?;
702 : if self.intent.secondary.is_empty() {
703 : // We can only do useful work if we have both attached and secondary locations: this
704 : // function doesn't schedule new locations, only swaps between attached and secondaries.
705 : return None;
706 : }
707 :
708 : let current_affinity_score = schedule_context.get_node_affinity(attached);
709 : let current_attachment_count = schedule_context.get_node_attachments(attached);
710 :
711 : // Generate score for each node, dropping any un-schedulable nodes.
712 : let all_pageservers = self.intent.all_pageservers();
713 : let mut scores = all_pageservers
714 : .iter()
715 60 : .flat_map(|node_id| {
716 60 : let node = nodes.get(node_id);
717 60 : if node.is_none() {
718 0 : None
719 60 : } else if matches!(
720 60 : node.unwrap().get_scheduling(),
721 : NodeSchedulingPolicy::Filling
722 : ) {
723 : // If the node is currently filling, don't count it as a candidate to avoid,
724 : // racing with the background fill.
725 0 : None
726 60 : } else if matches!(node.unwrap().may_schedule(), MaySchedule::No) {
727 0 : None
728 : } else {
729 60 : let affinity_score = schedule_context.get_node_affinity(*node_id);
730 60 : let attachment_count = schedule_context.get_node_attachments(*node_id);
731 60 : Some((*node_id, affinity_score, attachment_count))
732 : }
733 60 : })
734 : .collect::<Vec<_>>();
735 :
736 : // Sort precedence:
737 : // 1st - prefer nodes with the lowest total affinity score
738 : // 2nd - prefer nodes with the lowest number of attachments in this context
739 : // 3rd - if all else is equal, sort by node ID for determinism in tests.
740 60 : scores.sort_by_key(|i| (i.1, i.2, i.0));
741 :
742 : if let Some((preferred_node, preferred_affinity_score, preferred_attachment_count)) =
743 : scores.first()
744 : {
745 : if attached != *preferred_node {
746 : // The best alternative must be more than 1 better than us, otherwise we could end
747 : // up flapping back next time we're called (e.g. there's no point migrating from
748 : // a location with score 1 to a score of zero, because from the new location the situation
749 : // would be the same, but in reverse).
750 : if current_affinity_score > *preferred_affinity_score + AffinityScore(1)
751 : || current_attachment_count > *preferred_attachment_count + 1
752 : {
753 : tracing::info!(
754 : "Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})",
755 : self.intent.get_secondary()
756 : );
757 : return Some(ScheduleOptimization {
758 : sequence: self.sequence,
759 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
760 : old_attached_node_id: attached,
761 : new_attached_node_id: *preferred_node,
762 : }),
763 : });
764 : }
765 : } else {
766 : tracing::debug!(
767 : "Node {} is already preferred (score {:?})",
768 : preferred_node,
769 : preferred_affinity_score
770 : );
771 : }
772 : }
773 :
774 : // Fall-through: we didn't find an optimization
775 : None
776 : }
777 :
778 24 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
779 : pub(crate) fn optimize_secondary(
780 : &self,
781 : scheduler: &Scheduler,
782 : schedule_context: &ScheduleContext,
783 : ) -> Option<ScheduleOptimization> {
784 : if self.intent.secondary.is_empty() {
785 : // We can only do useful work if we have both attached and secondary locations: this
786 : // function doesn't schedule new locations, only swaps between attached and secondaries.
787 : return None;
788 : }
789 :
790 : for secondary in self.intent.get_secondary() {
791 : let Some(affinity_score) = schedule_context.nodes.get(secondary) else {
792 : // We're already on a node unaffected by any affinity constraints,
793 : // so we won't change it.
794 : continue;
795 : };
796 :
797 : // Let the scheduler suggest a node, where it would put us if we were scheduling afresh
798 : // This implicitly limits the choice to nodes that are available, and prefers nodes
799 : // with lower utilization.
800 : let Ok(candidate_node) =
801 : scheduler.schedule_shard(&self.intent.all_pageservers(), schedule_context)
802 : else {
803 : // A scheduling error means we have no possible candidate replacements
804 : continue;
805 : };
806 :
807 : let candidate_affinity_score = schedule_context
808 : .nodes
809 : .get(&candidate_node)
810 : .unwrap_or(&AffinityScore::FREE);
811 :
812 : // The best alternative must be more than 1 better than us, otherwise we could end
813 : // up flapping back next time we're called.
814 : if *candidate_affinity_score + AffinityScore(1) < *affinity_score {
815 : // If some other node is available and has a lower score than this node, then
816 : // that other node is a good place to migrate to.
817 : tracing::info!(
818 : "Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})",
819 : self.intent.get_secondary()
820 : );
821 : return Some(ScheduleOptimization {
822 : sequence: self.sequence,
823 : action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
824 : old_node_id: *secondary,
825 : new_node_id: candidate_node,
826 : }),
827 : });
828 : }
829 : }
830 :
831 : None
832 : }
833 :
834 : /// Return true if the optimization was really applied: it will not be applied if the optimization's
835 : /// sequence is behind this tenant shard's
836 18 : pub(crate) fn apply_optimization(
837 18 : &mut self,
838 18 : scheduler: &mut Scheduler,
839 18 : optimization: ScheduleOptimization,
840 18 : ) -> bool {
841 18 : if optimization.sequence != self.sequence {
842 0 : return false;
843 18 : }
844 18 :
845 18 : metrics::METRICS_REGISTRY
846 18 : .metrics_group
847 18 : .storage_controller_schedule_optimization
848 18 : .inc();
849 18 :
850 18 : match optimization.action {
851 : ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
852 8 : old_attached_node_id,
853 8 : new_attached_node_id,
854 8 : }) => {
855 8 : self.intent.demote_attached(scheduler, old_attached_node_id);
856 8 : self.intent
857 8 : .promote_attached(scheduler, new_attached_node_id);
858 8 : }
859 : ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
860 10 : old_node_id,
861 10 : new_node_id,
862 10 : }) => {
863 10 : self.intent.remove_secondary(scheduler, old_node_id);
864 10 : self.intent.push_secondary(scheduler, new_node_id);
865 10 : }
866 : }
867 :
868 18 : true
869 18 : }
870 :
871 : /// Query whether the tenant's observed state for attached node matches its intent state, and if so,
872 : /// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that
873 : /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
874 : ///
875 : /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
876 : /// function should not be used to decide whether to reconcile.
877 0 : pub(crate) fn stably_attached(&self) -> Option<NodeId> {
878 0 : if let Some(attach_intent) = self.intent.attached {
879 0 : match self.observed.locations.get(&attach_intent) {
880 0 : Some(loc) => match &loc.conf {
881 0 : Some(conf) => match conf.mode {
882 : LocationConfigMode::AttachedMulti
883 : | LocationConfigMode::AttachedSingle
884 : | LocationConfigMode::AttachedStale => {
885 : // Our intent and observed state agree that this node is in an attached state.
886 0 : Some(attach_intent)
887 : }
888 : // Our observed config is not an attached state
889 0 : _ => None,
890 : },
891 : // Our observed state is None, i.e. in flux
892 0 : None => None,
893 : },
894 : // We have no observed state for this node
895 0 : None => None,
896 : }
897 : } else {
898 : // Our intent is not to attach
899 0 : None
900 : }
901 0 : }
902 :
903 0 : fn dirty(&self, nodes: &Arc<HashMap<NodeId, Node>>) -> bool {
904 0 : let mut dirty_nodes = HashSet::new();
905 :
906 0 : if let Some(node_id) = self.intent.attached {
907 : // Maybe panic: it is a severe bug if we try to attach while generation is null.
908 0 : let generation = self
909 0 : .generation
910 0 : .expect("Attempted to enter attached state without a generation");
911 0 :
912 0 : let wanted_conf =
913 0 : attached_location_conf(generation, &self.shard, &self.config, &self.policy);
914 0 : match self.observed.locations.get(&node_id) {
915 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
916 0 : Some(_) | None => {
917 0 : dirty_nodes.insert(node_id);
918 0 : }
919 : }
920 0 : }
921 :
922 0 : for node_id in &self.intent.secondary {
923 0 : let wanted_conf = secondary_location_conf(&self.shard, &self.config);
924 0 : match self.observed.locations.get(node_id) {
925 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
926 0 : Some(_) | None => {
927 0 : dirty_nodes.insert(*node_id);
928 0 : }
929 : }
930 : }
931 :
932 0 : for node_id in self.observed.locations.keys() {
933 0 : if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
934 0 : // We have observed state that isn't part of our intent: need to clean it up.
935 0 : dirty_nodes.insert(*node_id);
936 0 : }
937 : }
938 :
939 0 : dirty_nodes.retain(|node_id| {
940 0 : nodes
941 0 : .get(node_id)
942 0 : .map(|n| n.is_available())
943 0 : .unwrap_or(false)
944 0 : });
945 0 :
946 0 : !dirty_nodes.is_empty()
947 0 : }
948 :
949 : #[allow(clippy::too_many_arguments)]
950 0 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
951 : pub(crate) fn get_reconcile_needed(
952 : &mut self,
953 : pageservers: &Arc<HashMap<NodeId, Node>>,
954 : ) -> ReconcileNeeded {
955 : // If there are any ambiguous observed states, and the nodes they refer to are available,
956 : // we should reconcile to clean them up.
957 : let mut dirty_observed = false;
958 : for (node_id, observed_loc) in &self.observed.locations {
959 : let node = pageservers
960 : .get(node_id)
961 : .expect("Nodes may not be removed while referenced");
962 : if observed_loc.conf.is_none() && node.is_available() {
963 : dirty_observed = true;
964 : break;
965 : }
966 : }
967 :
968 : let active_nodes_dirty = self.dirty(pageservers);
969 :
970 : // Even if there is no pageserver work to be done, if we have a pending notification to computes,
971 : // wake up a reconciler to send it.
972 : let do_reconcile =
973 : active_nodes_dirty || dirty_observed || self.pending_compute_notification;
974 :
975 : if !do_reconcile {
976 : tracing::debug!("Not dirty, no reconciliation needed.");
977 : return ReconcileNeeded::No;
978 : }
979 :
980 : // If we are currently splitting, then never start a reconciler task: the splitting logic
981 : // requires that shards are not interfered with while it runs. Do this check here rather than
982 : // up top, so that we only log this message if we would otherwise have done a reconciliation.
983 : if !matches!(self.splitting, SplitState::Idle) {
984 : tracing::info!("Refusing to reconcile, splitting in progress");
985 : return ReconcileNeeded::No;
986 : }
987 :
988 : // Reconcile already in flight for the current sequence?
989 : if let Some(handle) = &self.reconciler {
990 : if handle.sequence == self.sequence {
991 : tracing::info!(
992 : "Reconciliation already in progress for sequence {:?}",
993 : self.sequence,
994 : );
995 : return ReconcileNeeded::WaitExisting(ReconcilerWaiter {
996 : tenant_shard_id: self.tenant_shard_id,
997 : seq_wait: self.waiter.clone(),
998 : error_seq_wait: self.error_waiter.clone(),
999 : error: self.last_error.clone(),
1000 : seq: self.sequence,
1001 : });
1002 : }
1003 : }
1004 :
1005 : // Pre-checks done: finally check whether we may actually do the work
1006 : match self.scheduling_policy {
1007 : ShardSchedulingPolicy::Active
1008 : | ShardSchedulingPolicy::Essential
1009 : | ShardSchedulingPolicy::Pause => {}
1010 : ShardSchedulingPolicy::Stop => {
1011 : // We only reach this point if there is work to do and we're going to skip
1012 : // doing it: warn to make it obvious why this tenant isn't doing what it ought to.
1013 : tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
1014 : return ReconcileNeeded::No;
1015 : }
1016 : }
1017 :
1018 : ReconcileNeeded::Yes
1019 : }
1020 :
1021 : /// Ensure the sequence number is set to a value where waiting for this value will make us wait
1022 : /// for the next reconcile: i.e. it is ahead of all completed or running reconcilers.
1023 : ///
1024 : /// Constructing a ReconcilerWaiter with the resulting sequence number gives the property
1025 : /// that the waiter will not complete until some future Reconciler is constructed and run.
1026 0 : fn ensure_sequence_ahead(&mut self) {
1027 0 : // Find the highest sequence for which a Reconciler has previously run or is currently
1028 0 : // running
1029 0 : let max_seen = std::cmp::max(
1030 0 : self.reconciler
1031 0 : .as_ref()
1032 0 : .map(|r| r.sequence)
1033 0 : .unwrap_or(Sequence(0)),
1034 0 : std::cmp::max(self.waiter.load(), self.error_waiter.load()),
1035 0 : );
1036 0 :
1037 0 : if self.sequence <= max_seen {
1038 0 : self.sequence = max_seen.next();
1039 0 : }
1040 0 : }
1041 :
1042 : /// Create a waiter that will wait for some future Reconciler that hasn't been spawned yet.
1043 : ///
1044 : /// This is appropriate when you can't spawn a reconciler (e.g. due to resource limits), but
1045 : /// you would like to wait on the next reconciler that gets spawned in the background.
1046 0 : pub(crate) fn future_reconcile_waiter(&mut self) -> ReconcilerWaiter {
1047 0 : self.ensure_sequence_ahead();
1048 0 :
1049 0 : ReconcilerWaiter {
1050 0 : tenant_shard_id: self.tenant_shard_id,
1051 0 : seq_wait: self.waiter.clone(),
1052 0 : error_seq_wait: self.error_waiter.clone(),
1053 0 : error: self.last_error.clone(),
1054 0 : seq: self.sequence,
1055 0 : }
1056 0 : }
1057 :
1058 : #[allow(clippy::too_many_arguments)]
1059 0 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1060 : pub(crate) fn spawn_reconciler(
1061 : &mut self,
1062 : result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
1063 : pageservers: &Arc<HashMap<NodeId, Node>>,
1064 : compute_hook: &Arc<ComputeHook>,
1065 : service_config: &service::Config,
1066 : persistence: &Arc<Persistence>,
1067 : units: ReconcileUnits,
1068 : gate_guard: GateGuard,
1069 : cancel: &CancellationToken,
1070 : ) -> Option<ReconcilerWaiter> {
1071 : // Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
1072 : // doing our sequence's work.
1073 : let old_handle = self.reconciler.take();
1074 :
1075 : // Build list of nodes from which the reconciler should detach
1076 : let mut detach = Vec::new();
1077 : for node_id in self.observed.locations.keys() {
1078 : if self.intent.get_attached() != &Some(*node_id)
1079 : && !self.intent.secondary.contains(node_id)
1080 : {
1081 : detach.push(
1082 : pageservers
1083 : .get(node_id)
1084 : .expect("Intent references non-existent pageserver")
1085 : .clone(),
1086 : )
1087 : }
1088 : }
1089 :
1090 : // Advance the sequence before spawning a reconciler, so that sequence waiters
1091 : // can distinguish between before+after the reconcile completes.
1092 : self.ensure_sequence_ahead();
1093 :
1094 : let reconciler_cancel = cancel.child_token();
1095 : let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
1096 : let mut reconciler = Reconciler {
1097 : tenant_shard_id: self.tenant_shard_id,
1098 : shard: self.shard,
1099 : placement_policy: self.policy.clone(),
1100 : generation: self.generation,
1101 : intent: reconciler_intent,
1102 : detach,
1103 : config: self.config.clone(),
1104 : observed: self.observed.clone(),
1105 : compute_hook: compute_hook.clone(),
1106 : service_config: service_config.clone(),
1107 : _gate_guard: gate_guard,
1108 : _resource_units: units,
1109 : cancel: reconciler_cancel.clone(),
1110 : persistence: persistence.clone(),
1111 : compute_notify_failure: false,
1112 : };
1113 :
1114 : let reconcile_seq = self.sequence;
1115 :
1116 : tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
1117 : let must_notify = self.pending_compute_notification;
1118 : let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
1119 : tenant_id=%reconciler.tenant_shard_id.tenant_id,
1120 : shard_id=%reconciler.tenant_shard_id.shard_slug());
1121 : metrics::METRICS_REGISTRY
1122 : .metrics_group
1123 : .storage_controller_reconcile_spawn
1124 : .inc();
1125 : let result_tx = result_tx.clone();
1126 : let join_handle = tokio::task::spawn(
1127 0 : async move {
1128 : // Wait for any previous reconcile task to complete before we start
1129 0 : if let Some(old_handle) = old_handle {
1130 0 : old_handle.cancel.cancel();
1131 0 : if let Err(e) = old_handle.handle.await {
1132 : // We can't do much with this other than log it: the task is done, so
1133 : // we may proceed with our work.
1134 0 : tracing::error!("Unexpected join error waiting for reconcile task: {e}");
1135 0 : }
1136 0 : }
1137 :
1138 : // Early check for cancellation before doing any work
1139 : // TODO: wrap all remote API operations in cancellation check
1140 : // as well.
1141 0 : if reconciler.cancel.is_cancelled() {
1142 0 : metrics::METRICS_REGISTRY
1143 0 : .metrics_group
1144 0 : .storage_controller_reconcile_complete
1145 0 : .inc(ReconcileCompleteLabelGroup {
1146 0 : status: ReconcileOutcome::Cancel,
1147 0 : });
1148 0 : return;
1149 0 : }
1150 :
1151 : // Attempt to make observed state match intent state
1152 0 : let result = reconciler.reconcile().await;
1153 :
1154 : // If we know we had a pending compute notification from some previous action, send a notification irrespective
1155 : // of whether the above reconcile() did any work
1156 0 : if result.is_ok() && must_notify {
1157 : // If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
1158 0 : reconciler.compute_notify().await.ok();
1159 0 : }
1160 :
1161 : // Update result counter
1162 0 : let outcome_label = match &result {
1163 0 : Ok(_) => ReconcileOutcome::Success,
1164 0 : Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
1165 0 : Err(_) => ReconcileOutcome::Error,
1166 : };
1167 :
1168 0 : metrics::METRICS_REGISTRY
1169 0 : .metrics_group
1170 0 : .storage_controller_reconcile_complete
1171 0 : .inc(ReconcileCompleteLabelGroup {
1172 0 : status: outcome_label,
1173 0 : });
1174 0 :
1175 0 : // Constructing result implicitly drops Reconciler, freeing any ReconcileUnits before the Service might
1176 0 : // try and schedule more work in response to our result.
1177 0 : let result = ReconcileResult {
1178 0 : sequence: reconcile_seq,
1179 0 : result,
1180 0 : tenant_shard_id: reconciler.tenant_shard_id,
1181 0 : generation: reconciler.generation,
1182 0 : observed: reconciler.observed,
1183 0 : pending_compute_notification: reconciler.compute_notify_failure,
1184 0 : };
1185 0 :
1186 0 : result_tx.send(result).ok();
1187 0 : }
1188 : .instrument(reconciler_span),
1189 : );
1190 :
1191 : self.reconciler = Some(ReconcilerHandle {
1192 : sequence: self.sequence,
1193 : handle: join_handle,
1194 : cancel: reconciler_cancel,
1195 : });
1196 :
1197 : Some(ReconcilerWaiter {
1198 : tenant_shard_id: self.tenant_shard_id,
1199 : seq_wait: self.waiter.clone(),
1200 : error_seq_wait: self.error_waiter.clone(),
1201 : error: self.last_error.clone(),
1202 : seq: self.sequence,
1203 : })
1204 : }
1205 :
1206 : /// Get a waiter for any reconciliation in flight, but do not start reconciliation
1207 : /// if it is not already running
1208 0 : pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
1209 0 : if self.reconciler.is_some() {
1210 0 : Some(ReconcilerWaiter {
1211 0 : tenant_shard_id: self.tenant_shard_id,
1212 0 : seq_wait: self.waiter.clone(),
1213 0 : error_seq_wait: self.error_waiter.clone(),
1214 0 : error: self.last_error.clone(),
1215 0 : seq: self.sequence,
1216 0 : })
1217 : } else {
1218 0 : None
1219 : }
1220 0 : }
1221 :
1222 : /// Called when a ReconcileResult has been emitted and the service is updating
1223 : /// our state: if the result is from a sequence >= my ReconcileHandle, then drop
1224 : /// the handle to indicate there is no longer a reconciliation in progress.
1225 0 : pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
1226 0 : if let Some(reconcile_handle) = &self.reconciler {
1227 0 : if reconcile_handle.sequence <= sequence {
1228 0 : self.reconciler = None;
1229 0 : }
1230 0 : }
1231 0 : }
1232 :
1233 : /// If we had any state at all referring to this node ID, drop it. Does not
1234 : /// attempt to reschedule.
1235 : ///
1236 : /// Returns true if we modified the node's intent state.
1237 0 : pub(crate) fn deref_node(&mut self, node_id: NodeId) -> bool {
1238 0 : let mut intent_modified = false;
1239 0 :
1240 0 : // Drop if this node was our attached intent
1241 0 : if self.intent.attached == Some(node_id) {
1242 0 : self.intent.attached = None;
1243 0 : intent_modified = true;
1244 0 : }
1245 :
1246 : // Drop from the list of secondaries, and check if we modified it
1247 0 : let had_secondaries = self.intent.secondary.len();
1248 0 : self.intent.secondary.retain(|n| n != &node_id);
1249 0 : intent_modified |= self.intent.secondary.len() != had_secondaries;
1250 0 :
1251 0 : debug_assert!(!self.intent.all_pageservers().contains(&node_id));
1252 :
1253 0 : intent_modified
1254 0 : }
1255 :
1256 0 : pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
1257 0 : self.scheduling_policy = p;
1258 0 : }
1259 :
1260 0 : pub(crate) fn get_scheduling_policy(&self) -> &ShardSchedulingPolicy {
1261 0 : &self.scheduling_policy
1262 0 : }
1263 :
1264 0 : pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
1265 0 : // Ordering: always set last_error before advancing sequence, so that sequence
1266 0 : // waiters are guaranteed to see a Some value when they see an error.
1267 0 : *(self.last_error.lock().unwrap()) = Some(Arc::new(error));
1268 0 : self.error_waiter.advance(sequence);
1269 0 : }
1270 :
1271 0 : pub(crate) fn from_persistent(
1272 0 : tsp: TenantShardPersistence,
1273 0 : intent: IntentState,
1274 0 : ) -> anyhow::Result<Self> {
1275 0 : let tenant_shard_id = tsp.get_tenant_shard_id()?;
1276 0 : let shard_identity = tsp.get_shard_identity()?;
1277 :
1278 0 : Ok(Self {
1279 0 : tenant_shard_id,
1280 0 : shard: shard_identity,
1281 0 : sequence: Sequence::initial(),
1282 0 : generation: tsp.generation.map(|g| Generation::new(g as u32)),
1283 0 : policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
1284 0 : intent,
1285 0 : observed: ObservedState::new(),
1286 0 : config: serde_json::from_str(&tsp.config).unwrap(),
1287 0 : reconciler: None,
1288 0 : splitting: tsp.splitting,
1289 0 : waiter: Arc::new(SeqWait::new(Sequence::initial())),
1290 0 : error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
1291 0 : last_error: Arc::default(),
1292 0 : pending_compute_notification: false,
1293 0 : delayed_reconcile: false,
1294 0 : scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
1295 0 : })
1296 0 : }
1297 :
1298 0 : pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
1299 0 : TenantShardPersistence {
1300 0 : tenant_id: self.tenant_shard_id.tenant_id.to_string(),
1301 0 : shard_number: self.tenant_shard_id.shard_number.0 as i32,
1302 0 : shard_count: self.tenant_shard_id.shard_count.literal() as i32,
1303 0 : shard_stripe_size: self.shard.stripe_size.0 as i32,
1304 0 : generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
1305 0 : generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
1306 0 : placement_policy: serde_json::to_string(&self.policy).unwrap(),
1307 0 : config: serde_json::to_string(&self.config).unwrap(),
1308 0 : splitting: SplitState::default(),
1309 0 : scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
1310 0 : }
1311 0 : }
1312 : }
1313 :
1314 : #[cfg(test)]
1315 : pub(crate) mod tests {
1316 : use pageserver_api::{
1317 : controller_api::NodeAvailability,
1318 : shard::{ShardCount, ShardNumber},
1319 : };
1320 : use utils::id::TenantId;
1321 :
1322 : use crate::scheduler::test_utils::make_test_nodes;
1323 :
1324 : use super::*;
1325 :
1326 14 : fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard {
1327 14 : let tenant_id = TenantId::generate();
1328 14 : let shard_number = ShardNumber(0);
1329 14 : let shard_count = ShardCount::new(1);
1330 14 :
1331 14 : let tenant_shard_id = TenantShardId {
1332 14 : tenant_id,
1333 14 : shard_number,
1334 14 : shard_count,
1335 14 : };
1336 14 : TenantShard::new(
1337 14 : tenant_shard_id,
1338 14 : ShardIdentity::new(
1339 14 : shard_number,
1340 14 : shard_count,
1341 14 : pageserver_api::shard::ShardStripeSize(32768),
1342 14 : )
1343 14 : .unwrap(),
1344 14 : policy,
1345 14 : )
1346 14 : }
1347 :
1348 2 : fn make_test_tenant(policy: PlacementPolicy, shard_count: ShardCount) -> Vec<TenantShard> {
1349 2 : let tenant_id = TenantId::generate();
1350 2 :
1351 2 : (0..shard_count.count())
1352 8 : .map(|i| {
1353 8 : let shard_number = ShardNumber(i);
1354 8 :
1355 8 : let tenant_shard_id = TenantShardId {
1356 8 : tenant_id,
1357 8 : shard_number,
1358 8 : shard_count,
1359 8 : };
1360 8 : TenantShard::new(
1361 8 : tenant_shard_id,
1362 8 : ShardIdentity::new(
1363 8 : shard_number,
1364 8 : shard_count,
1365 8 : pageserver_api::shard::ShardStripeSize(32768),
1366 8 : )
1367 8 : .unwrap(),
1368 8 : policy.clone(),
1369 8 : )
1370 8 : })
1371 2 : .collect()
1372 2 : }
1373 :
1374 : /// Test the scheduling behaviors used when a tenant configured for HA is subject
1375 : /// to nodes being marked offline.
1376 : #[test]
1377 2 : fn tenant_ha_scheduling() -> anyhow::Result<()> {
1378 2 : // Start with three nodes. Our tenant will only use two. The third one is
1379 2 : // expected to remain unused.
1380 2 : let mut nodes = make_test_nodes(3);
1381 2 :
1382 2 : let mut scheduler = Scheduler::new(nodes.values());
1383 2 : let mut context = ScheduleContext::default();
1384 2 :
1385 2 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
1386 2 : tenant_shard
1387 2 : .schedule(&mut scheduler, &mut context)
1388 2 : .expect("we have enough nodes, scheduling should work");
1389 2 :
1390 2 : // Expect to initially be schedule on to different nodes
1391 2 : assert_eq!(tenant_shard.intent.secondary.len(), 1);
1392 2 : assert!(tenant_shard.intent.attached.is_some());
1393 :
1394 2 : let attached_node_id = tenant_shard.intent.attached.unwrap();
1395 2 : let secondary_node_id = *tenant_shard.intent.secondary.iter().last().unwrap();
1396 2 : assert_ne!(attached_node_id, secondary_node_id);
1397 :
1398 : // Being notified that the attached node is offline should demote it to a secondary
1399 2 : let changed = tenant_shard
1400 2 : .intent
1401 2 : .demote_attached(&mut scheduler, attached_node_id);
1402 2 : assert!(changed);
1403 2 : assert!(tenant_shard.intent.attached.is_none());
1404 2 : assert_eq!(tenant_shard.intent.secondary.len(), 2);
1405 :
1406 : // Update the scheduler state to indicate the node is offline
1407 2 : nodes
1408 2 : .get_mut(&attached_node_id)
1409 2 : .unwrap()
1410 2 : .set_availability(NodeAvailability::Offline);
1411 2 : scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
1412 2 :
1413 2 : // Scheduling the shard should promote the still-available secondary node to attached
1414 2 : tenant_shard
1415 2 : .schedule(&mut scheduler, &mut context)
1416 2 : .expect("active nodes are available");
1417 2 : assert_eq!(tenant_shard.intent.attached.unwrap(), secondary_node_id);
1418 :
1419 : // The original attached node should have been retained as a secondary
1420 2 : assert_eq!(
1421 2 : *tenant_shard.intent.secondary.iter().last().unwrap(),
1422 2 : attached_node_id
1423 2 : );
1424 :
1425 2 : tenant_shard.intent.clear(&mut scheduler);
1426 2 :
1427 2 : Ok(())
1428 2 : }
1429 :
1430 : #[test]
1431 2 : fn intent_from_observed() -> anyhow::Result<()> {
1432 2 : let nodes = make_test_nodes(3);
1433 2 : let mut scheduler = Scheduler::new(nodes.values());
1434 2 :
1435 2 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
1436 2 :
1437 2 : tenant_shard.observed.locations.insert(
1438 2 : NodeId(3),
1439 2 : ObservedStateLocation {
1440 2 : conf: Some(LocationConfig {
1441 2 : mode: LocationConfigMode::AttachedMulti,
1442 2 : generation: Some(2),
1443 2 : secondary_conf: None,
1444 2 : shard_number: tenant_shard.shard.number.0,
1445 2 : shard_count: tenant_shard.shard.count.literal(),
1446 2 : shard_stripe_size: tenant_shard.shard.stripe_size.0,
1447 2 : tenant_conf: TenantConfig::default(),
1448 2 : }),
1449 2 : },
1450 2 : );
1451 2 :
1452 2 : tenant_shard.observed.locations.insert(
1453 2 : NodeId(2),
1454 2 : ObservedStateLocation {
1455 2 : conf: Some(LocationConfig {
1456 2 : mode: LocationConfigMode::AttachedStale,
1457 2 : generation: Some(1),
1458 2 : secondary_conf: None,
1459 2 : shard_number: tenant_shard.shard.number.0,
1460 2 : shard_count: tenant_shard.shard.count.literal(),
1461 2 : shard_stripe_size: tenant_shard.shard.stripe_size.0,
1462 2 : tenant_conf: TenantConfig::default(),
1463 2 : }),
1464 2 : },
1465 2 : );
1466 2 :
1467 2 : tenant_shard.intent_from_observed(&mut scheduler);
1468 2 :
1469 2 : // The attached location with the highest generation is used as the attached intent
1470 2 : assert_eq!(tenant_shard.intent.attached, Some(NodeId(3)));
1471 : // Other locations get used as secondary
1472 2 : assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]);
1473 :
1474 2 : scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?;
1475 :
1476 2 : tenant_shard.intent.clear(&mut scheduler);
1477 2 : Ok(())
1478 2 : }
1479 :
1480 : #[test]
1481 2 : fn scheduling_mode() -> anyhow::Result<()> {
1482 2 : let nodes = make_test_nodes(3);
1483 2 : let mut scheduler = Scheduler::new(nodes.values());
1484 2 :
1485 2 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
1486 2 :
1487 2 : // In pause mode, schedule() shouldn't do anything
1488 2 : tenant_shard.scheduling_policy = ShardSchedulingPolicy::Pause;
1489 2 : assert!(tenant_shard
1490 2 : .schedule(&mut scheduler, &mut ScheduleContext::default())
1491 2 : .is_ok());
1492 2 : assert!(tenant_shard.intent.all_pageservers().is_empty());
1493 :
1494 : // In active mode, schedule() works
1495 2 : tenant_shard.scheduling_policy = ShardSchedulingPolicy::Active;
1496 2 : assert!(tenant_shard
1497 2 : .schedule(&mut scheduler, &mut ScheduleContext::default())
1498 2 : .is_ok());
1499 2 : assert!(!tenant_shard.intent.all_pageservers().is_empty());
1500 :
1501 2 : tenant_shard.intent.clear(&mut scheduler);
1502 2 : Ok(())
1503 2 : }
1504 :
1505 : #[test]
1506 2 : fn optimize_attachment() -> anyhow::Result<()> {
1507 2 : let nodes = make_test_nodes(3);
1508 2 : let mut scheduler = Scheduler::new(nodes.values());
1509 2 :
1510 2 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
1511 2 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
1512 2 :
1513 2 : // Initially: both shards are attached on node 1, and each has a secondary location
1514 2 : // on a different node.
1515 2 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
1516 2 : shard_a.intent.push_secondary(&mut scheduler, NodeId(2));
1517 2 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
1518 2 : shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
1519 2 :
1520 2 : let mut schedule_context = ScheduleContext::default();
1521 2 : schedule_context.avoid(&shard_a.intent.all_pageservers());
1522 2 : schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
1523 2 : schedule_context.avoid(&shard_b.intent.all_pageservers());
1524 2 : schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
1525 2 :
1526 2 : let optimization_a = shard_a.optimize_attachment(&nodes, &schedule_context);
1527 2 :
1528 2 : // Either shard should recognize that it has the option to switch to a secondary location where there
1529 2 : // would be no other shards from the same tenant, and request to do so.
1530 2 : assert_eq!(
1531 2 : optimization_a,
1532 2 : Some(ScheduleOptimization {
1533 2 : sequence: shard_a.sequence,
1534 2 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
1535 2 : old_attached_node_id: NodeId(1),
1536 2 : new_attached_node_id: NodeId(2)
1537 2 : })
1538 2 : })
1539 2 : );
1540 :
1541 : // Note that optimizing two shards of the same tenant against the same ScheduleContext is
1542 : // mutually exclusive: applying one optimization invalidates the statistics the other was
1543 : // computed from. It is the responsibility of [`Service::optimize_all`] to avoid attempting
1544 : // optimizations for multiple shards in the same tenant at the same time. Generating
1545 : // both optimizations here is done purely for test purposes.
1546 2 : let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context);
1547 2 : assert_eq!(
1548 2 : optimization_b,
1549 2 : Some(ScheduleOptimization {
1550 2 : sequence: shard_b.sequence,
1551 2 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
1552 2 : old_attached_node_id: NodeId(1),
1553 2 : new_attached_node_id: NodeId(3)
1554 2 : })
1555 2 : })
1556 2 : );
1557 :
1558 : // Applying these optimizations should result in the end state proposed
1559 2 : shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
1560 2 : assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(2)));
1561 2 : assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
1562 2 : shard_b.apply_optimization(&mut scheduler, optimization_b.unwrap());
1563 2 : assert_eq!(shard_b.intent.get_attached(), &Some(NodeId(3)));
1564 2 : assert_eq!(shard_b.intent.get_secondary(), &vec![NodeId(1)]);
1565 :
1566 2 : shard_a.intent.clear(&mut scheduler);
1567 2 : shard_b.intent.clear(&mut scheduler);
1568 2 :
1569 2 : Ok(())
1570 2 : }
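// Illustrative sketch only (hypothetical helper, not in the source file): the context-building
// pattern repeated in the optimization tests above and below. Every shard of the tenant
// registers its locations and its attached node with the ScheduleContext, so that the
// optimizer can see intra-tenant affinity before proposing a move.
#[allow(dead_code)]
fn build_schedule_context(shards: &[TenantShard]) -> ScheduleContext {
    let mut schedule_context = ScheduleContext::default();
    for shard in shards {
        // Record all of this shard's locations as nodes to avoid for its siblings.
        schedule_context.avoid(&shard.intent.all_pageservers());
        // Attached locations are weighted separately from secondaries.
        if let Some(attached) = shard.intent.get_attached() {
            schedule_context.push_attached(*attached);
        }
    }
    schedule_context
}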
1571 :
1572 : #[test]
1573 2 : fn optimize_secondary() -> anyhow::Result<()> {
1574 2 : let nodes = make_test_nodes(4);
1575 2 : let mut scheduler = Scheduler::new(nodes.values());
1576 2 :
1577 2 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
1578 2 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
1579 2 :
1580 2 : // Initially: the two shards are attached on different nodes, but both have their secondary
1581 2 : // location on the same node.
1582 2 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
1583 2 : shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
1584 2 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
1585 2 : shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
1586 2 :
1587 2 : let mut schedule_context = ScheduleContext::default();
1588 2 : schedule_context.avoid(&shard_a.intent.all_pageservers());
1589 2 : schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
1590 2 : schedule_context.avoid(&shard_b.intent.all_pageservers());
1591 2 : schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
1592 2 :
1593 2 : let optimization_a = shard_a.optimize_secondary(&scheduler, &schedule_context);
1594 2 :
1595 2 : // Since there is a node with no locations, the shard whose secondary sits on the node holding
1596 2 : // two locations for the same tenant should generate an optimization to move it away
1597 2 : assert_eq!(
1598 2 : optimization_a,
1599 2 : Some(ScheduleOptimization {
1600 2 : sequence: shard_a.sequence,
1601 2 : action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
1602 2 : old_node_id: NodeId(3),
1603 2 : new_node_id: NodeId(4)
1604 2 : })
1605 2 : })
1606 2 : );
1607 :
1608 2 : shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
1609 2 : assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
1610 2 : assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);
1611 :
1612 2 : shard_a.intent.clear(&mut scheduler);
1613 2 : shard_b.intent.clear(&mut scheduler);
1614 2 :
1615 2 : Ok(())
1616 2 : }
1617 :
1618 : // Optimize until quiescent: this emulates what Service::optimize_all does when
1619 : // called repeatedly in the background.
1620 2 : fn optimize_til_idle(
1621 2 : nodes: &HashMap<NodeId, Node>,
1622 2 : scheduler: &mut Scheduler,
1623 2 : shards: &mut [TenantShard],
1624 2 : ) {
1625 2 : let mut loop_n = 0;
1626 : loop {
1627 14 : let mut schedule_context = ScheduleContext::default();
1628 14 : let mut any_changed = false;
1629 :
1630 56 : for shard in shards.iter() {
1631 56 : schedule_context.avoid(&shard.intent.all_pageservers());
1632 56 : if let Some(attached) = shard.intent.get_attached() {
1633 56 : schedule_context.push_attached(*attached);
1634 56 : }
1635 : }
1636 :
1637 26 : for shard in shards.iter_mut() {
1638 26 : let optimization = shard.optimize_attachment(nodes, &schedule_context);
1639 26 : if let Some(optimization) = optimization {
1640 4 : shard.apply_optimization(scheduler, optimization);
1641 4 : any_changed = true;
1642 4 : break;
1643 22 : }
1644 22 :
1645 22 : let optimization = shard.optimize_secondary(scheduler, &schedule_context);
1646 22 : if let Some(optimization) = optimization {
1647 8 : shard.apply_optimization(scheduler, optimization);
1648 8 : any_changed = true;
1649 8 : break;
1650 14 : }
1651 : }
1652 :
1653 14 : if !any_changed {
1654 2 : break;
1655 12 : }
1656 12 :
1657 12 : // Assert no infinite loop
1658 12 : loop_n += 1;
1659 12 : assert!(loop_n < 1000);
1660 : }
1661 2 : }
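// Illustrative sketch only (not in the source file): after driving optimization to quiescence,
// the scheduler's internal reference counts can be cross-checked against the shard intents
// using the same consistency_check() call that intent_from_observed() uses above; its exact
// signature is assumed from that usage.
#[allow(dead_code)]
fn optimize_and_check(
    nodes: &HashMap<NodeId, Node>,
    scheduler: &mut Scheduler,
    shards: &mut [TenantShard],
) -> anyhow::Result<()> {
    // Apply optimizations one at a time until no further changes are proposed.
    optimize_til_idle(nodes, scheduler, shards);
    // Verify the scheduler's bookkeeping matches the intents we ended up with.
    scheduler.consistency_check(nodes.values(), shards.iter())?;
    Ok(())
}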
1662 :
1663 : /// Test the balancing behavior of shard scheduling: that it achieves a balance, and
1664 : /// that it converges.
1665 : #[test]
1666 2 : fn optimize_add_nodes() -> anyhow::Result<()> {
1667 2 : let nodes = make_test_nodes(4);
1668 2 :
1669 2 : // Only show the scheduler a couple of nodes
1670 2 : let mut scheduler = Scheduler::new([].iter());
1671 2 : scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
1672 2 : scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
1673 2 :
1674 2 : let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
1675 2 : let mut schedule_context = ScheduleContext::default();
1676 10 : for shard in &mut shards {
1677 8 : assert!(shard
1678 8 : .schedule(&mut scheduler, &mut schedule_context)
1679 8 : .is_ok());
1680 : }
1681 :
1682 : // We should see an equal number of locations on the two nodes.
1683 2 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4);
1684 2 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2);
1685 :
1686 2 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4);
1687 2 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
1688 :
1689 : // Add another two nodes: we should see the shards spread out when their optimize
1690 : // methods are called
1691 2 : scheduler.node_upsert(nodes.get(&NodeId(3)).unwrap());
1692 2 : scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
1693 2 : optimize_til_idle(&nodes, &mut scheduler, &mut shards);
1694 2 :
1695 2 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
1696 2 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
1697 :
1698 2 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
1699 2 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
1700 :
1701 2 : assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2);
1702 2 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 1);
1703 :
1704 2 : assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2);
1705 2 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 1);
1706 :
1707 8 : for shard in shards.iter_mut() {
1708 8 : shard.intent.clear(&mut scheduler);
1709 8 : }
1710 :
1711 2 : Ok(())
1712 2 : }
1713 : }