Line data Source code
1 : use std::collections::{HashMap, HashSet};
2 : use std::sync::Arc;
3 : use std::time::Duration;
4 :
5 : use futures::future::{self, Either};
6 : use itertools::Itertools;
7 : use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy, ShardSchedulingPolicy};
8 : use pageserver_api::models::{LocationConfig, LocationConfigMode, TenantConfig};
9 : use pageserver_api::shard::{ShardIdentity, TenantShardId};
10 : use serde::{Deserialize, Serialize};
11 : use tokio::task::JoinHandle;
12 : use tokio_util::sync::CancellationToken;
13 : use tracing::{Instrument, instrument};
14 : use utils::generation::Generation;
15 : use utils::id::NodeId;
16 : use utils::seqwait::{SeqWait, SeqWaitError};
17 : use utils::shard::ShardCount;
18 : use utils::sync::gate::GateGuard;
19 :
20 : use crate::compute_hook::ComputeHook;
21 : use crate::metrics::{
22 : self, ReconcileCompleteLabelGroup, ReconcileLongRunningLabelGroup, ReconcileOutcome,
23 : };
24 : use crate::node::Node;
25 : use crate::persistence::split_state::SplitState;
26 : use crate::persistence::{Persistence, TenantShardPersistence};
27 : use crate::reconciler::{
28 : ReconcileError, ReconcileUnits, Reconciler, ReconcilerConfig, TargetState,
29 : attached_location_conf, secondary_location_conf,
30 : };
31 : use crate::scheduler::{
32 : AffinityScore, AttachedShardTag, NodeSchedulingScore, NodeSecondarySchedulingScore,
33 : RefCountUpdate, ScheduleContext, ScheduleError, Scheduler, SecondaryShardTag, ShardTag,
34 : };
35 : use crate::service::ReconcileResultRequest;
36 : use crate::{Sequence, service};
37 :
38 : /// Serialization helper
39 0 : fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
40 0 : where
41 0 : S: serde::ser::Serializer,
42 0 : T: std::fmt::Display,
43 0 : {
44 0 : serializer.collect_str(
45 0 : &v.lock()
46 0 : .unwrap()
47 0 : .as_ref()
48 0 : .map(|e| format!("{e}"))
49 0 : .unwrap_or("".to_string()),
50 0 : )
51 0 : }
52 :
53 : /// In-memory state for a particular tenant shard.
54 : ///
55 : /// This struct implement Serialize for debugging purposes, but is _not_ persisted
56 : /// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
57 0 : #[derive(Serialize)]
58 : pub(crate) struct TenantShard {
59 : pub(crate) tenant_shard_id: TenantShardId,
60 :
61 : pub(crate) shard: ShardIdentity,
62 :
63 : // Runtime only: sequence used to coordinate when updating this object while
64 : // with background reconcilers may be running. A reconciler runs to a particular
65 : // sequence.
66 : pub(crate) sequence: Sequence,
67 :
68 : // Latest generation number: next time we attach, increment this
69 : // and use the incremented number when attaching.
70 : //
71 : // None represents an incompletely onboarded tenant via the [`Service::location_config`]
72 : // API, where this tenant may only run in PlacementPolicy::Secondary.
73 : pub(crate) generation: Option<Generation>,
74 :
75 : // High level description of how the tenant should be set up. Provided
76 : // externally.
77 : pub(crate) policy: PlacementPolicy,
78 :
79 : // Low level description of exactly which pageservers should fulfil
80 : // which role. Generated by `Self::schedule`.
81 : pub(crate) intent: IntentState,
82 :
83 : // Low level description of how the tenant is configured on pageservers:
84 : // if this does not match `Self::intent` then the tenant needs reconciliation
85 : // with `Self::reconcile`.
86 : pub(crate) observed: ObservedState,
87 :
88 : // Tenant configuration, passed through opaquely to the pageserver. Identical
89 : // for all shards in a tenant.
90 : pub(crate) config: TenantConfig,
91 :
92 : /// If a reconcile task is currently in flight, it may be joined here (it is
93 : /// only safe to join if either the result has been received or the reconciler's
94 : /// cancellation token has been fired)
95 : #[serde(skip)]
96 : pub(crate) reconciler: Option<ReconcilerHandle>,
97 :
98 : /// If a tenant is being split, then all shards with that TenantId will have a
99 : /// SplitState set, this acts as a guard against other operations such as background
100 : /// reconciliation, and timeline creation.
101 : pub(crate) splitting: SplitState,
102 :
103 : /// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag
104 : /// is set. This flag is cleared when the tenant is popped off the delay queue.
105 : pub(crate) delayed_reconcile: bool,
106 :
107 : /// Optionally wait for reconciliation to complete up to a particular
108 : /// sequence number.
109 : #[serde(skip)]
110 : pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
111 :
112 : /// Indicates sequence number for which we have encountered an error reconciling. If
113 : /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
114 : /// and callers should stop waiting for `waiter` and propagate the error.
115 : #[serde(skip)]
116 : pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
117 :
118 : /// The most recent error from a reconcile on this tenant. This is a nested Arc
119 : /// because:
120 : /// - ReconcileWaiters need to Arc-clone the overall object to read it later
121 : /// - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
122 : /// many waiters for one shard, and the underlying error types are not Clone.
123 : ///
124 : /// TODO: generalize to an array of recent events
125 : /// TOOD: use a ArcSwap instead of mutex for faster reads?
126 : #[serde(serialize_with = "read_last_error")]
127 : pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
128 :
129 : /// If we have a pending compute notification that for some reason we weren't able to send,
130 : /// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
131 : /// and trigger a Reconciler run. This is the mechanism by which compute notifications are included in the scope
132 : /// of state that we publish externally in an eventually consistent way.
133 : pub(crate) pending_compute_notification: bool,
134 :
135 : // Support/debug tool: if something is going wrong or flapping with scheduling, this may
136 : // be set to a non-active state to avoid making changes while the issue is fixed.
137 : scheduling_policy: ShardSchedulingPolicy,
138 : }
139 :
140 : #[derive(Clone, Debug, Serialize)]
141 : pub(crate) struct IntentState {
142 : attached: Option<NodeId>,
143 : secondary: Vec<NodeId>,
144 :
145 : // We should attempt to schedule this shard in the provided AZ to
146 : // decrease chances of cross-AZ compute.
147 : preferred_az_id: Option<AvailabilityZone>,
148 : }
149 :
150 : impl IntentState {
151 12858 : pub(crate) fn new(preferred_az_id: Option<AvailabilityZone>) -> Self {
152 12858 : Self {
153 12858 : attached: None,
154 12858 : secondary: vec![],
155 12858 : preferred_az_id,
156 12858 : }
157 12858 : }
158 0 : pub(crate) fn single(
159 0 : scheduler: &mut Scheduler,
160 0 : node_id: Option<NodeId>,
161 0 : preferred_az_id: Option<AvailabilityZone>,
162 0 : ) -> Self {
163 0 : if let Some(node_id) = node_id {
164 0 : scheduler.update_node_ref_counts(
165 0 : node_id,
166 0 : preferred_az_id.as_ref(),
167 0 : RefCountUpdate::Attach,
168 0 : );
169 0 : }
170 0 : Self {
171 0 : attached: node_id,
172 0 : secondary: vec![],
173 0 : preferred_az_id,
174 0 : }
175 0 : }
176 :
177 12857 : pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
178 12857 : if self.attached != new_attached {
179 12857 : if let Some(old_attached) = self.attached.take() {
180 0 : scheduler.update_node_ref_counts(
181 0 : old_attached,
182 0 : self.preferred_az_id.as_ref(),
183 0 : RefCountUpdate::Detach,
184 0 : );
185 12857 : }
186 12857 : if let Some(new_attached) = &new_attached {
187 12857 : scheduler.update_node_ref_counts(
188 12857 : *new_attached,
189 12857 : self.preferred_az_id.as_ref(),
190 12857 : RefCountUpdate::Attach,
191 12857 : );
192 12857 : }
193 12857 : self.attached = new_attached;
194 0 : }
195 :
196 12857 : if let Some(new_attached) = &new_attached {
197 12857 : assert!(!self.secondary.contains(new_attached));
198 0 : }
199 12857 : }
200 :
201 : /// Like set_attached, but the node is from [`Self::secondary`]. This swaps the node from
202 : /// secondary to attached while maintaining the scheduler's reference counts.
203 6 : pub(crate) fn promote_attached(
204 6 : &mut self,
205 6 : scheduler: &mut Scheduler,
206 6 : promote_secondary: NodeId,
207 6 : ) {
208 6 : // If we call this with a node that isn't in secondary, it would cause incorrect
209 6 : // scheduler reference counting, since we assume the node is already referenced as a secondary.
210 6 : debug_assert!(self.secondary.contains(&promote_secondary));
211 :
212 13 : self.secondary.retain(|n| n != &promote_secondary);
213 6 :
214 6 : let demoted = self.attached;
215 6 : self.attached = Some(promote_secondary);
216 6 :
217 6 : scheduler.update_node_ref_counts(
218 6 : promote_secondary,
219 6 : self.preferred_az_id.as_ref(),
220 6 : RefCountUpdate::PromoteSecondary,
221 6 : );
222 6 : if let Some(demoted) = demoted {
223 0 : scheduler.update_node_ref_counts(
224 0 : demoted,
225 0 : self.preferred_az_id.as_ref(),
226 0 : RefCountUpdate::DemoteAttached,
227 0 : );
228 6 : }
229 6 : }
230 :
231 12845 : pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
232 12845 : assert!(!self.secondary.contains(&new_secondary));
233 12845 : assert!(self.attached != Some(new_secondary));
234 12845 : scheduler.update_node_ref_counts(
235 12845 : new_secondary,
236 12845 : self.preferred_az_id.as_ref(),
237 12845 : RefCountUpdate::AddSecondary,
238 12845 : );
239 12845 : self.secondary.push(new_secondary);
240 12845 : }
241 :
242 : /// It is legal to call this with a node that is not currently a secondary: that is a no-op
243 8 : pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
244 11 : let index = self.secondary.iter().position(|n| *n == node_id);
245 8 : if let Some(index) = index {
246 8 : scheduler.update_node_ref_counts(
247 8 : node_id,
248 8 : self.preferred_az_id.as_ref(),
249 8 : RefCountUpdate::RemoveSecondary,
250 8 : );
251 8 : self.secondary.remove(index);
252 8 : }
253 8 : }
254 :
255 12857 : pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
256 12857 : for secondary in self.secondary.drain(..) {
257 12838 : scheduler.update_node_ref_counts(
258 12838 : secondary,
259 12838 : self.preferred_az_id.as_ref(),
260 12838 : RefCountUpdate::RemoveSecondary,
261 12838 : );
262 12838 : }
263 12857 : }
264 :
265 : /// Remove the last secondary node from the list of secondaries
266 0 : pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
267 0 : if let Some(node_id) = self.secondary.pop() {
268 0 : scheduler.update_node_ref_counts(
269 0 : node_id,
270 0 : self.preferred_az_id.as_ref(),
271 0 : RefCountUpdate::RemoveSecondary,
272 0 : );
273 0 : }
274 0 : }
275 :
276 12857 : pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
277 12857 : if let Some(old_attached) = self.attached.take() {
278 12855 : scheduler.update_node_ref_counts(
279 12855 : old_attached,
280 12855 : self.preferred_az_id.as_ref(),
281 12855 : RefCountUpdate::Detach,
282 12855 : );
283 12855 : }
284 :
285 12857 : self.clear_secondary(scheduler);
286 12857 : }
287 :
288 12897 : pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
289 12897 : let mut result = Vec::new();
290 12897 : if let Some(p) = self.attached {
291 12892 : result.push(p)
292 5 : }
293 :
294 12897 : result.extend(self.secondary.iter().copied());
295 12897 :
296 12897 : result
297 12897 : }
298 :
299 12618 : pub(crate) fn get_attached(&self) -> &Option<NodeId> {
300 12618 : &self.attached
301 12618 : }
302 :
303 12696 : pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
304 12696 : &self.secondary
305 12696 : }
306 :
307 : /// If the node is in use as the attached location, demote it into
308 : /// the list of secondary locations. This is used when a node goes offline,
309 : /// and we want to use a different node for attachment, but not permanently
310 : /// forget the location on the offline node.
311 : ///
312 : /// Returns true if a change was made
313 7 : pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
314 7 : if self.attached == Some(node_id) {
315 7 : self.attached = None;
316 7 : self.secondary.push(node_id);
317 7 : scheduler.update_node_ref_counts(
318 7 : node_id,
319 7 : self.preferred_az_id.as_ref(),
320 7 : RefCountUpdate::DemoteAttached,
321 7 : );
322 7 : true
323 : } else {
324 0 : false
325 : }
326 7 : }
327 : }
328 :
329 : impl Drop for IntentState {
330 12858 : fn drop(&mut self) {
331 12858 : // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
332 12858 : // We do not check this while panicking, to avoid polluting unit test failures or
333 12858 : // other assertions with this assertion's output. It's still wrong to leak these,
334 12858 : // but if we already have a panic then we don't need to independently flag this case.
335 12858 : if !(std::thread::panicking()) {
336 12858 : debug_assert!(self.attached.is_none() && self.secondary.is_empty());
337 0 : }
338 12857 : }
339 : }
340 :
341 0 : #[derive(Default, Clone, Serialize, Deserialize, Debug)]
342 : pub(crate) struct ObservedState {
343 : pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
344 : }
345 :
346 : /// Our latest knowledge of how this tenant is configured in the outside world.
347 : ///
348 : /// Meaning:
349 : /// * No instance of this type exists for a node: we are certain that we have nothing configured on that
350 : /// node for this shard.
351 : /// * Instance exists with conf==None: we *might* have some state on that node, but we don't know
352 : /// what it is (e.g. we failed partway through configuring it)
353 : /// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
354 : /// and that configuration will still be present unless something external interfered.
355 0 : #[derive(Clone, Serialize, Deserialize, Debug)]
356 : pub(crate) struct ObservedStateLocation {
357 : /// If None, it means we do not know the status of this shard's location on this node, but
358 : /// we know that we might have some state on this node.
359 : pub(crate) conf: Option<LocationConfig>,
360 : }
361 :
362 : pub(crate) struct ReconcilerWaiter {
363 : // For observability purposes, remember the ID of the shard we're
364 : // waiting for.
365 : pub(crate) tenant_shard_id: TenantShardId,
366 :
367 : seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
368 : error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
369 : error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
370 : seq: Sequence,
371 : }
372 :
373 : pub(crate) enum ReconcilerStatus {
374 : Done,
375 : Failed,
376 : InProgress,
377 : }
378 :
379 : #[derive(thiserror::Error, Debug)]
380 : pub(crate) enum ReconcileWaitError {
381 : #[error("Timeout waiting for shard {0}")]
382 : Timeout(TenantShardId),
383 : #[error("shutting down")]
384 : Shutdown,
385 : #[error("Reconcile error on shard {0}: {1}")]
386 : Failed(TenantShardId, Arc<ReconcileError>),
387 : }
388 :
389 : #[derive(Eq, PartialEq, Debug, Clone)]
390 : pub(crate) struct ReplaceSecondary {
391 : old_node_id: NodeId,
392 : new_node_id: NodeId,
393 : }
394 :
395 : #[derive(Eq, PartialEq, Debug, Clone)]
396 : pub(crate) struct MigrateAttachment {
397 : pub(crate) old_attached_node_id: NodeId,
398 : pub(crate) new_attached_node_id: NodeId,
399 : }
400 :
401 : #[derive(Eq, PartialEq, Debug, Clone)]
402 : pub(crate) enum ScheduleOptimizationAction {
403 : // Replace one of our secondary locations with a different node
404 : ReplaceSecondary(ReplaceSecondary),
405 : // Migrate attachment to an existing secondary location
406 : MigrateAttachment(MigrateAttachment),
407 : // Create a secondary location, with the intent of later migrating to it
408 : CreateSecondary(NodeId),
409 : // Remove a secondary location that we previously created to facilitate a migration
410 : RemoveSecondary(NodeId),
411 : }
412 :
413 : #[derive(Eq, PartialEq, Debug, Clone)]
414 : pub(crate) struct ScheduleOptimization {
415 : // What was the reconcile sequence when we generated this optimization? The optimization
416 : // should only be applied if the shard's sequence is still at this value, in case other changes
417 : // happened between planning the optimization and applying it.
418 : sequence: Sequence,
419 :
420 : pub(crate) action: ScheduleOptimizationAction,
421 : }
422 :
423 : impl ReconcilerWaiter {
424 0 : pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
425 0 : tokio::select! {
426 0 : result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
427 0 : result.map_err(|e| match e {
428 0 : SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
429 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
430 0 : })?;
431 : },
432 0 : result = self.error_seq_wait.wait_for(self.seq) => {
433 0 : result.map_err(|e| match e {
434 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
435 0 : SeqWaitError::Timeout => unreachable!()
436 0 : })?;
437 :
438 0 : return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
439 0 : self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
440 : }
441 : }
442 :
443 0 : Ok(())
444 0 : }
445 :
446 0 : pub(crate) fn get_status(&self) -> ReconcilerStatus {
447 0 : if self.seq_wait.would_wait_for(self.seq).is_ok() {
448 0 : ReconcilerStatus::Done
449 0 : } else if self.error_seq_wait.would_wait_for(self.seq).is_ok() {
450 0 : ReconcilerStatus::Failed
451 : } else {
452 0 : ReconcilerStatus::InProgress
453 : }
454 0 : }
455 : }
456 :
457 : /// Having spawned a reconciler task, the tenant shard's state will carry enough
458 : /// information to optionally cancel & await it later.
459 : pub(crate) struct ReconcilerHandle {
460 : sequence: Sequence,
461 : handle: JoinHandle<()>,
462 : cancel: CancellationToken,
463 : }
464 :
465 : pub(crate) enum ReconcileNeeded {
466 : /// shard either doesn't need reconciliation, or is forbidden from spawning a reconciler
467 : /// in its current state (e.g. shard split in progress, or ShardSchedulingPolicy forbids it)
468 : No,
469 : /// shard has a reconciler running, and its intent hasn't changed since that one was
470 : /// spawned: wait for the existing reconciler rather than spawning a new one.
471 : WaitExisting(ReconcilerWaiter),
472 : /// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
473 : Yes(ReconcileReason),
474 : }
475 :
476 : #[derive(Debug)]
477 : pub(crate) enum ReconcileReason {
478 : ActiveNodesDirty,
479 : UnknownLocation,
480 : PendingComputeNotification,
481 : }
482 :
483 : /// Pending modification to the observed state of a tenant shard.
484 : /// Produced by [`Reconciler::observed_deltas`] and applied in [`crate::service::Service::process_result`].
485 : pub(crate) enum ObservedStateDelta {
486 : Upsert(Box<(NodeId, ObservedStateLocation)>),
487 : Delete(NodeId),
488 : }
489 :
490 : impl ObservedStateDelta {
491 0 : pub(crate) fn node_id(&self) -> &NodeId {
492 0 : match self {
493 0 : Self::Upsert(up) => &up.0,
494 0 : Self::Delete(nid) => nid,
495 : }
496 0 : }
497 : }
498 :
499 : /// When a reconcile task completes, it sends this result object
500 : /// to be applied to the primary TenantShard.
501 : pub(crate) struct ReconcileResult {
502 : pub(crate) sequence: Sequence,
503 : /// On errors, `observed` should be treated as an incompleted description
504 : /// of state (i.e. any nodes present in the result should override nodes
505 : /// present in the parent tenant state, but any unmentioned nodes should
506 : /// not be removed from parent tenant state)
507 : pub(crate) result: Result<(), ReconcileError>,
508 :
509 : pub(crate) tenant_shard_id: TenantShardId,
510 : pub(crate) generation: Option<Generation>,
511 : pub(crate) observed_deltas: Vec<ObservedStateDelta>,
512 :
513 : /// Set [`TenantShard::pending_compute_notification`] from this flag
514 : pub(crate) pending_compute_notification: bool,
515 : }
516 :
517 : impl ObservedState {
518 0 : pub(crate) fn new() -> Self {
519 0 : Self {
520 0 : locations: HashMap::new(),
521 0 : }
522 0 : }
523 :
524 0 : pub(crate) fn is_empty(&self) -> bool {
525 0 : self.locations.is_empty()
526 0 : }
527 : }
528 :
529 : impl TenantShard {
530 12841 : pub(crate) fn new(
531 12841 : tenant_shard_id: TenantShardId,
532 12841 : shard: ShardIdentity,
533 12841 : policy: PlacementPolicy,
534 12841 : preferred_az_id: Option<AvailabilityZone>,
535 12841 : ) -> Self {
536 12841 : metrics::METRICS_REGISTRY
537 12841 : .metrics_group
538 12841 : .storage_controller_tenant_shards
539 12841 : .inc();
540 12841 :
541 12841 : Self {
542 12841 : tenant_shard_id,
543 12841 : policy,
544 12841 : intent: IntentState::new(preferred_az_id),
545 12841 : generation: Some(Generation::new(0)),
546 12841 : shard,
547 12841 : observed: ObservedState::default(),
548 12841 : config: TenantConfig::default(),
549 12841 : reconciler: None,
550 12841 : splitting: SplitState::Idle,
551 12841 : sequence: Sequence(1),
552 12841 : delayed_reconcile: false,
553 12841 : waiter: Arc::new(SeqWait::new(Sequence(0))),
554 12841 : error_waiter: Arc::new(SeqWait::new(Sequence(0))),
555 12841 : last_error: Arc::default(),
556 12841 : pending_compute_notification: false,
557 12841 : scheduling_policy: ShardSchedulingPolicy::default(),
558 12841 : }
559 12841 : }
560 :
561 : /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
562 : /// [`ObservedState`], even if it violates my [`PlacementPolicy`]. Call [`Self::schedule`] next,
563 : /// to get an intent state that complies with placement policy. The overall goal is to do scheduling
564 : /// in a way that makes use of any configured locations that already exist in the outside world.
565 1 : pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
566 1 : // Choose an attached location by filtering observed locations, and then sorting to get the highest
567 1 : // generation
568 1 : let mut attached_locs = self
569 1 : .observed
570 1 : .locations
571 1 : .iter()
572 2 : .filter_map(|(node_id, l)| {
573 2 : if let Some(conf) = &l.conf {
574 2 : if conf.mode == LocationConfigMode::AttachedMulti
575 1 : || conf.mode == LocationConfigMode::AttachedSingle
576 1 : || conf.mode == LocationConfigMode::AttachedStale
577 : {
578 2 : Some((node_id, conf.generation))
579 : } else {
580 0 : None
581 : }
582 : } else {
583 0 : None
584 : }
585 2 : })
586 1 : .collect::<Vec<_>>();
587 1 :
588 2 : attached_locs.sort_by_key(|i| i.1);
589 1 : if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
590 1 : self.intent.set_attached(scheduler, Some(*node_id));
591 1 : }
592 :
593 : // All remaining observed locations generate secondary intents. This includes None
594 : // observations, as these may well have some local content on disk that is usable (this
595 : // is an edge case that might occur if we restarted during a migration or other change)
596 : //
597 : // We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
598 : // will take care of promoting one of these secondaries to be attached.
599 2 : self.observed.locations.keys().for_each(|node_id| {
600 2 : if Some(*node_id) != self.intent.attached {
601 1 : self.intent.push_secondary(scheduler, *node_id);
602 1 : }
603 2 : });
604 1 : }
605 :
606 : /// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
607 : /// attached pageserver for a shard.
608 : ///
609 : /// Returns whether we modified it, and the NodeId selected.
610 12831 : fn schedule_attached(
611 12831 : &mut self,
612 12831 : scheduler: &mut Scheduler,
613 12831 : context: &ScheduleContext,
614 12831 : ) -> Result<(bool, NodeId), ScheduleError> {
615 : // No work to do if we already have an attached tenant
616 12831 : if let Some(node_id) = self.intent.attached {
617 0 : return Ok((false, node_id));
618 12831 : }
619 :
620 12831 : if let Some(promote_secondary) = self.preferred_secondary(scheduler) {
621 : // Promote a secondary
622 2 : tracing::debug!("Promoted secondary {} to attached", promote_secondary);
623 2 : self.intent.promote_attached(scheduler, promote_secondary);
624 2 : Ok((true, promote_secondary))
625 : } else {
626 : // Pick a fresh node: either we had no secondaries or none were schedulable
627 12829 : let node_id = scheduler.schedule_shard::<AttachedShardTag>(
628 12829 : &self.intent.secondary,
629 12829 : &self.intent.preferred_az_id,
630 12829 : context,
631 12829 : )?;
632 12829 : tracing::debug!("Selected {} as attached", node_id);
633 12829 : self.intent.set_attached(scheduler, Some(node_id));
634 12829 : Ok((true, node_id))
635 : }
636 12831 : }
637 :
638 : #[instrument(skip_all, fields(
639 : tenant_id=%self.tenant_shard_id.tenant_id,
640 : shard_id=%self.tenant_shard_id.shard_slug(),
641 : sequence=%self.sequence
642 : ))]
643 : pub(crate) fn schedule(
644 : &mut self,
645 : scheduler: &mut Scheduler,
646 : context: &mut ScheduleContext,
647 : ) -> Result<(), ScheduleError> {
648 : let r = self.do_schedule(scheduler, context);
649 :
650 : context.avoid(&self.intent.all_pageservers());
651 :
652 : r
653 : }
654 :
655 12835 : pub(crate) fn do_schedule(
656 12835 : &mut self,
657 12835 : scheduler: &mut Scheduler,
658 12835 : context: &ScheduleContext,
659 12835 : ) -> Result<(), ScheduleError> {
660 12835 : // TODO: before scheduling new nodes, check if any existing content in
661 12835 : // self.intent refers to pageservers that are offline, and pick other
662 12835 : // pageservers if so.
663 12835 :
664 12835 : // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
665 12835 : // change their attach location.
666 12835 :
667 12835 : match self.scheduling_policy {
668 12834 : ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
669 : ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
670 : // Warn to make it obvious why other things aren't happening/working, if we skip scheduling
671 1 : tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
672 0 : "Scheduling is disabled by policy {:?}", self.scheduling_policy);
673 1 : return Ok(());
674 : }
675 : }
676 :
677 : // Build the set of pageservers already in use by this tenant, to avoid scheduling
678 : // more work on the same pageservers we're already using.
679 12834 : let mut modified = false;
680 :
681 : // Add/remove nodes to fulfil policy
682 : use PlacementPolicy::*;
683 12834 : match self.policy {
684 12831 : Attached(secondary_count) => {
685 : // Should have exactly one attached, and at least N secondaries
686 12831 : let (modified_attached, attached_node_id) =
687 12831 : self.schedule_attached(scheduler, context)?;
688 12831 : modified |= modified_attached;
689 12831 :
690 12831 : let mut used_pageservers = vec![attached_node_id];
691 25661 : while self.intent.secondary.len() < secondary_count {
692 12830 : let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
693 12830 : &used_pageservers,
694 12830 : &self.intent.preferred_az_id,
695 12830 : context,
696 12830 : )?;
697 12830 : self.intent.push_secondary(scheduler, node_id);
698 12830 : used_pageservers.push(node_id);
699 12830 : modified = true;
700 : }
701 : }
702 : Secondary => {
703 3 : if let Some(node_id) = self.intent.get_attached() {
704 2 : // Populate secondary by demoting the attached node
705 2 : self.intent.demote_attached(scheduler, *node_id);
706 2 :
707 2 : modified = true;
708 2 : } else if self.intent.secondary.is_empty() {
709 : // Populate secondary by scheduling a fresh node
710 : //
711 : // We use [`AttachedShardTag`] because when a secondary location is the only one
712 : // a shard has, we expect that its next use will be as an attached location: we want
713 : // the tenant to be ready to warm up and run fast in their preferred AZ.
714 1 : let node_id = scheduler.schedule_shard::<AttachedShardTag>(
715 1 : &[],
716 1 : &self.intent.preferred_az_id,
717 1 : context,
718 1 : )?;
719 1 : self.intent.push_secondary(scheduler, node_id);
720 1 : modified = true;
721 0 : }
722 4 : while self.intent.secondary.len() > 1 {
723 1 : // If we have multiple secondaries (e.g. when transitioning from Attached to Secondary and
724 1 : // having just demoted our attached location), then we should prefer to keep the location
725 1 : // in our preferred AZ. Tenants in Secondary mode want to be in the preferred AZ so that
726 1 : // they have a warm location to become attached when transitioning back into Attached.
727 1 :
728 1 : let mut candidates = self.intent.get_secondary().clone();
729 1 : // Sort to get secondaries outside preferred AZ last
730 1 : candidates
731 2 : .sort_by_key(|n| scheduler.get_node_az(n).as_ref() != self.preferred_az());
732 1 : let secondary_to_remove = candidates.pop().unwrap();
733 1 : self.intent.remove_secondary(scheduler, secondary_to_remove);
734 1 : modified = true;
735 1 : }
736 : }
737 : Detached => {
738 : // Never add locations in this mode
739 0 : if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
740 0 : self.intent.clear(scheduler);
741 0 : modified = true;
742 0 : }
743 : }
744 : }
745 :
746 12834 : if modified {
747 12834 : self.sequence.0 += 1;
748 12834 : }
749 :
750 12834 : Ok(())
751 12835 : }
752 :
753 : /// Reschedule this tenant shard to one of its secondary locations. Returns a scheduling error
754 : /// if the swap is not possible and leaves the intent state in its original state.
755 : ///
756 : /// Arguments:
757 : /// `attached_to`: the currently attached location matching the intent state (may be None if the
758 : /// shard is not attached)
759 : /// `promote_to`: an optional secondary location of this tenant shard. If set to None, we ask
760 : /// the scheduler to recommend a node
761 0 : pub(crate) fn reschedule_to_secondary(
762 0 : &mut self,
763 0 : promote_to: Option<NodeId>,
764 0 : scheduler: &mut Scheduler,
765 0 : ) -> Result<(), ScheduleError> {
766 0 : let promote_to = match promote_to {
767 0 : Some(node) => node,
768 0 : None => match self.preferred_secondary(scheduler) {
769 0 : Some(node) => node,
770 : None => {
771 0 : return Err(ScheduleError::ImpossibleConstraint);
772 : }
773 : },
774 : };
775 :
776 0 : assert!(self.intent.get_secondary().contains(&promote_to));
777 :
778 0 : if let Some(node) = self.intent.get_attached() {
779 0 : let demoted = self.intent.demote_attached(scheduler, *node);
780 0 : if !demoted {
781 0 : return Err(ScheduleError::ImpossibleConstraint);
782 0 : }
783 0 : }
784 :
785 0 : self.intent.promote_attached(scheduler, promote_to);
786 0 :
787 0 : // Increment the sequence number for the edge case where a
788 0 : // reconciler is already running to avoid waiting on the
789 0 : // current reconcile instead of spawning a new one.
790 0 : self.sequence = self.sequence.next();
791 0 :
792 0 : Ok(())
793 0 : }
794 :
795 : /// Returns None if the current location's score is unavailable, i.e. cannot draw a conclusion
796 63 : fn is_better_location<T: ShardTag>(
797 63 : &self,
798 63 : scheduler: &mut Scheduler,
799 63 : schedule_context: &ScheduleContext,
800 63 : current: NodeId,
801 63 : candidate: NodeId,
802 63 : ) -> Option<bool> {
803 63 : let Some(candidate_score) = scheduler.compute_node_score::<T::Score>(
804 63 : candidate,
805 63 : &self.intent.preferred_az_id,
806 63 : schedule_context,
807 63 : ) else {
808 : // The candidate node is unavailable for scheduling or otherwise couldn't get a score
809 1 : return None;
810 : };
811 :
812 62 : match scheduler.compute_node_score::<T::Score>(
813 62 : current,
814 62 : &self.intent.preferred_az_id,
815 62 : schedule_context,
816 62 : ) {
817 62 : Some(current_score) => {
818 62 : // Ignore utilization components when comparing scores: we don't want to migrate
819 62 : // because of transient load variations, it risks making the system thrash, and
820 62 : // migrating for utilization requires a separate high level view of the system to
821 62 : // e.g. prioritize moving larger or smaller tenants, rather than arbitrarily
822 62 : // moving things around in the order that we hit this function.
823 62 : let candidate_score = candidate_score.for_optimization();
824 62 : let current_score = current_score.for_optimization();
825 62 :
826 62 : if candidate_score < current_score {
827 8 : tracing::info!(
828 0 : "Found a lower scoring location! {candidate} is better than {current} ({candidate_score:?} is better than {current_score:?})"
829 : );
830 8 : Some(true)
831 : } else {
832 : // The candidate node is no better than our current location, so don't migrate
833 54 : tracing::debug!(
834 0 : "Candidate node {candidate} is no better than our current location {current} (candidate {candidate_score:?} vs current {current_score:?})",
835 : );
836 54 : Some(false)
837 : }
838 : }
839 : None => {
840 : // The current node is unavailable for scheduling, so we can't make any sensible
841 : // decisions about optimisation. This should be a transient state -- if the node
842 : // is offline then it will get evacuated, if is blocked by a scheduling mode
843 : // then we will respect that mode by doing nothing.
844 0 : tracing::debug!("Current node {current} is unavailable for scheduling");
845 0 : None
846 : }
847 : }
848 63 : }
849 :
850 47 : fn find_better_location<T: ShardTag>(
851 47 : &self,
852 47 : scheduler: &mut Scheduler,
853 47 : schedule_context: &ScheduleContext,
854 47 : current: NodeId,
855 47 : hard_exclude: &[NodeId],
856 47 : ) -> Option<NodeId> {
857 : // Look for a lower-scoring location to attach to
858 47 : let Ok(candidate_node) = scheduler.schedule_shard::<T>(
859 47 : hard_exclude,
860 47 : &self.intent.preferred_az_id,
861 47 : schedule_context,
862 47 : ) else {
863 : // A scheduling error means we have no possible candidate replacements
864 0 : tracing::debug!("No candidate node found");
865 0 : return None;
866 : };
867 :
868 47 : if candidate_node == current {
869 : // We're already at the best possible location, so don't migrate
870 24 : tracing::debug!("Candidate node {candidate_node} is already in use");
871 24 : return None;
872 23 : }
873 23 :
874 23 : self.is_better_location::<T>(scheduler, schedule_context, current, candidate_node)
875 23 : .and_then(|better| if better { Some(candidate_node) } else { None })
876 47 : }
877 :
878 : /// This function is an optimization, used to avoid doing large numbers of scheduling operations
879 : /// when looking for optimizations. This function uses knowledge of how scores work to do some
880 : /// fast checks for whether it may to be possible to improve a score.
881 : ///
882 : /// If we return true, it only means that optimization _might_ be possible, not that it necessarily is. If we
883 : /// return no, it definitely means that calling [`Self::optimize_attachment`] or [`Self::optimize_secondary`] would do no
884 : /// work.
885 3 : pub(crate) fn maybe_optimizable(
886 3 : &self,
887 3 : scheduler: &mut Scheduler,
888 3 : schedule_context: &ScheduleContext,
889 3 : ) -> bool {
890 3 : // Sharded tenant: check if any locations have a nonzero affinity score
891 3 : if self.shard.count >= ShardCount(1) {
892 3 : let schedule_context = schedule_context.project_detach(self);
893 5 : for node in self.intent.all_pageservers() {
894 5 : if let Some(af) = schedule_context.nodes.get(&node) {
895 5 : if *af > AffinityScore(0) {
896 3 : return true;
897 2 : }
898 0 : }
899 : }
900 0 : }
901 :
902 : // Attached tenant: check if the attachment is outside the preferred AZ
903 0 : if let PlacementPolicy::Attached(_) = self.policy {
904 0 : if let Some(attached) = self.intent.get_attached() {
905 0 : if scheduler.get_node_az(attached) != self.intent.preferred_az_id {
906 0 : return true;
907 0 : }
908 0 : }
909 0 : }
910 :
911 : // Tenant with secondary locations: check if any are within the preferred AZ
912 0 : for secondary in self.intent.get_secondary() {
913 0 : if scheduler.get_node_az(secondary) == self.intent.preferred_az_id {
914 0 : return true;
915 0 : }
916 : }
917 :
918 : // Does the tenant have excess secondaries?
919 0 : if self.intent.get_secondary().len() > self.policy.want_secondaries() {
920 0 : return true;
921 0 : }
922 0 :
923 0 : // Fall through: no optimizations possible
924 0 : false
925 3 : }
926 :
927 : /// Optimize attachments: if a shard has a secondary location that is preferable to
928 : /// its primary location based on soft constraints, switch that secondary location
929 : /// to be attached.
930 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
931 : pub(crate) fn optimize_attachment(
932 : &self,
933 : scheduler: &mut Scheduler,
934 : schedule_context: &ScheduleContext,
935 : ) -> Option<ScheduleOptimization> {
936 : let attached = (*self.intent.get_attached())?;
937 :
938 : let schedule_context = schedule_context.project_detach(self);
939 :
940 : // If we already have a secondary that is higher-scoring than out current location,
941 : // then simply migrate to it.
942 : for secondary in self.intent.get_secondary() {
943 : if let Some(true) = self.is_better_location::<AttachedShardTag>(
944 : scheduler,
945 : &schedule_context,
946 : attached,
947 : *secondary,
948 : ) {
949 : return Some(ScheduleOptimization {
950 : sequence: self.sequence,
951 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
952 : old_attached_node_id: attached,
953 : new_attached_node_id: *secondary,
954 : }),
955 : });
956 : }
957 : }
958 :
959 : // Given that none of our current secondaries is a better location than our current
960 : // attached location (checked above), we may trim any secondaries that are not needed
961 : // for the placement policy.
962 : if self.intent.get_secondary().len() > self.policy.want_secondaries() {
963 : // This code path cleans up extra secondaries after migrating, and/or
964 : // trims extra secondaries after a PlacementPolicy::Attached(N) was
965 : // modified to decrease N.
966 :
967 : let secondary_scores = self
968 : .intent
969 : .get_secondary()
970 : .iter()
971 10 : .map(|node_id| {
972 10 : (
973 10 : *node_id,
974 10 : scheduler.compute_node_score::<NodeSecondarySchedulingScore>(
975 10 : *node_id,
976 10 : &self.intent.preferred_az_id,
977 10 : &schedule_context,
978 10 : ),
979 10 : )
980 10 : })
981 : .collect::<HashMap<_, _>>();
982 :
983 10 : if secondary_scores.iter().any(|score| score.1.is_none()) {
984 : // Trivial case: if we only have one secondary, drop that one
985 : if self.intent.get_secondary().len() == 1 {
986 : return Some(ScheduleOptimization {
987 : sequence: self.sequence,
988 : action: ScheduleOptimizationAction::RemoveSecondary(
989 : *self.intent.get_secondary().first().unwrap(),
990 : ),
991 : });
992 : }
993 :
994 : // Try to find a "good" secondary to keep, without relying on scores (one or more nodes is in a state
995 : // where its score can't be calculated), and drop the others. This enables us to make progress in
996 : // most cases, even if some nodes are offline or have scheduling=pause set.
997 :
998 : debug_assert!(self.intent.attached.is_some()); // We should not make it here unless attached -- this
999 : // logic presumes we are in a mode where we want secondaries to be in non-home AZ
1000 1 : if let Some(retain_secondary) = self.intent.get_secondary().iter().find(|n| {
1001 1 : let in_home_az = scheduler.get_node_az(n) == self.intent.preferred_az_id;
1002 1 : let is_available = secondary_scores
1003 1 : .get(n)
1004 1 : .expect("Built from same list of nodes")
1005 1 : .is_some();
1006 1 : is_available && !in_home_az
1007 1 : }) {
1008 : // Great, we found one to retain. Pick some other to drop.
1009 : if let Some(victim) = self
1010 : .intent
1011 : .get_secondary()
1012 : .iter()
1013 2 : .find(|n| n != &retain_secondary)
1014 : {
1015 : return Some(ScheduleOptimization {
1016 : sequence: self.sequence,
1017 : action: ScheduleOptimizationAction::RemoveSecondary(*victim),
1018 : });
1019 : }
1020 : }
1021 :
1022 : // Fall through: we didn't identify one to remove. This ought to be rare.
1023 : tracing::warn!(
1024 : "Keeping extra secondaries: can't determine which of {:?} to remove (some nodes offline?)",
1025 : self.intent.get_secondary()
1026 : );
1027 : } else {
1028 : let victim = secondary_scores
1029 : .iter()
1030 8 : .max_by_key(|score| score.1.unwrap())
1031 : .unwrap()
1032 : .0;
1033 : return Some(ScheduleOptimization {
1034 : sequence: self.sequence,
1035 : action: ScheduleOptimizationAction::RemoveSecondary(*victim),
1036 : });
1037 : }
1038 : }
1039 :
1040 : let replacement = self.find_better_location::<AttachedShardTag>(
1041 : scheduler,
1042 : &schedule_context,
1043 : attached,
1044 : &[], // Don't exclude secondaries: our preferred attachment location may be a secondary
1045 : );
1046 :
1047 : // We have found a candidate and confirmed that its score is preferable
1048 : // to our current location. See if we have a secondary location in the preferred location already: if not,
1049 : // then create one.
1050 : if let Some(replacement) = replacement {
1051 : // If we are currently in non-preferred AZ, then the scheduler might suggest a location that is better, but still
1052 : // not in our preferred AZ. Migration has a cost in resources an impact to the workload, so we want to avoid doing
1053 : // multiple hops where we might go to some other AZ before eventually finding a suitable location in our preferred
1054 : // AZ: skip this optimization if it is not in our final, preferred AZ.
1055 : //
1056 : // This should be a transient state, there should always be capacity eventually in our preferred AZ (even if nodes
1057 : // there are too overloaded for scheduler to suggest them, more should be provisioned eventually).
1058 : if self.intent.preferred_az_id.is_some()
1059 : && scheduler.get_node_az(&replacement) != self.intent.preferred_az_id
1060 : {
1061 : tracing::debug!(
1062 : "Candidate node {replacement} is not in preferred AZ {:?}",
1063 : self.intent.preferred_az_id
1064 : );
1065 :
1066 : // This should only happen if our current location is not in the preferred AZ, otherwise
1067 : // [`Self::find_better_location`]` should have rejected any other location outside the preferred Az, because
1068 : // AZ is the highest priority part of NodeAttachmentSchedulingScore.
1069 : debug_assert!(scheduler.get_node_az(&attached) != self.intent.preferred_az_id);
1070 :
1071 : return None;
1072 : }
1073 :
1074 : if !self.intent.get_secondary().contains(&replacement) {
1075 : Some(ScheduleOptimization {
1076 : sequence: self.sequence,
1077 : action: ScheduleOptimizationAction::CreateSecondary(replacement),
1078 : })
1079 : } else {
1080 : // We already have a secondary in the preferred location, let's try migrating to it. Our caller
1081 : // will check the warmth of the destination before deciding whether to really execute this.
1082 : Some(ScheduleOptimization {
1083 : sequence: self.sequence,
1084 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
1085 : old_attached_node_id: attached,
1086 : new_attached_node_id: replacement,
1087 : }),
1088 : })
1089 : }
1090 : } else {
1091 : // We didn't find somewhere we'd rather be, and we don't have any excess secondaries
1092 : // to clean up: no action required.
1093 : None
1094 : }
1095 : }
1096 :
1097 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1098 : pub(crate) fn optimize_secondary(
1099 : &self,
1100 : scheduler: &mut Scheduler,
1101 : schedule_context: &ScheduleContext,
1102 : ) -> Option<ScheduleOptimization> {
1103 : if self.intent.get_secondary().len() > self.policy.want_secondaries() {
1104 : // We have extra secondaries, perhaps to facilitate a migration of the attached location:
1105 : // do nothing, it is up to [`Self::optimize_attachment`] to clean them up. When that's done,
1106 : // and we are called again, we will proceed.
1107 : tracing::debug!("Too many secondaries: skipping");
1108 : return None;
1109 : }
1110 :
1111 : let schedule_context = schedule_context.project_detach(self);
1112 :
1113 : for secondary in self.intent.get_secondary() {
1114 : // Make sure we don't try to migrate a secondary to our attached location: this case happens
1115 : // easily in environments without multiple AZs.
1116 : let exclude = match self.intent.attached {
1117 : Some(attached) => vec![attached],
1118 : None => vec![],
1119 : };
1120 :
1121 : let replacement = match &self.policy {
1122 : PlacementPolicy::Attached(_) => {
1123 : // Secondaries for an attached shard should be scheduled using `SecondaryShardTag`
1124 : // to avoid placing them in the preferred AZ.
1125 : self.find_better_location::<SecondaryShardTag>(
1126 : scheduler,
1127 : &schedule_context,
1128 : *secondary,
1129 : &exclude,
1130 : )
1131 : }
1132 : PlacementPolicy::Secondary => {
1133 : // In secondary-only mode, we want our secondary locations in the preferred AZ,
1134 : // so that they're ready to take over as an attached location when we transition
1135 : // into PlacementPolicy::Attached.
1136 : self.find_better_location::<AttachedShardTag>(
1137 : scheduler,
1138 : &schedule_context,
1139 : *secondary,
1140 : &exclude,
1141 : )
1142 : }
1143 : PlacementPolicy::Detached => None,
1144 : };
1145 :
1146 : assert!(replacement != Some(*secondary));
1147 : if let Some(replacement) = replacement {
1148 : // We have found a candidate and confirmed that its score is preferable
1149 : // to our current location. See if we have a secondary location in the preferred location already: if not,
1150 : // then create one.
1151 : return Some(ScheduleOptimization {
1152 : sequence: self.sequence,
1153 : action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
1154 : old_node_id: *secondary,
1155 : new_node_id: replacement,
1156 : }),
1157 : });
1158 : }
1159 : }
1160 :
1161 : None
1162 : }
1163 :
1164 : /// Return true if the optimization was really applied: it will not be applied if the optimization's
1165 : /// sequence is behind this tenant shard's
1166 14 : pub(crate) fn apply_optimization(
1167 14 : &mut self,
1168 14 : scheduler: &mut Scheduler,
1169 14 : optimization: ScheduleOptimization,
1170 14 : ) -> bool {
1171 14 : if optimization.sequence != self.sequence {
1172 0 : return false;
1173 14 : }
1174 14 :
1175 14 : metrics::METRICS_REGISTRY
1176 14 : .metrics_group
1177 14 : .storage_controller_schedule_optimization
1178 14 : .inc();
1179 14 :
1180 14 : match optimization.action {
1181 : ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
1182 4 : old_attached_node_id,
1183 4 : new_attached_node_id,
1184 4 : }) => {
1185 4 : self.intent.demote_attached(scheduler, old_attached_node_id);
1186 4 : self.intent
1187 4 : .promote_attached(scheduler, new_attached_node_id);
1188 4 : }
1189 : ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
1190 1 : old_node_id,
1191 1 : new_node_id,
1192 1 : }) => {
1193 1 : self.intent.remove_secondary(scheduler, old_node_id);
1194 1 : self.intent.push_secondary(scheduler, new_node_id);
1195 1 : }
1196 3 : ScheduleOptimizationAction::CreateSecondary(new_node_id) => {
1197 3 : self.intent.push_secondary(scheduler, new_node_id);
1198 3 : }
1199 6 : ScheduleOptimizationAction::RemoveSecondary(old_secondary) => {
1200 6 : self.intent.remove_secondary(scheduler, old_secondary);
1201 6 : }
1202 : }
1203 :
1204 14 : true
1205 14 : }
1206 :
1207 : /// When a shard has several secondary locations, we need to pick one in situations where
1208 : /// we promote one of them to an attached location:
1209 : /// - When draining a node for restart
1210 : /// - When responding to a node failure
1211 : ///
1212 : /// In this context, 'preferred' does not mean the node with the best scheduling score: instead
1213 : /// we want to pick the node which is best for use _temporarily_ while the previous attached location
1214 : /// is unavailable (e.g. because it's down or deploying). That means we prefer to use secondary
1215 : /// locations in a non-preferred AZ, as they're more likely to have awarm cache than a temporary
1216 : /// secondary in the preferred AZ (which are usually only created for migrations, and if they exist
1217 : /// they're probably not warmed up yet). The latter behavior is based oni
1218 : ///
1219 : /// If the input is empty, or all the nodes are not elegible for scheduling, return None: the
1220 : /// caller needs to a pick a node some other way.
1221 12831 : pub(crate) fn preferred_secondary(&self, scheduler: &Scheduler) -> Option<NodeId> {
1222 12831 : let candidates = scheduler.filter_usable_nodes(&self.intent.secondary);
1223 12831 :
1224 12831 : // We will sort candidates to prefer nodes which are _not_ in our preferred AZ, i.e. we prefer
1225 12831 : // to migrate to a long-lived secondary location (which would have been scheduled in a non-preferred AZ),
1226 12831 : // rather than a short-lived secondary location being used for optimization/migration (which would have
1227 12831 : // been scheduled in our preferred AZ).
1228 12831 : let mut candidates = candidates
1229 12831 : .iter()
1230 12831 : .map(|(node_id, node_az)| {
1231 2 : if node_az == &self.intent.preferred_az_id {
1232 1 : (1, *node_id)
1233 : } else {
1234 1 : (0, *node_id)
1235 : }
1236 12831 : })
1237 12831 : .collect::<Vec<_>>();
1238 12831 :
1239 12831 : candidates.sort();
1240 12831 :
1241 12831 : candidates.first().map(|i| i.1)
1242 12831 : }
1243 :
1244 : /// Query whether the tenant's observed state for attached node matches its intent state, and if so,
1245 : /// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that
1246 : /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
1247 : ///
1248 : /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
1249 : /// funciton should not be used to decide whether to reconcile.
1250 0 : pub(crate) fn stably_attached(&self) -> Option<NodeId> {
1251 0 : if let Some(attach_intent) = self.intent.attached {
1252 0 : match self.observed.locations.get(&attach_intent) {
1253 0 : Some(loc) => match &loc.conf {
1254 0 : Some(conf) => match conf.mode {
1255 : LocationConfigMode::AttachedMulti
1256 : | LocationConfigMode::AttachedSingle
1257 : | LocationConfigMode::AttachedStale => {
1258 : // Our intent and observed state agree that this node is in an attached state.
1259 0 : Some(attach_intent)
1260 : }
1261 : // Our observed config is not an attached state
1262 0 : _ => None,
1263 : },
1264 : // Our observed state is None, i.e. in flux
1265 0 : None => None,
1266 : },
1267 : // We have no observed state for this node
1268 0 : None => None,
1269 : }
1270 : } else {
1271 : // Our intent is not to attach
1272 0 : None
1273 : }
1274 0 : }
1275 :
1276 0 : fn dirty(&self, nodes: &Arc<HashMap<NodeId, Node>>) -> bool {
1277 0 : let mut dirty_nodes = HashSet::new();
1278 :
1279 0 : if let Some(node_id) = self.intent.attached {
1280 : // Maybe panic: it is a severe bug if we try to attach while generation is null.
1281 0 : let generation = self
1282 0 : .generation
1283 0 : .expect("Attempted to enter attached state without a generation");
1284 0 :
1285 0 : let wanted_conf =
1286 0 : attached_location_conf(generation, &self.shard, &self.config, &self.policy);
1287 0 : match self.observed.locations.get(&node_id) {
1288 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
1289 0 : Some(_) | None => {
1290 0 : dirty_nodes.insert(node_id);
1291 0 : }
1292 : }
1293 0 : }
1294 :
1295 0 : for node_id in &self.intent.secondary {
1296 0 : let wanted_conf = secondary_location_conf(&self.shard, &self.config);
1297 0 : match self.observed.locations.get(node_id) {
1298 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
1299 0 : Some(_) | None => {
1300 0 : dirty_nodes.insert(*node_id);
1301 0 : }
1302 : }
1303 : }
1304 :
1305 0 : for node_id in self.observed.locations.keys() {
1306 0 : if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
1307 0 : // We have observed state that isn't part of our intent: need to clean it up.
1308 0 : dirty_nodes.insert(*node_id);
1309 0 : }
1310 : }
1311 :
1312 0 : dirty_nodes.retain(|node_id| {
1313 0 : nodes
1314 0 : .get(node_id)
1315 0 : .map(|n| n.is_available())
1316 0 : .unwrap_or(false)
1317 0 : });
1318 0 :
1319 0 : !dirty_nodes.is_empty()
1320 0 : }
1321 :
1322 : #[allow(clippy::too_many_arguments)]
1323 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1324 : pub(crate) fn get_reconcile_needed(
1325 : &mut self,
1326 : pageservers: &Arc<HashMap<NodeId, Node>>,
1327 : ) -> ReconcileNeeded {
1328 : // If there are any ambiguous observed states, and the nodes they refer to are available,
1329 : // we should reconcile to clean them up.
1330 : let mut dirty_observed = false;
1331 : for (node_id, observed_loc) in &self.observed.locations {
1332 : let node = pageservers
1333 : .get(node_id)
1334 : .expect("Nodes may not be removed while referenced");
1335 : if observed_loc.conf.is_none() && node.is_available() {
1336 : dirty_observed = true;
1337 : break;
1338 : }
1339 : }
1340 :
1341 : let active_nodes_dirty = self.dirty(pageservers);
1342 :
1343 : let reconcile_needed = match (
1344 : active_nodes_dirty,
1345 : dirty_observed,
1346 : self.pending_compute_notification,
1347 : ) {
1348 : (true, _, _) => ReconcileNeeded::Yes(ReconcileReason::ActiveNodesDirty),
1349 : (_, true, _) => ReconcileNeeded::Yes(ReconcileReason::UnknownLocation),
1350 : (_, _, true) => ReconcileNeeded::Yes(ReconcileReason::PendingComputeNotification),
1351 : _ => ReconcileNeeded::No,
1352 : };
1353 :
1354 : if matches!(reconcile_needed, ReconcileNeeded::No) {
1355 : tracing::debug!("Not dirty, no reconciliation needed.");
1356 : return ReconcileNeeded::No;
1357 : }
1358 :
1359 : // If we are currently splitting, then never start a reconciler task: the splitting logic
1360 : // requires that shards are not interfered with while it runs. Do this check here rather than
1361 : // up top, so that we only log this message if we would otherwise have done a reconciliation.
1362 : if !matches!(self.splitting, SplitState::Idle) {
1363 : tracing::info!("Refusing to reconcile, splitting in progress");
1364 : return ReconcileNeeded::No;
1365 : }
1366 :
1367 : // Reconcile already in flight for the current sequence?
1368 : if let Some(handle) = &self.reconciler {
1369 : if handle.sequence == self.sequence {
1370 : tracing::info!(
1371 : "Reconciliation already in progress for sequence {:?}",
1372 : self.sequence,
1373 : );
1374 : return ReconcileNeeded::WaitExisting(ReconcilerWaiter {
1375 : tenant_shard_id: self.tenant_shard_id,
1376 : seq_wait: self.waiter.clone(),
1377 : error_seq_wait: self.error_waiter.clone(),
1378 : error: self.last_error.clone(),
1379 : seq: self.sequence,
1380 : });
1381 : }
1382 : }
1383 :
1384 : // Pre-checks done: finally check whether we may actually do the work
1385 : match self.scheduling_policy {
1386 : ShardSchedulingPolicy::Active
1387 : | ShardSchedulingPolicy::Essential
1388 : | ShardSchedulingPolicy::Pause => {}
1389 : ShardSchedulingPolicy::Stop => {
1390 : // We only reach this point if there is work to do and we're going to skip
1391 : // doing it: warn it obvious why this tenant isn't doing what it ought to.
1392 : tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
1393 : return ReconcileNeeded::No;
1394 : }
1395 : }
1396 :
1397 : reconcile_needed
1398 : }
1399 :
1400 : /// Ensure the sequence number is set to a value where waiting for this value will make us wait
1401 : /// for the next reconcile: i.e. it is ahead of all completed or running reconcilers.
1402 : ///
1403 : /// Constructing a ReconcilerWaiter with the resulting sequence number gives the property
1404 : /// that the waiter will not complete until some future Reconciler is constructed and run.
1405 0 : fn ensure_sequence_ahead(&mut self) {
1406 0 : // Find the highest sequence for which a Reconciler has previously run or is currently
1407 0 : // running
1408 0 : let max_seen = std::cmp::max(
1409 0 : self.reconciler
1410 0 : .as_ref()
1411 0 : .map(|r| r.sequence)
1412 0 : .unwrap_or(Sequence(0)),
1413 0 : std::cmp::max(self.waiter.load(), self.error_waiter.load()),
1414 0 : );
1415 0 :
1416 0 : if self.sequence <= max_seen {
1417 0 : self.sequence = max_seen.next();
1418 0 : }
1419 0 : }
1420 :
1421 : /// Create a waiter that will wait for some future Reconciler that hasn't been spawned yet.
1422 : ///
1423 : /// This is appropriate when you can't spawn a reconciler (e.g. due to resource limits), but
1424 : /// you would like to wait on the next reconciler that gets spawned in the background.
1425 0 : pub(crate) fn future_reconcile_waiter(&mut self) -> ReconcilerWaiter {
1426 0 : self.ensure_sequence_ahead();
1427 0 :
1428 0 : ReconcilerWaiter {
1429 0 : tenant_shard_id: self.tenant_shard_id,
1430 0 : seq_wait: self.waiter.clone(),
1431 0 : error_seq_wait: self.error_waiter.clone(),
1432 0 : error: self.last_error.clone(),
1433 0 : seq: self.sequence,
1434 0 : }
1435 0 : }
1436 :
1437 0 : async fn reconcile(
1438 0 : sequence: Sequence,
1439 0 : mut reconciler: Reconciler,
1440 0 : must_notify: bool,
1441 0 : ) -> ReconcileResult {
1442 : // Attempt to make observed state match intent state
1443 0 : let result = reconciler.reconcile().await;
1444 :
1445 : // If we know we had a pending compute notification from some previous action, send a notification irrespective
1446 : // of whether the above reconcile() did any work. It has to be Ok() though, because otherwise we might be
1447 : // sending a notification of a location that isn't really attached.
1448 0 : if result.is_ok() && must_notify {
1449 : // If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
1450 0 : reconciler.compute_notify().await.ok();
1451 0 : } else if must_notify {
1452 0 : // Carry this flag so that the reconciler's result will indicate that it still needs to retry
1453 0 : // the compute hook notification eventually.
1454 0 : reconciler.compute_notify_failure = true;
1455 0 : }
1456 :
1457 : // Update result counter
1458 0 : let outcome_label = match &result {
1459 0 : Ok(_) => ReconcileOutcome::Success,
1460 0 : Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
1461 0 : Err(_) => ReconcileOutcome::Error,
1462 : };
1463 :
1464 0 : metrics::METRICS_REGISTRY
1465 0 : .metrics_group
1466 0 : .storage_controller_reconcile_complete
1467 0 : .inc(ReconcileCompleteLabelGroup {
1468 0 : status: outcome_label,
1469 0 : });
1470 0 :
1471 0 : // Constructing result implicitly drops Reconciler, freeing any ReconcileUnits before the Service might
1472 0 : // try and schedule more work in response to our result.
1473 0 : ReconcileResult {
1474 0 : sequence,
1475 0 : result,
1476 0 : tenant_shard_id: reconciler.tenant_shard_id,
1477 0 : generation: reconciler.generation,
1478 0 : observed_deltas: reconciler.observed_deltas(),
1479 0 : pending_compute_notification: reconciler.compute_notify_failure,
1480 0 : }
1481 0 : }
1482 :
1483 : #[allow(clippy::too_many_arguments)]
1484 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1485 : pub(crate) fn spawn_reconciler(
1486 : &mut self,
1487 : reason: ReconcileReason,
1488 : result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
1489 : pageservers: &Arc<HashMap<NodeId, Node>>,
1490 : compute_hook: &Arc<ComputeHook>,
1491 : reconciler_config: ReconcilerConfig,
1492 : service_config: &service::Config,
1493 : persistence: &Arc<Persistence>,
1494 : units: ReconcileUnits,
1495 : gate_guard: GateGuard,
1496 : cancel: &CancellationToken,
1497 : ) -> Option<ReconcilerWaiter> {
1498 : // Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
1499 : // doing our sequence's work.
1500 : let old_handle = self.reconciler.take();
1501 :
1502 : // Build list of nodes from which the reconciler should detach
1503 : let mut detach = Vec::new();
1504 : for node_id in self.observed.locations.keys() {
1505 : if self.intent.get_attached() != &Some(*node_id)
1506 : && !self.intent.secondary.contains(node_id)
1507 : {
1508 : detach.push(
1509 : pageservers
1510 : .get(node_id)
1511 : .expect("Intent references non-existent pageserver")
1512 : .clone(),
1513 : )
1514 : }
1515 : }
1516 :
1517 : // Advance the sequence before spawning a reconciler, so that sequence waiters
1518 : // can distinguish between before+after the reconcile completes.
1519 : self.ensure_sequence_ahead();
1520 :
1521 : let reconciler_cancel = cancel.child_token();
1522 : let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
1523 : let reconciler = Reconciler {
1524 : tenant_shard_id: self.tenant_shard_id,
1525 : shard: self.shard,
1526 : placement_policy: self.policy.clone(),
1527 : generation: self.generation,
1528 : intent: reconciler_intent,
1529 : detach,
1530 : reconciler_config,
1531 : config: self.config.clone(),
1532 : preferred_az: self.intent.preferred_az_id.clone(),
1533 : observed: self.observed.clone(),
1534 : original_observed: self.observed.clone(),
1535 : compute_hook: compute_hook.clone(),
1536 : service_config: service_config.clone(),
1537 : _gate_guard: gate_guard,
1538 : _resource_units: units,
1539 : cancel: reconciler_cancel.clone(),
1540 : persistence: persistence.clone(),
1541 : compute_notify_failure: false,
1542 : };
1543 :
1544 : let reconcile_seq = self.sequence;
1545 : let long_reconcile_threshold = service_config.long_reconcile_threshold;
1546 :
1547 : tracing::info!(seq=%reconcile_seq, "Spawning Reconciler ({reason:?})");
1548 : let must_notify = self.pending_compute_notification;
1549 : let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
1550 : tenant_id=%reconciler.tenant_shard_id.tenant_id,
1551 : shard_id=%reconciler.tenant_shard_id.shard_slug());
1552 : metrics::METRICS_REGISTRY
1553 : .metrics_group
1554 : .storage_controller_reconcile_spawn
1555 : .inc();
1556 : let result_tx = result_tx.clone();
1557 : let join_handle = tokio::task::spawn(
1558 0 : async move {
1559 : // Wait for any previous reconcile task to complete before we start
1560 0 : if let Some(old_handle) = old_handle {
1561 0 : old_handle.cancel.cancel();
1562 0 : if let Err(e) = old_handle.handle.await {
1563 : // We can't do much with this other than log it: the task is done, so
1564 : // we may proceed with our work.
1565 0 : tracing::error!("Unexpected join error waiting for reconcile task: {e}");
1566 0 : }
1567 0 : }
1568 :
1569 : // Early check for cancellation before doing any work
1570 : // TODO: wrap all remote API operations in cancellation check
1571 : // as well.
1572 0 : if reconciler.cancel.is_cancelled() {
1573 0 : metrics::METRICS_REGISTRY
1574 0 : .metrics_group
1575 0 : .storage_controller_reconcile_complete
1576 0 : .inc(ReconcileCompleteLabelGroup {
1577 0 : status: ReconcileOutcome::Cancel,
1578 0 : });
1579 0 : return;
1580 0 : }
1581 0 :
1582 0 : let (tenant_id_label, shard_number_label, sequence_label) = {
1583 0 : (
1584 0 : reconciler.tenant_shard_id.tenant_id.to_string(),
1585 0 : reconciler.tenant_shard_id.shard_number.0.to_string(),
1586 0 : reconcile_seq.to_string(),
1587 0 : )
1588 0 : };
1589 0 :
1590 0 : let label_group = ReconcileLongRunningLabelGroup {
1591 0 : tenant_id: &tenant_id_label,
1592 0 : shard_number: &shard_number_label,
1593 0 : sequence: &sequence_label,
1594 0 : };
1595 0 :
1596 0 : let reconcile_fut = Self::reconcile(reconcile_seq, reconciler, must_notify);
1597 0 : let long_reconcile_fut = {
1598 0 : let label_group = label_group.clone();
1599 0 : async move {
1600 0 : tokio::time::sleep(long_reconcile_threshold).await;
1601 :
1602 0 : tracing::warn!("Reconcile passed the long running threshold of {long_reconcile_threshold:?}");
1603 :
1604 0 : metrics::METRICS_REGISTRY
1605 0 : .metrics_group
1606 0 : .storage_controller_reconcile_long_running
1607 0 : .inc(label_group);
1608 0 : }
1609 : };
1610 :
1611 0 : let reconcile_fut = std::pin::pin!(reconcile_fut);
1612 0 : let long_reconcile_fut = std::pin::pin!(long_reconcile_fut);
1613 :
1614 0 : let (was_long, result) =
1615 0 : match future::select(reconcile_fut, long_reconcile_fut).await {
1616 0 : Either::Left((reconcile_result, _)) => (false, reconcile_result),
1617 0 : Either::Right((_, reconcile_fut)) => (true, reconcile_fut.await),
1618 : };
1619 :
1620 0 : if was_long {
1621 0 : let id = metrics::METRICS_REGISTRY
1622 0 : .metrics_group
1623 0 : .storage_controller_reconcile_long_running
1624 0 : .with_labels(label_group);
1625 0 : metrics::METRICS_REGISTRY
1626 0 : .metrics_group
1627 0 : .storage_controller_reconcile_long_running
1628 0 : .remove_metric(id);
1629 0 : }
1630 :
1631 0 : result_tx
1632 0 : .send(ReconcileResultRequest::ReconcileResult(result))
1633 0 : .ok();
1634 0 : }
1635 : .instrument(reconciler_span),
1636 : );
1637 :
1638 : self.reconciler = Some(ReconcilerHandle {
1639 : sequence: self.sequence,
1640 : handle: join_handle,
1641 : cancel: reconciler_cancel,
1642 : });
1643 :
1644 : Some(ReconcilerWaiter {
1645 : tenant_shard_id: self.tenant_shard_id,
1646 : seq_wait: self.waiter.clone(),
1647 : error_seq_wait: self.error_waiter.clone(),
1648 : error: self.last_error.clone(),
1649 : seq: self.sequence,
1650 : })
1651 : }
1652 :
1653 0 : pub(crate) fn cancel_reconciler(&self) {
1654 0 : if let Some(handle) = self.reconciler.as_ref() {
1655 0 : handle.cancel.cancel()
1656 0 : }
1657 0 : }
1658 :
1659 : /// Get a waiter for any reconciliation in flight, but do not start reconciliation
1660 : /// if it is not already running
1661 0 : pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
1662 0 : if self.reconciler.is_some() {
1663 0 : Some(ReconcilerWaiter {
1664 0 : tenant_shard_id: self.tenant_shard_id,
1665 0 : seq_wait: self.waiter.clone(),
1666 0 : error_seq_wait: self.error_waiter.clone(),
1667 0 : error: self.last_error.clone(),
1668 0 : seq: self.sequence,
1669 0 : })
1670 : } else {
1671 0 : None
1672 : }
1673 0 : }
1674 :
1675 : /// Called when a ReconcileResult has been emitted and the service is updating
1676 : /// our state: if the result is from a sequence >= my ReconcileHandle, then drop
1677 : /// the handle to indicate there is no longer a reconciliation in progress.
1678 0 : pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
1679 0 : if let Some(reconcile_handle) = &self.reconciler {
1680 0 : if reconcile_handle.sequence <= sequence {
1681 0 : self.reconciler = None;
1682 0 : }
1683 0 : }
1684 0 : }
1685 :
1686 : /// If we had any state at all referring to this node ID, drop it. Does not
1687 : /// attempt to reschedule.
1688 : ///
1689 : /// Returns true if we modified the node's intent state.
1690 0 : pub(crate) fn deref_node(&mut self, node_id: NodeId) -> bool {
1691 0 : let mut intent_modified = false;
1692 0 :
1693 0 : // Drop if this node was our attached intent
1694 0 : if self.intent.attached == Some(node_id) {
1695 0 : self.intent.attached = None;
1696 0 : intent_modified = true;
1697 0 : }
1698 :
1699 : // Drop from the list of secondaries, and check if we modified it
1700 0 : let had_secondaries = self.intent.secondary.len();
1701 0 : self.intent.secondary.retain(|n| n != &node_id);
1702 0 : intent_modified |= self.intent.secondary.len() != had_secondaries;
1703 0 :
1704 0 : debug_assert!(!self.intent.all_pageservers().contains(&node_id));
1705 :
1706 0 : intent_modified
1707 0 : }
1708 :
1709 0 : pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
1710 0 : self.scheduling_policy = p;
1711 0 : }
1712 :
1713 0 : pub(crate) fn get_scheduling_policy(&self) -> &ShardSchedulingPolicy {
1714 0 : &self.scheduling_policy
1715 0 : }
1716 :
1717 0 : pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
1718 0 : // Ordering: always set last_error before advancing sequence, so that sequence
1719 0 : // waiters are guaranteed to see a Some value when they see an error.
1720 0 : *(self.last_error.lock().unwrap()) = Some(Arc::new(error));
1721 0 : self.error_waiter.advance(sequence);
1722 0 : }
1723 :
1724 0 : pub(crate) fn from_persistent(
1725 0 : tsp: TenantShardPersistence,
1726 0 : intent: IntentState,
1727 0 : ) -> anyhow::Result<Self> {
1728 0 : let tenant_shard_id = tsp.get_tenant_shard_id()?;
1729 0 : let shard_identity = tsp.get_shard_identity()?;
1730 :
1731 0 : metrics::METRICS_REGISTRY
1732 0 : .metrics_group
1733 0 : .storage_controller_tenant_shards
1734 0 : .inc();
1735 0 :
1736 0 : Ok(Self {
1737 0 : tenant_shard_id,
1738 0 : shard: shard_identity,
1739 0 : sequence: Sequence::initial(),
1740 0 : generation: tsp.generation.map(|g| Generation::new(g as u32)),
1741 0 : policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
1742 0 : intent,
1743 0 : observed: ObservedState::new(),
1744 0 : config: serde_json::from_str(&tsp.config).unwrap(),
1745 0 : reconciler: None,
1746 0 : splitting: tsp.splitting,
1747 0 : waiter: Arc::new(SeqWait::new(Sequence::initial())),
1748 0 : error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
1749 0 : last_error: Arc::default(),
1750 0 : pending_compute_notification: false,
1751 0 : delayed_reconcile: false,
1752 0 : scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
1753 0 : })
1754 0 : }
1755 :
1756 0 : pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
1757 0 : TenantShardPersistence {
1758 0 : tenant_id: self.tenant_shard_id.tenant_id.to_string(),
1759 0 : shard_number: self.tenant_shard_id.shard_number.0 as i32,
1760 0 : shard_count: self.tenant_shard_id.shard_count.literal() as i32,
1761 0 : shard_stripe_size: self.shard.stripe_size.0 as i32,
1762 0 : generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
1763 0 : generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
1764 0 : placement_policy: serde_json::to_string(&self.policy).unwrap(),
1765 0 : config: serde_json::to_string(&self.config).unwrap(),
1766 0 : splitting: SplitState::default(),
1767 0 : scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
1768 0 : preferred_az_id: self.intent.preferred_az_id.as_ref().map(|az| az.0.clone()),
1769 0 : }
1770 0 : }
1771 :
1772 12508 : pub(crate) fn preferred_az(&self) -> Option<&AvailabilityZone> {
1773 12508 : self.intent.preferred_az_id.as_ref()
1774 12508 : }
1775 :
1776 0 : pub(crate) fn set_preferred_az(&mut self, preferred_az_id: Option<AvailabilityZone>) {
1777 0 : self.intent.preferred_az_id = preferred_az_id;
1778 0 : }
1779 :
1780 : /// Returns all the nodes to which this tenant shard is attached according to the
1781 : /// observed state and the generations. Return vector is sorted from latest generation
1782 : /// to earliest.
1783 0 : pub(crate) fn attached_locations(&self) -> Vec<(NodeId, Generation)> {
1784 0 : self.observed
1785 0 : .locations
1786 0 : .iter()
1787 0 : .filter_map(|(node_id, observed)| {
1788 : use LocationConfigMode::{AttachedMulti, AttachedSingle, AttachedStale};
1789 :
1790 0 : let conf = observed.conf.as_ref()?;
1791 :
1792 0 : match (conf.generation, conf.mode) {
1793 0 : (Some(gen_), AttachedMulti | AttachedSingle | AttachedStale) => {
1794 0 : Some((*node_id, gen_))
1795 : }
1796 0 : _ => None,
1797 : }
1798 0 : })
1799 0 : .sorted_by(|(_lhs_node_id, lhs_gen), (_rhs_node_id, rhs_gen)| {
1800 0 : lhs_gen.cmp(rhs_gen).reverse()
1801 0 : })
1802 0 : .map(|(node_id, gen_)| (node_id, Generation::new(gen_)))
1803 0 : .collect()
1804 0 : }
1805 :
1806 : /// Update the observed state of the tenant by applying incremental deltas
1807 : ///
1808 : /// Deltas are generated by reconcilers via [`Reconciler::observed_deltas`].
1809 : /// They are then filtered in [`crate::service::Service::process_result`].
1810 0 : pub(crate) fn apply_observed_deltas(
1811 0 : &mut self,
1812 0 : deltas: impl Iterator<Item = ObservedStateDelta>,
1813 0 : ) {
1814 0 : for delta in deltas {
1815 0 : match delta {
1816 0 : ObservedStateDelta::Upsert(ups) => {
1817 0 : let (node_id, loc) = *ups;
1818 0 :
1819 0 : // If the generation of the observed location in the delta is lagging
1820 0 : // behind the current one, then we have a race condition and cannot
1821 0 : // be certain about the true observed state. Set the observed state
1822 0 : // to None in order to reflect this.
1823 0 : let crnt_gen = self
1824 0 : .observed
1825 0 : .locations
1826 0 : .get(&node_id)
1827 0 : .and_then(|loc| loc.conf.as_ref())
1828 0 : .and_then(|conf| conf.generation);
1829 0 : let new_gen = loc.conf.as_ref().and_then(|conf| conf.generation);
1830 0 : match (crnt_gen, new_gen) {
1831 0 : (Some(crnt), Some(new)) if crnt_gen > new_gen => {
1832 0 : tracing::warn!(
1833 0 : "Skipping observed state update {}: {:?} and using None due to stale generation ({} > {})",
1834 : node_id,
1835 : loc,
1836 : crnt,
1837 : new
1838 : );
1839 :
1840 0 : self.observed
1841 0 : .locations
1842 0 : .insert(node_id, ObservedStateLocation { conf: None });
1843 0 :
1844 0 : continue;
1845 : }
1846 0 : _ => {}
1847 : }
1848 :
1849 0 : if let Some(conf) = &loc.conf {
1850 0 : tracing::info!("Updating observed location {}: {:?}", node_id, conf);
1851 : } else {
1852 0 : tracing::info!("Setting observed location {} to None", node_id,)
1853 : }
1854 :
1855 0 : self.observed.locations.insert(node_id, loc);
1856 : }
1857 0 : ObservedStateDelta::Delete(node_id) => {
1858 0 : tracing::info!("Deleting observed location {}", node_id);
1859 0 : self.observed.locations.remove(&node_id);
1860 : }
1861 : }
1862 : }
1863 0 : }
1864 :
1865 : /// Returns true if the tenant shard is attached to a node that is outside the preferred AZ.
1866 : ///
1867 : /// If the shard does not have a preferred AZ, returns false.
1868 0 : pub(crate) fn is_attached_outside_preferred_az(&self, nodes: &HashMap<NodeId, Node>) -> bool {
1869 0 : self.intent
1870 0 : .get_attached()
1871 0 : .map(|node_id| {
1872 0 : Some(
1873 0 : nodes
1874 0 : .get(&node_id)
1875 0 : .expect("referenced node exists")
1876 0 : .get_availability_zone_id(),
1877 0 : ) != self.intent.preferred_az_id.as_ref()
1878 0 : })
1879 0 : .unwrap_or(false)
1880 0 : }
1881 : }
1882 :
1883 : impl Drop for TenantShard {
1884 12841 : fn drop(&mut self) {
1885 12841 : metrics::METRICS_REGISTRY
1886 12841 : .metrics_group
1887 12841 : .storage_controller_tenant_shards
1888 12841 : .dec();
1889 12841 : }
1890 : }
1891 :
1892 : #[cfg(test)]
1893 : pub(crate) mod tests {
1894 : use std::cell::RefCell;
1895 : use std::rc::Rc;
1896 :
1897 : use pageserver_api::controller_api::NodeAvailability;
1898 : use pageserver_api::shard::{ShardCount, ShardNumber};
1899 : use rand::SeedableRng;
1900 : use rand::rngs::StdRng;
1901 : use utils::id::TenantId;
1902 :
1903 : use super::*;
1904 : use crate::scheduler::test_utils::make_test_nodes;
1905 :
1906 11 : fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard {
1907 11 : let tenant_id = TenantId::generate();
1908 11 : let shard_number = ShardNumber(0);
1909 11 : let shard_count = ShardCount::new(1);
1910 11 :
1911 11 : let tenant_shard_id = TenantShardId {
1912 11 : tenant_id,
1913 11 : shard_number,
1914 11 : shard_count,
1915 11 : };
1916 11 : TenantShard::new(
1917 11 : tenant_shard_id,
1918 11 : ShardIdentity::new(
1919 11 : shard_number,
1920 11 : shard_count,
1921 11 : pageserver_api::shard::ShardStripeSize(32768),
1922 11 : )
1923 11 : .unwrap(),
1924 11 : policy,
1925 11 : None,
1926 11 : )
1927 11 : }
1928 :
1929 5004 : pub(crate) fn make_test_tenant(
1930 5004 : policy: PlacementPolicy,
1931 5004 : shard_count: ShardCount,
1932 5004 : preferred_az: Option<AvailabilityZone>,
1933 5004 : ) -> Vec<TenantShard> {
1934 5004 : make_test_tenant_with_id(TenantId::generate(), policy, shard_count, preferred_az)
1935 5004 : }
1936 :
1937 5007 : pub(crate) fn make_test_tenant_with_id(
1938 5007 : tenant_id: TenantId,
1939 5007 : policy: PlacementPolicy,
1940 5007 : shard_count: ShardCount,
1941 5007 : preferred_az: Option<AvailabilityZone>,
1942 5007 : ) -> Vec<TenantShard> {
1943 5007 : (0..shard_count.count())
1944 12522 : .map(|i| {
1945 12522 : let shard_number = ShardNumber(i);
1946 12522 :
1947 12522 : let tenant_shard_id = TenantShardId {
1948 12522 : tenant_id,
1949 12522 : shard_number,
1950 12522 : shard_count,
1951 12522 : };
1952 12522 : TenantShard::new(
1953 12522 : tenant_shard_id,
1954 12522 : ShardIdentity::new(
1955 12522 : shard_number,
1956 12522 : shard_count,
1957 12522 : pageserver_api::shard::ShardStripeSize(32768),
1958 12522 : )
1959 12522 : .unwrap(),
1960 12522 : policy.clone(),
1961 12522 : preferred_az.clone(),
1962 12522 : )
1963 12522 : })
1964 5007 : .collect()
1965 5007 : }
1966 :
1967 : /// Test the scheduling behaviors used when a tenant configured for HA is subject
1968 : /// to nodes being marked offline.
1969 : #[test]
1970 1 : fn tenant_ha_scheduling() -> anyhow::Result<()> {
1971 1 : // Start with three nodes. Our tenant will only use two. The third one is
1972 1 : // expected to remain unused.
1973 1 : let mut nodes = make_test_nodes(3, &[]);
1974 1 :
1975 1 : let mut scheduler = Scheduler::new(nodes.values());
1976 1 : let mut context = ScheduleContext::default();
1977 1 :
1978 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
1979 1 : tenant_shard
1980 1 : .schedule(&mut scheduler, &mut context)
1981 1 : .expect("we have enough nodes, scheduling should work");
1982 1 :
1983 1 : // Expect to initially be schedule on to different nodes
1984 1 : assert_eq!(tenant_shard.intent.secondary.len(), 1);
1985 1 : assert!(tenant_shard.intent.attached.is_some());
1986 :
1987 1 : let attached_node_id = tenant_shard.intent.attached.unwrap();
1988 1 : let secondary_node_id = *tenant_shard.intent.secondary.iter().last().unwrap();
1989 1 : assert_ne!(attached_node_id, secondary_node_id);
1990 :
1991 : // Notifying the attached node is offline should demote it to a secondary
1992 1 : let changed = tenant_shard
1993 1 : .intent
1994 1 : .demote_attached(&mut scheduler, attached_node_id);
1995 1 : assert!(changed);
1996 1 : assert!(tenant_shard.intent.attached.is_none());
1997 1 : assert_eq!(tenant_shard.intent.secondary.len(), 2);
1998 :
1999 : // Update the scheduler state to indicate the node is offline
2000 1 : nodes
2001 1 : .get_mut(&attached_node_id)
2002 1 : .unwrap()
2003 1 : .set_availability(NodeAvailability::Offline);
2004 1 : scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
2005 1 :
2006 1 : // Scheduling the node should promote the still-available secondary node to attached
2007 1 : tenant_shard
2008 1 : .schedule(&mut scheduler, &mut context)
2009 1 : .expect("active nodes are available");
2010 1 : assert_eq!(tenant_shard.intent.attached.unwrap(), secondary_node_id);
2011 :
2012 : // The original attached node should have been retained as a secondary
2013 1 : assert_eq!(
2014 1 : *tenant_shard.intent.secondary.iter().last().unwrap(),
2015 1 : attached_node_id
2016 1 : );
2017 :
2018 1 : tenant_shard.intent.clear(&mut scheduler);
2019 1 :
2020 1 : Ok(())
2021 1 : }
2022 :
2023 : #[test]
2024 1 : fn intent_from_observed() -> anyhow::Result<()> {
2025 1 : let nodes = make_test_nodes(3, &[]);
2026 1 : let mut scheduler = Scheduler::new(nodes.values());
2027 1 :
2028 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
2029 1 :
2030 1 : tenant_shard.observed.locations.insert(
2031 1 : NodeId(3),
2032 1 : ObservedStateLocation {
2033 1 : conf: Some(LocationConfig {
2034 1 : mode: LocationConfigMode::AttachedMulti,
2035 1 : generation: Some(2),
2036 1 : secondary_conf: None,
2037 1 : shard_number: tenant_shard.shard.number.0,
2038 1 : shard_count: tenant_shard.shard.count.literal(),
2039 1 : shard_stripe_size: tenant_shard.shard.stripe_size.0,
2040 1 : tenant_conf: TenantConfig::default(),
2041 1 : }),
2042 1 : },
2043 1 : );
2044 1 :
2045 1 : tenant_shard.observed.locations.insert(
2046 1 : NodeId(2),
2047 1 : ObservedStateLocation {
2048 1 : conf: Some(LocationConfig {
2049 1 : mode: LocationConfigMode::AttachedStale,
2050 1 : generation: Some(1),
2051 1 : secondary_conf: None,
2052 1 : shard_number: tenant_shard.shard.number.0,
2053 1 : shard_count: tenant_shard.shard.count.literal(),
2054 1 : shard_stripe_size: tenant_shard.shard.stripe_size.0,
2055 1 : tenant_conf: TenantConfig::default(),
2056 1 : }),
2057 1 : },
2058 1 : );
2059 1 :
2060 1 : tenant_shard.intent_from_observed(&mut scheduler);
2061 1 :
2062 1 : // The highest generationed attached location gets used as attached
2063 1 : assert_eq!(tenant_shard.intent.attached, Some(NodeId(3)));
2064 : // Other locations get used as secondary
2065 1 : assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]);
2066 :
2067 1 : scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?;
2068 :
2069 1 : tenant_shard.intent.clear(&mut scheduler);
2070 1 : Ok(())
2071 1 : }
2072 :
2073 : #[test]
2074 1 : fn scheduling_mode() -> anyhow::Result<()> {
2075 1 : let nodes = make_test_nodes(3, &[]);
2076 1 : let mut scheduler = Scheduler::new(nodes.values());
2077 1 :
2078 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
2079 1 :
2080 1 : // In pause mode, schedule() shouldn't do anything
2081 1 : tenant_shard.scheduling_policy = ShardSchedulingPolicy::Pause;
2082 1 : assert!(
2083 1 : tenant_shard
2084 1 : .schedule(&mut scheduler, &mut ScheduleContext::default())
2085 1 : .is_ok()
2086 1 : );
2087 1 : assert!(tenant_shard.intent.all_pageservers().is_empty());
2088 :
2089 : // In active mode, schedule() works
2090 1 : tenant_shard.scheduling_policy = ShardSchedulingPolicy::Active;
2091 1 : assert!(
2092 1 : tenant_shard
2093 1 : .schedule(&mut scheduler, &mut ScheduleContext::default())
2094 1 : .is_ok()
2095 1 : );
2096 1 : assert!(!tenant_shard.intent.all_pageservers().is_empty());
2097 :
2098 1 : tenant_shard.intent.clear(&mut scheduler);
2099 1 : Ok(())
2100 1 : }
2101 :
2102 : #[test]
2103 : /// Simple case: moving attachment to somewhere better where we already have a secondary
2104 1 : fn optimize_attachment_simple() -> anyhow::Result<()> {
2105 1 : let nodes = make_test_nodes(
2106 1 : 3,
2107 1 : &[
2108 1 : AvailabilityZone("az-a".to_string()),
2109 1 : AvailabilityZone("az-b".to_string()),
2110 1 : AvailabilityZone("az-c".to_string()),
2111 1 : ],
2112 1 : );
2113 1 : let mut scheduler = Scheduler::new(nodes.values());
2114 1 :
2115 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
2116 1 : shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
2117 1 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
2118 1 : shard_b.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
2119 1 :
2120 1 : // Initially: both nodes attached on shard 1, and both have secondary locations
2121 1 : // on different nodes.
2122 1 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
2123 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(1));
2124 1 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
2125 1 : shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
2126 :
2127 1 : fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
2128 1 : let mut schedule_context = ScheduleContext::default();
2129 1 : schedule_context.avoid(&shard_a.intent.all_pageservers());
2130 1 : schedule_context.avoid(&shard_b.intent.all_pageservers());
2131 1 : schedule_context
2132 1 : }
2133 :
2134 1 : let schedule_context = make_schedule_context(&shard_a, &shard_b);
2135 1 : let optimization_a = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2136 1 : assert_eq!(
2137 1 : optimization_a,
2138 1 : Some(ScheduleOptimization {
2139 1 : sequence: shard_a.sequence,
2140 1 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
2141 1 : old_attached_node_id: NodeId(2),
2142 1 : new_attached_node_id: NodeId(1)
2143 1 : })
2144 1 : })
2145 1 : );
2146 1 : shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
2147 1 :
2148 1 : // // Either shard should recognize that it has the option to switch to a secondary location where there
2149 1 : // // would be no other shards from the same tenant, and request to do so.
2150 1 : // assert_eq!(
2151 1 : // optimization_a_prepare,
2152 1 : // Some(ScheduleOptimization {
2153 1 : // sequence: shard_a.sequence,
2154 1 : // action: ScheduleOptimizationAction::CreateSecondary(NodeId(2))
2155 1 : // })
2156 1 : // );
2157 1 : // shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
2158 1 :
2159 1 : // let schedule_context = make_schedule_context(&shard_a, &shard_b);
2160 1 : // let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2161 1 : // assert_eq!(
2162 1 : // optimization_a_migrate,
2163 1 : // Some(ScheduleOptimization {
2164 1 : // sequence: shard_a.sequence,
2165 1 : // action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
2166 1 : // old_attached_node_id: NodeId(1),
2167 1 : // new_attached_node_id: NodeId(2)
2168 1 : // })
2169 1 : // })
2170 1 : // );
2171 1 : // shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
2172 1 :
2173 1 : // let schedule_context = make_schedule_context(&shard_a, &shard_b);
2174 1 : // let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2175 1 : // assert_eq!(
2176 1 : // optimization_a_cleanup,
2177 1 : // Some(ScheduleOptimization {
2178 1 : // sequence: shard_a.sequence,
2179 1 : // action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1))
2180 1 : // })
2181 1 : // );
2182 1 : // shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
2183 1 :
2184 1 : // // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
2185 1 : // let schedule_context = make_schedule_context(&shard_a, &shard_b);
2186 1 : // assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
2187 1 :
2188 1 : shard_a.intent.clear(&mut scheduler);
2189 1 : shard_b.intent.clear(&mut scheduler);
2190 1 :
2191 1 : Ok(())
2192 1 : }
2193 :
2194 : #[test]
2195 : /// Complicated case: moving attachment to somewhere better where we do not have a secondary
2196 : /// already, creating one as needed.
2197 1 : fn optimize_attachment_multistep() -> anyhow::Result<()> {
2198 1 : let nodes = make_test_nodes(
2199 1 : 3,
2200 1 : &[
2201 1 : AvailabilityZone("az-a".to_string()),
2202 1 : AvailabilityZone("az-b".to_string()),
2203 1 : AvailabilityZone("az-c".to_string()),
2204 1 : ],
2205 1 : );
2206 1 : let mut scheduler = Scheduler::new(nodes.values());
2207 1 :
2208 1 : // Two shards of a tenant that wants to be in AZ A
2209 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
2210 1 : shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
2211 1 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
2212 1 : shard_b.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
2213 1 :
2214 1 : // Both shards are initially attached in non-home AZ _and_ have secondaries in non-home AZs
2215 1 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
2216 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
2217 1 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(3)));
2218 1 : shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
2219 :
2220 3 : fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
2221 3 : let mut schedule_context = ScheduleContext::default();
2222 3 : schedule_context.avoid(&shard_a.intent.all_pageservers());
2223 3 : schedule_context.avoid(&shard_b.intent.all_pageservers());
2224 3 : schedule_context
2225 3 : }
2226 :
2227 1 : let schedule_context = make_schedule_context(&shard_a, &shard_b);
2228 1 : let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2229 1 : assert_eq!(
2230 1 : optimization_a_prepare,
2231 1 : Some(ScheduleOptimization {
2232 1 : sequence: shard_a.sequence,
2233 1 : action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
2234 1 : })
2235 1 : );
2236 1 : shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
2237 1 :
2238 1 : let schedule_context = make_schedule_context(&shard_a, &shard_b);
2239 1 : let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2240 1 : assert_eq!(
2241 1 : optimization_a_migrate,
2242 1 : Some(ScheduleOptimization {
2243 1 : sequence: shard_a.sequence,
2244 1 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
2245 1 : old_attached_node_id: NodeId(2),
2246 1 : new_attached_node_id: NodeId(1)
2247 1 : })
2248 1 : })
2249 1 : );
2250 1 : shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
2251 1 :
2252 1 : let schedule_context = make_schedule_context(&shard_a, &shard_b);
2253 1 : let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2254 1 : assert_eq!(
2255 1 : optimization_a_cleanup,
2256 1 : Some(ScheduleOptimization {
2257 1 : sequence: shard_a.sequence,
2258 1 : action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
2259 1 : })
2260 1 : );
2261 1 : shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
2262 1 :
2263 1 : // // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
2264 1 : // let schedule_context = make_schedule_context(&shard_a, &shard_b);
2265 1 : // assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
2266 1 :
2267 1 : shard_a.intent.clear(&mut scheduler);
2268 1 : shard_b.intent.clear(&mut scheduler);
2269 1 :
2270 1 : Ok(())
2271 1 : }
2272 :
2273 : #[test]
2274 : /// Check that multi-step migration works when moving to somewhere that is only better by
2275 : /// 1 AffinityScore -- this ensures that we don't have a bug like the intermediate secondary
2276 : /// counting toward the affinity score such that it prevents the rest of the migration from happening.
2277 1 : fn optimize_attachment_marginal() -> anyhow::Result<()> {
2278 1 : let nodes = make_test_nodes(2, &[]);
2279 1 : let mut scheduler = Scheduler::new(nodes.values());
2280 1 :
2281 1 : // Multi-sharded tenant, we will craft a situation where affinity
2282 1 : // scores differ only slightly
2283 1 : let mut shards = make_test_tenant(PlacementPolicy::Attached(0), ShardCount::new(4), None);
2284 1 :
2285 1 : // 1 attached on node 1
2286 1 : shards[0]
2287 1 : .intent
2288 1 : .set_attached(&mut scheduler, Some(NodeId(1)));
2289 1 : // 3 attached on node 2
2290 1 : shards[1]
2291 1 : .intent
2292 1 : .set_attached(&mut scheduler, Some(NodeId(2)));
2293 1 : shards[2]
2294 1 : .intent
2295 1 : .set_attached(&mut scheduler, Some(NodeId(2)));
2296 1 : shards[3]
2297 1 : .intent
2298 1 : .set_attached(&mut scheduler, Some(NodeId(2)));
2299 :
2300 : // The scheduler should figure out that we need to:
2301 : // - Create a secondary for shard 3 on node 1
2302 : // - Migrate shard 3 to node 1
2303 : // - Remove shard 3's location on node 2
2304 :
2305 4 : fn make_schedule_context(shards: &Vec<TenantShard>) -> ScheduleContext {
2306 4 : let mut schedule_context = ScheduleContext::default();
2307 20 : for shard in shards {
2308 16 : schedule_context.avoid(&shard.intent.all_pageservers());
2309 16 : }
2310 4 : schedule_context
2311 4 : }
2312 :
2313 1 : let schedule_context = make_schedule_context(&shards);
2314 1 : let optimization_a_prepare =
2315 1 : shards[1].optimize_attachment(&mut scheduler, &schedule_context);
2316 1 : assert_eq!(
2317 1 : optimization_a_prepare,
2318 1 : Some(ScheduleOptimization {
2319 1 : sequence: shards[1].sequence,
2320 1 : action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
2321 1 : })
2322 1 : );
2323 1 : shards[1].apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
2324 1 :
2325 1 : let schedule_context = make_schedule_context(&shards);
2326 1 : let optimization_a_migrate =
2327 1 : shards[1].optimize_attachment(&mut scheduler, &schedule_context);
2328 1 : assert_eq!(
2329 1 : optimization_a_migrate,
2330 1 : Some(ScheduleOptimization {
2331 1 : sequence: shards[1].sequence,
2332 1 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
2333 1 : old_attached_node_id: NodeId(2),
2334 1 : new_attached_node_id: NodeId(1)
2335 1 : })
2336 1 : })
2337 1 : );
2338 1 : shards[1].apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
2339 1 :
2340 1 : let schedule_context = make_schedule_context(&shards);
2341 1 : let optimization_a_cleanup =
2342 1 : shards[1].optimize_attachment(&mut scheduler, &schedule_context);
2343 1 : assert_eq!(
2344 1 : optimization_a_cleanup,
2345 1 : Some(ScheduleOptimization {
2346 1 : sequence: shards[1].sequence,
2347 1 : action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2))
2348 1 : })
2349 1 : );
2350 1 : shards[1].apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
2351 1 :
2352 1 : // Everything should be stable now
2353 1 : let schedule_context = make_schedule_context(&shards);
2354 1 : assert_eq!(
2355 1 : shards[0].optimize_attachment(&mut scheduler, &schedule_context),
2356 1 : None
2357 1 : );
2358 1 : assert_eq!(
2359 1 : shards[1].optimize_attachment(&mut scheduler, &schedule_context),
2360 1 : None
2361 1 : );
2362 1 : assert_eq!(
2363 1 : shards[2].optimize_attachment(&mut scheduler, &schedule_context),
2364 1 : None
2365 1 : );
2366 1 : assert_eq!(
2367 1 : shards[3].optimize_attachment(&mut scheduler, &schedule_context),
2368 1 : None
2369 1 : );
2370 :
2371 5 : for mut shard in shards {
2372 4 : shard.intent.clear(&mut scheduler);
2373 4 : }
2374 :
2375 1 : Ok(())
2376 1 : }
2377 :
2378 : #[test]
2379 1 : fn optimize_secondary() -> anyhow::Result<()> {
2380 1 : let nodes = make_test_nodes(4, &[]);
2381 1 : let mut scheduler = Scheduler::new(nodes.values());
2382 1 :
2383 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
2384 1 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
2385 1 :
2386 1 : // Initially: both nodes attached on shard 1, and both have secondary locations
2387 1 : // on different nodes.
2388 1 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
2389 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
2390 1 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
2391 1 : shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
2392 1 :
2393 1 : let mut schedule_context = ScheduleContext::default();
2394 1 : schedule_context.avoid(&shard_a.intent.all_pageservers());
2395 1 : schedule_context.avoid(&shard_b.intent.all_pageservers());
2396 1 :
2397 1 : let optimization_a = shard_a.optimize_secondary(&mut scheduler, &schedule_context);
2398 1 :
2399 1 : // Since there is a node with no locations available, the node with two locations for the
2400 1 : // same tenant should generate an optimization to move one away
2401 1 : assert_eq!(
2402 1 : optimization_a,
2403 1 : Some(ScheduleOptimization {
2404 1 : sequence: shard_a.sequence,
2405 1 : action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
2406 1 : old_node_id: NodeId(3),
2407 1 : new_node_id: NodeId(4)
2408 1 : })
2409 1 : })
2410 1 : );
2411 :
2412 1 : shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
2413 1 : assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
2414 1 : assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);
2415 :
2416 1 : shard_a.intent.clear(&mut scheduler);
2417 1 : shard_b.intent.clear(&mut scheduler);
2418 1 :
2419 1 : Ok(())
2420 1 : }
2421 :
2422 : /// Test how the optimisation code behaves with an extra secondary
2423 : #[test]
2424 1 : fn optimize_removes_secondary() -> anyhow::Result<()> {
2425 1 : let az_a_tag = AvailabilityZone("az-a".to_string());
2426 1 : let az_b_tag = AvailabilityZone("az-b".to_string());
2427 1 : let mut nodes = make_test_nodes(
2428 1 : 4,
2429 1 : &[
2430 1 : az_a_tag.clone(),
2431 1 : az_b_tag.clone(),
2432 1 : az_a_tag.clone(),
2433 1 : az_b_tag.clone(),
2434 1 : ],
2435 1 : );
2436 1 : let mut scheduler = Scheduler::new(nodes.values());
2437 1 :
2438 1 : let mut schedule_context = ScheduleContext::default();
2439 1 :
2440 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
2441 1 : shard_a.intent.preferred_az_id = Some(az_a_tag.clone());
2442 1 : shard_a
2443 1 : .schedule(&mut scheduler, &mut schedule_context)
2444 1 : .unwrap();
2445 1 :
2446 1 : // Attached on node 1, secondary on node 2
2447 1 : assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
2448 1 : assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(2)]);
2449 :
2450 : // Initially optimiser is idle
2451 1 : assert_eq!(
2452 1 : shard_a.optimize_attachment(&mut scheduler, &schedule_context),
2453 1 : None
2454 1 : );
2455 1 : assert_eq!(
2456 1 : shard_a.optimize_secondary(&mut scheduler, &schedule_context),
2457 1 : None
2458 1 : );
2459 :
2460 : // A spare secondary in the home AZ: it should be removed -- this is the situation when we're midway through a graceful migration, after cutting over
2461 : // to our new location
2462 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
2463 1 : let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2464 1 : assert_eq!(
2465 1 : optimization,
2466 1 : Some(ScheduleOptimization {
2467 1 : sequence: shard_a.sequence,
2468 1 : action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
2469 1 : })
2470 1 : );
2471 1 : shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
2472 1 :
2473 1 : // A spare secondary in the non-home AZ, and one of them is offline
2474 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(4));
2475 1 : nodes
2476 1 : .get_mut(&NodeId(4))
2477 1 : .unwrap()
2478 1 : .set_availability(NodeAvailability::Offline);
2479 1 : scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
2480 1 : let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2481 1 : assert_eq!(
2482 1 : optimization,
2483 1 : Some(ScheduleOptimization {
2484 1 : sequence: shard_a.sequence,
2485 1 : action: ScheduleOptimizationAction::RemoveSecondary(NodeId(4))
2486 1 : })
2487 1 : );
2488 1 : shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
2489 1 :
2490 1 : // A spare secondary when should have none
2491 1 : shard_a.policy = PlacementPolicy::Attached(0);
2492 1 : let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2493 1 : assert_eq!(
2494 1 : optimization,
2495 1 : Some(ScheduleOptimization {
2496 1 : sequence: shard_a.sequence,
2497 1 : action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2))
2498 1 : })
2499 1 : );
2500 1 : shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
2501 1 : assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
2502 1 : assert_eq!(shard_a.intent.get_secondary(), &vec![]);
2503 :
2504 : // Check that in secondary mode, we preserve the secondary in the preferred AZ
2505 1 : let mut schedule_context = ScheduleContext::default(); // Fresh context, we're about to call schedule()
2506 1 : shard_a.policy = PlacementPolicy::Secondary;
2507 1 : shard_a
2508 1 : .schedule(&mut scheduler, &mut schedule_context)
2509 1 : .unwrap();
2510 1 : assert_eq!(shard_a.intent.get_attached(), &None);
2511 1 : assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
2512 1 : assert_eq!(
2513 1 : shard_a.optimize_attachment(&mut scheduler, &schedule_context),
2514 1 : None
2515 1 : );
2516 1 : assert_eq!(
2517 1 : shard_a.optimize_secondary(&mut scheduler, &schedule_context),
2518 1 : None
2519 1 : );
2520 :
2521 1 : shard_a.intent.clear(&mut scheduler);
2522 1 :
2523 1 : Ok(())
2524 1 : }
2525 :
2526 : // Optimize til quiescent: this emulates what Service::optimize_all does, when
2527 : // called repeatedly in the background.
2528 : // Returns the applied optimizations
2529 3 : fn optimize_til_idle(
2530 3 : scheduler: &mut Scheduler,
2531 3 : shards: &mut [TenantShard],
2532 3 : ) -> Vec<ScheduleOptimization> {
2533 3 : let mut loop_n = 0;
2534 3 : let mut optimizations = Vec::default();
2535 : loop {
2536 6 : let mut schedule_context = ScheduleContext::default();
2537 6 : let mut any_changed = false;
2538 :
2539 24 : for shard in shards.iter() {
2540 24 : schedule_context.avoid(&shard.intent.all_pageservers());
2541 24 : }
2542 :
2543 15 : for shard in shards.iter_mut() {
2544 15 : let optimization = shard.optimize_attachment(scheduler, &schedule_context);
2545 15 : tracing::info!(
2546 0 : "optimize_attachment({})={:?}",
2547 : shard.tenant_shard_id,
2548 : optimization
2549 : );
2550 15 : if let Some(optimization) = optimization {
2551 : // Check that maybe_optimizable wouldn't have wrongly claimed this optimization didn't exist
2552 3 : assert!(shard.maybe_optimizable(scheduler, &schedule_context));
2553 3 : optimizations.push(optimization.clone());
2554 3 : shard.apply_optimization(scheduler, optimization);
2555 3 : any_changed = true;
2556 3 : break;
2557 12 : }
2558 12 :
2559 12 : let optimization = shard.optimize_secondary(scheduler, &schedule_context);
2560 12 : tracing::info!(
2561 0 : "optimize_secondary({})={:?}",
2562 : shard.tenant_shard_id,
2563 : optimization
2564 : );
2565 12 : if let Some(optimization) = optimization {
2566 : // Check that maybe_optimizable wouldn't have wrongly claimed this optimization didn't exist
2567 0 : assert!(shard.maybe_optimizable(scheduler, &schedule_context));
2568 :
2569 0 : optimizations.push(optimization.clone());
2570 0 : shard.apply_optimization(scheduler, optimization);
2571 0 : any_changed = true;
2572 0 : break;
2573 12 : }
2574 : }
2575 :
2576 6 : if !any_changed {
2577 3 : break;
2578 3 : }
2579 3 :
2580 3 : // Assert no infinite loop
2581 3 : loop_n += 1;
2582 3 : assert!(loop_n < 1000);
2583 : }
2584 :
2585 3 : optimizations
2586 3 : }
2587 :
2588 : /// Test the balancing behavior of shard scheduling: that it achieves a balance, and
2589 : /// that it converges.
2590 : #[test]
2591 1 : fn optimize_add_nodes() -> anyhow::Result<()> {
2592 1 : let nodes = make_test_nodes(
2593 1 : 9,
2594 1 : &[
2595 1 : // Initial 6 nodes
2596 1 : AvailabilityZone("az-a".to_string()),
2597 1 : AvailabilityZone("az-a".to_string()),
2598 1 : AvailabilityZone("az-b".to_string()),
2599 1 : AvailabilityZone("az-b".to_string()),
2600 1 : AvailabilityZone("az-c".to_string()),
2601 1 : AvailabilityZone("az-c".to_string()),
2602 1 : // Three we will add later
2603 1 : AvailabilityZone("az-a".to_string()),
2604 1 : AvailabilityZone("az-b".to_string()),
2605 1 : AvailabilityZone("az-c".to_string()),
2606 1 : ],
2607 1 : );
2608 1 :
2609 1 : // Only show the scheduler two nodes in each AZ to start with
2610 1 : let mut scheduler = Scheduler::new([].iter());
2611 7 : for i in 1..=6 {
2612 6 : scheduler.node_upsert(nodes.get(&NodeId(i)).unwrap());
2613 6 : }
2614 :
2615 1 : let mut shards = make_test_tenant(
2616 1 : PlacementPolicy::Attached(1),
2617 1 : ShardCount::new(4),
2618 1 : Some(AvailabilityZone("az-a".to_string())),
2619 1 : );
2620 1 : let mut schedule_context = ScheduleContext::default();
2621 5 : for shard in &mut shards {
2622 4 : assert!(
2623 4 : shard
2624 4 : .schedule(&mut scheduler, &mut schedule_context)
2625 4 : .is_ok()
2626 4 : );
2627 : }
2628 :
2629 : // Initial: attached locations land in the tenant's home AZ.
2630 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
2631 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2);
2632 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
2633 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
2634 :
2635 : // Initial: secondary locations in a remote AZ
2636 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
2637 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
2638 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
2639 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
2640 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
2641 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
2642 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
2643 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
2644 :
2645 : // Add another three nodes: we should see the shards spread out when their optimize
2646 : // methods are called
2647 1 : scheduler.node_upsert(nodes.get(&NodeId(7)).unwrap());
2648 1 : scheduler.node_upsert(nodes.get(&NodeId(8)).unwrap());
2649 1 : scheduler.node_upsert(nodes.get(&NodeId(9)).unwrap());
2650 1 : optimize_til_idle(&mut scheduler, &mut shards);
2651 1 :
2652 1 : // We expect one attached location was moved to the new node in the tenant's home AZ
2653 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(7)), 1);
2654 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(7)), 1);
2655 : // The original node has one less attached shard
2656 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
2657 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
2658 :
2659 : // One of the original nodes still has two attachments, since there are an odd number of nodes
2660 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
2661 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
2662 :
2663 : // None of our secondaries moved, since we already had enough nodes for those to be
2664 : // scheduled perfectly
2665 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
2666 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
2667 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
2668 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
2669 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
2670 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
2671 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
2672 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
2673 :
2674 4 : for shard in shards.iter_mut() {
2675 4 : shard.intent.clear(&mut scheduler);
2676 4 : }
2677 :
2678 1 : Ok(())
2679 1 : }
2680 :
2681 : /// Test that initial shard scheduling is optimal. By optimal we mean
2682 : /// that the optimizer cannot find a way to improve it.
2683 : ///
2684 : /// This test is an example of the scheduling issue described in
2685 : /// https://github.com/neondatabase/neon/issues/8969
2686 : #[test]
2687 1 : fn initial_scheduling_is_optimal() -> anyhow::Result<()> {
2688 : use itertools::Itertools;
2689 :
2690 1 : let nodes = make_test_nodes(2, &[]);
2691 1 :
2692 1 : let mut scheduler = Scheduler::new([].iter());
2693 1 : scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
2694 1 : scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
2695 1 :
2696 1 : let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
2697 1 : let a_context = Rc::new(RefCell::new(ScheduleContext::default()));
2698 1 :
2699 1 : let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
2700 1 : let b_context = Rc::new(RefCell::new(ScheduleContext::default()));
2701 1 :
2702 4 : let a_shards_with_context = a.iter_mut().map(|shard| (shard, a_context.clone()));
2703 4 : let b_shards_with_context = b.iter_mut().map(|shard| (shard, b_context.clone()));
2704 1 :
2705 1 : let schedule_order = a_shards_with_context.interleave(b_shards_with_context);
2706 :
2707 9 : for (shard, context) in schedule_order {
2708 8 : let context = &mut *context.borrow_mut();
2709 8 : shard.schedule(&mut scheduler, context).unwrap();
2710 8 : }
2711 :
2712 1 : let applied_to_a = optimize_til_idle(&mut scheduler, &mut a);
2713 1 : assert_eq!(applied_to_a, vec![]);
2714 :
2715 1 : let applied_to_b = optimize_til_idle(&mut scheduler, &mut b);
2716 1 : assert_eq!(applied_to_b, vec![]);
2717 :
2718 8 : for shard in a.iter_mut().chain(b.iter_mut()) {
2719 8 : shard.intent.clear(&mut scheduler);
2720 8 : }
2721 :
2722 1 : Ok(())
2723 1 : }
2724 :
2725 : #[test]
2726 1 : fn random_az_shard_scheduling() -> anyhow::Result<()> {
2727 : use rand::seq::SliceRandom;
2728 :
2729 51 : for seed in 0..50 {
2730 50 : eprintln!("Running test with seed {seed}");
2731 50 : let mut rng = StdRng::seed_from_u64(seed);
2732 50 :
2733 50 : let az_a_tag = AvailabilityZone("az-a".to_string());
2734 50 : let az_b_tag = AvailabilityZone("az-b".to_string());
2735 50 : let azs = [az_a_tag, az_b_tag];
2736 50 : let nodes = make_test_nodes(4, &azs);
2737 50 : let mut shards_per_az: HashMap<AvailabilityZone, u32> = HashMap::new();
2738 50 :
2739 50 : let mut scheduler = Scheduler::new([].iter());
2740 200 : for node in nodes.values() {
2741 200 : scheduler.node_upsert(node);
2742 200 : }
2743 :
2744 50 : let mut shards = Vec::default();
2745 50 : let mut contexts = Vec::default();
2746 50 : let mut az_picker = azs.iter().cycle().cloned();
2747 5050 : for i in 0..100 {
2748 5000 : let az = az_picker.next().unwrap();
2749 5000 : let shard_count = i % 4 + 1;
2750 5000 : *shards_per_az.entry(az.clone()).or_default() += shard_count;
2751 5000 :
2752 5000 : let tenant_shards = make_test_tenant(
2753 5000 : PlacementPolicy::Attached(1),
2754 5000 : ShardCount::new(shard_count.try_into().unwrap()),
2755 5000 : Some(az),
2756 5000 : );
2757 5000 : let context = Rc::new(RefCell::new(ScheduleContext::default()));
2758 5000 :
2759 5000 : contexts.push(context.clone());
2760 5000 : let with_ctx = tenant_shards
2761 5000 : .into_iter()
2762 12500 : .map(|shard| (shard, context.clone()));
2763 17500 : for shard_with_ctx in with_ctx {
2764 12500 : shards.push(shard_with_ctx);
2765 12500 : }
2766 : }
2767 :
2768 50 : shards.shuffle(&mut rng);
2769 :
2770 : #[derive(Default, Debug)]
2771 : struct NodeStats {
2772 : attachments: u32,
2773 : secondaries: u32,
2774 : }
2775 :
2776 50 : let mut node_stats: HashMap<NodeId, NodeStats> = HashMap::default();
2777 50 : let mut attachments_in_wrong_az = 0;
2778 50 : let mut secondaries_in_wrong_az = 0;
2779 :
2780 12550 : for (shard, context) in &mut shards {
2781 12500 : let context = &mut *context.borrow_mut();
2782 12500 : shard.schedule(&mut scheduler, context).unwrap();
2783 12500 :
2784 12500 : let attached_node = shard.intent.get_attached().unwrap();
2785 12500 : let stats = node_stats.entry(attached_node).or_default();
2786 12500 : stats.attachments += 1;
2787 12500 :
2788 12500 : let secondary_node = *shard.intent.get_secondary().first().unwrap();
2789 12500 : let stats = node_stats.entry(secondary_node).or_default();
2790 12500 : stats.secondaries += 1;
2791 12500 :
2792 12500 : let attached_node_az = nodes
2793 12500 : .get(&attached_node)
2794 12500 : .unwrap()
2795 12500 : .get_availability_zone_id();
2796 12500 : let secondary_node_az = nodes
2797 12500 : .get(&secondary_node)
2798 12500 : .unwrap()
2799 12500 : .get_availability_zone_id();
2800 12500 : let preferred_az = shard.preferred_az().unwrap();
2801 12500 :
2802 12500 : if attached_node_az != preferred_az {
2803 0 : eprintln!(
2804 0 : "{} attachment was scheduled in AZ {} but preferred AZ {}",
2805 0 : shard.tenant_shard_id, attached_node_az, preferred_az
2806 0 : );
2807 0 : attachments_in_wrong_az += 1;
2808 12500 : }
2809 :
2810 12500 : if secondary_node_az == preferred_az {
2811 0 : eprintln!(
2812 0 : "{} secondary was scheduled in AZ {} which matches preference",
2813 0 : shard.tenant_shard_id, attached_node_az
2814 0 : );
2815 0 : secondaries_in_wrong_az += 1;
2816 12500 : }
2817 : }
2818 :
2819 50 : let mut violations = Vec::default();
2820 50 :
2821 50 : if attachments_in_wrong_az > 0 {
2822 0 : violations.push(format!(
2823 0 : "{} attachments scheduled to the incorrect AZ",
2824 0 : attachments_in_wrong_az
2825 0 : ));
2826 50 : }
2827 :
2828 50 : if secondaries_in_wrong_az > 0 {
2829 0 : violations.push(format!(
2830 0 : "{} secondaries scheduled to the incorrect AZ",
2831 0 : secondaries_in_wrong_az
2832 0 : ));
2833 50 : }
2834 :
2835 50 : eprintln!(
2836 50 : "attachments_in_wrong_az={} secondaries_in_wrong_az={}",
2837 50 : attachments_in_wrong_az, secondaries_in_wrong_az
2838 50 : );
2839 :
2840 250 : for (node_id, stats) in &node_stats {
2841 200 : let node_az = nodes.get(node_id).unwrap().get_availability_zone_id();
2842 200 : let ideal_attachment_load = shards_per_az.get(node_az).unwrap() / 2;
2843 200 : let allowed_attachment_load =
2844 200 : (ideal_attachment_load - 1)..(ideal_attachment_load + 2);
2845 200 :
2846 200 : if !allowed_attachment_load.contains(&stats.attachments) {
2847 0 : violations.push(format!(
2848 0 : "Found {} attachments on node {}, but expected {}",
2849 0 : stats.attachments, node_id, ideal_attachment_load
2850 0 : ));
2851 200 : }
2852 :
2853 200 : eprintln!(
2854 200 : "{}: attachments={} secondaries={} ideal_attachment_load={}",
2855 200 : node_id, stats.attachments, stats.secondaries, ideal_attachment_load
2856 200 : );
2857 : }
2858 :
2859 50 : assert!(violations.is_empty(), "{violations:?}");
2860 :
2861 12550 : for (mut shard, _ctx) in shards {
2862 12500 : shard.intent.clear(&mut scheduler);
2863 12500 : }
2864 : }
2865 1 : Ok(())
2866 1 : }
2867 :
2868 : /// Check how the shard's scheduling behaves when in PlacementPolicy::Secondary mode.
2869 : #[test]
2870 1 : fn tenant_secondary_scheduling() -> anyhow::Result<()> {
2871 1 : let az_a = AvailabilityZone("az-a".to_string());
2872 1 : let nodes = make_test_nodes(
2873 1 : 3,
2874 1 : &[
2875 1 : az_a.clone(),
2876 1 : AvailabilityZone("az-b".to_string()),
2877 1 : AvailabilityZone("az-c".to_string()),
2878 1 : ],
2879 1 : );
2880 1 :
2881 1 : let mut scheduler = Scheduler::new(nodes.values());
2882 1 : let mut context = ScheduleContext::default();
2883 1 :
2884 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Secondary);
2885 1 : tenant_shard.intent.preferred_az_id = Some(az_a.clone());
2886 1 : tenant_shard
2887 1 : .schedule(&mut scheduler, &mut context)
2888 1 : .expect("we have enough nodes, scheduling should work");
2889 1 : assert_eq!(tenant_shard.intent.secondary.len(), 1);
2890 1 : assert!(tenant_shard.intent.attached.is_none());
2891 :
2892 : // Should have scheduled into the preferred AZ
2893 1 : assert_eq!(
2894 1 : scheduler
2895 1 : .get_node_az(&tenant_shard.intent.secondary[0])
2896 1 : .as_ref(),
2897 1 : tenant_shard.preferred_az()
2898 1 : );
2899 :
2900 : // Optimizer should agree
2901 1 : assert_eq!(
2902 1 : tenant_shard.optimize_attachment(&mut scheduler, &context),
2903 1 : None
2904 1 : );
2905 1 : assert_eq!(
2906 1 : tenant_shard.optimize_secondary(&mut scheduler, &context),
2907 1 : None
2908 1 : );
2909 :
2910 : // Switch to PlacementPolicy::Attached
2911 1 : tenant_shard.policy = PlacementPolicy::Attached(1);
2912 1 : tenant_shard
2913 1 : .schedule(&mut scheduler, &mut context)
2914 1 : .expect("we have enough nodes, scheduling should work");
2915 1 : assert_eq!(tenant_shard.intent.secondary.len(), 1);
2916 1 : assert!(tenant_shard.intent.attached.is_some());
2917 : // Secondary should now be in non-preferred AZ
2918 1 : assert_ne!(
2919 1 : scheduler
2920 1 : .get_node_az(&tenant_shard.intent.secondary[0])
2921 1 : .as_ref(),
2922 1 : tenant_shard.preferred_az()
2923 1 : );
2924 : // Attached should be in preferred AZ
2925 1 : assert_eq!(
2926 1 : scheduler
2927 1 : .get_node_az(&tenant_shard.intent.attached.unwrap())
2928 1 : .as_ref(),
2929 1 : tenant_shard.preferred_az()
2930 1 : );
2931 :
2932 : // Optimizer should agree
2933 1 : assert_eq!(
2934 1 : tenant_shard.optimize_attachment(&mut scheduler, &context),
2935 1 : None
2936 1 : );
2937 1 : assert_eq!(
2938 1 : tenant_shard.optimize_secondary(&mut scheduler, &context),
2939 1 : None
2940 1 : );
2941 :
2942 : // Switch back to PlacementPolicy::Secondary
2943 1 : tenant_shard.policy = PlacementPolicy::Secondary;
2944 1 : tenant_shard
2945 1 : .schedule(&mut scheduler, &mut context)
2946 1 : .expect("we have enough nodes, scheduling should work");
2947 1 : assert_eq!(tenant_shard.intent.secondary.len(), 1);
2948 1 : assert!(tenant_shard.intent.attached.is_none());
2949 : // When we picked a location to keep, we should have kept the one in the preferred AZ
2950 1 : assert_eq!(
2951 1 : scheduler
2952 1 : .get_node_az(&tenant_shard.intent.secondary[0])
2953 1 : .as_ref(),
2954 1 : tenant_shard.preferred_az()
2955 1 : );
2956 :
2957 : // Optimizer should agree
2958 1 : assert_eq!(
2959 1 : tenant_shard.optimize_attachment(&mut scheduler, &context),
2960 1 : None
2961 1 : );
2962 1 : assert_eq!(
2963 1 : tenant_shard.optimize_secondary(&mut scheduler, &context),
2964 1 : None
2965 1 : );
2966 :
2967 1 : tenant_shard.intent.clear(&mut scheduler);
2968 1 :
2969 1 : Ok(())
2970 1 : }
2971 : }
|