Line data Source code
1 : use std::collections::{HashMap, HashSet};
2 : use std::sync::Arc;
3 : use std::time::Duration;
4 :
5 : use futures::future::{self, Either};
6 : use itertools::Itertools;
7 : use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy, ShardSchedulingPolicy};
8 : use pageserver_api::models::{LocationConfig, LocationConfigMode, TenantConfig};
9 : use pageserver_api::shard::{ShardIdentity, TenantShardId};
10 : use serde::{Deserialize, Serialize};
11 : use tokio::task::JoinHandle;
12 : use tokio_util::sync::CancellationToken;
13 : use tracing::{Instrument, instrument};
14 : use utils::generation::Generation;
15 : use utils::id::NodeId;
16 : use utils::seqwait::{SeqWait, SeqWaitError};
17 : use utils::shard::ShardCount;
18 : use utils::sync::gate::GateGuard;
19 :
20 : use crate::compute_hook::ComputeHook;
21 : use crate::metrics::{
22 : self, ReconcileCompleteLabelGroup, ReconcileLongRunningLabelGroup, ReconcileOutcome,
23 : };
24 : use crate::node::Node;
25 : use crate::persistence::split_state::SplitState;
26 : use crate::persistence::{Persistence, TenantShardPersistence};
27 : use crate::reconciler::{
28 : ReconcileError, ReconcileUnits, Reconciler, ReconcilerConfig, TargetState,
29 : attached_location_conf, secondary_location_conf,
30 : };
31 : use crate::scheduler::{
32 : AffinityScore, AttachedShardTag, NodeSchedulingScore, NodeSecondarySchedulingScore,
33 : RefCountUpdate, ScheduleContext, ScheduleError, Scheduler, SecondaryShardTag, ShardTag,
34 : };
35 : use crate::service::ReconcileResultRequest;
36 : use crate::timeline_import::TimelineImportState;
37 : use crate::{Sequence, service};
38 :
39 : /// Serialization helper
40 0 : fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
41 0 : where
42 0 : S: serde::ser::Serializer,
43 0 : T: std::fmt::Display,
44 : {
45 0 : serializer.collect_str(
46 0 : &v.lock()
47 0 : .unwrap()
48 0 : .as_ref()
49 0 : .map(|e| format!("{e}"))
50 0 : .unwrap_or("".to_string()),
51 : )
52 0 : }
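
A minimal illustration of what this helper produces (a sketch, assuming serde_json is available for the demo; the real field using it is TenantShard::last_error below): the serialized value is the error's Display output, or an empty string when no error has been recorded.

    use std::sync::{Arc, Mutex};
    use serde::Serialize;

    #[derive(Serialize)]
    struct LastErrorView {
        // String stands in for Arc<ReconcileError>; anything implementing Display works.
        #[serde(serialize_with = "read_last_error")]
        last_error: Arc<Mutex<Option<String>>>,
    }

    fn demo() -> serde_json::Result<()> {
        let with_error = LastErrorView {
            last_error: Arc::new(Mutex::new(Some("node unavailable".to_string()))),
        };
        assert_eq!(
            serde_json::to_string(&with_error)?,
            r#"{"last_error":"node unavailable"}"#
        );

        let no_error = LastErrorView {
            last_error: Arc::new(Mutex::new(None)),
        };
        assert_eq!(serde_json::to_string(&no_error)?, r#"{"last_error":""}"#);
        Ok(())
    }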
53 :
54 : /// In-memory state for a particular tenant shard.
55 : ///
56 : /// This struct implements Serialize for debugging purposes, but is _not_ persisted
57 : /// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
58 : #[derive(Serialize)]
59 : pub(crate) struct TenantShard {
60 : pub(crate) tenant_shard_id: TenantShardId,
61 :
62 : pub(crate) shard: ShardIdentity,
63 :
64 : // Runtime only: sequence used to coordinate updates to this object while
65 : // background reconcilers may be running. A reconciler runs to a particular
66 : // sequence.
67 : pub(crate) sequence: Sequence,
68 :
69 : // Latest generation number: next time we attach, increment this
70 : // and use the incremented number when attaching.
71 : //
72 : // None represents an incompletely onboarded tenant via the [`Service::location_config`]
73 : // API, where this tenant may only run in PlacementPolicy::Secondary.
74 : pub(crate) generation: Option<Generation>,
75 :
76 : // High level description of how the tenant should be set up. Provided
77 : // externally.
78 : pub(crate) policy: PlacementPolicy,
79 :
80 : // Low level description of exactly which pageservers should fulfil
81 : // which role. Generated by `Self::schedule`.
82 : pub(crate) intent: IntentState,
83 :
84 : // Low level description of how the tenant is configured on pageservers:
85 : // if this does not match `Self::intent` then the tenant needs reconciliation
86 : // with `Self::reconcile`.
87 : pub(crate) observed: ObservedState,
88 :
89 : // Tenant configuration, passed through opaquely to the pageserver. Identical
90 : // for all shards in a tenant.
91 : pub(crate) config: TenantConfig,
92 :
93 : /// If a reconcile task is currently in flight, it may be joined here (it is
94 : /// only safe to join if either the result has been received or the reconciler's
95 : /// cancellation token has been fired)
96 : #[serde(skip)]
97 : pub(crate) reconciler: Option<ReconcilerHandle>,
98 :
99 : /// If a tenant is being split, then all shards with that TenantId will have a
100 : /// SplitState set; this acts as a guard against other operations such as background
101 : /// reconciliation and timeline creation.
102 : pub(crate) splitting: SplitState,
103 :
104 : /// Flag indicating whether the tenant has an in-progress timeline import.
105 : /// Used to disallow shard splits while an import is in progress.
106 : pub(crate) importing: TimelineImportState,
107 :
108 : /// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag
109 : /// is set. This flag is cleared when the tenant is popped off the delay queue.
110 : pub(crate) delayed_reconcile: bool,
111 :
112 : /// Optionally wait for reconciliation to complete up to a particular
113 : /// sequence number.
114 : #[serde(skip)]
115 : pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
116 :
117 : /// Indicates sequence number for which we have encountered an error reconciling. If
118 : /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
119 : /// and callers should stop waiting for `waiter` and propagate the error.
120 : #[serde(skip)]
121 : pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
122 :
123 : /// The most recent error from a reconcile on this tenant. This is a nested Arc
124 : /// because:
125 : /// - ReconcileWaiters need to Arc-clone the overall object to read it later
126 : /// - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
127 : /// many waiters for one shard, and the underlying error types are not Clone.
128 : ///
129 : /// TODO: generalize to an array of recent events
130 : /// TODO: use an ArcSwap instead of a mutex for faster reads?
131 : #[serde(serialize_with = "read_last_error")]
132 : pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
133 :
134 : /// Number of consecutive [`crate::service::Service::reconcile_all`] iterations that have
135 : /// scheduled a reconciliation for this shard.
136 : ///
137 : /// If this reaches `MAX_CONSECUTIVE_RECONCILES`, the shard is considered "stuck" and will be
138 : /// ignored when deciding whether optimizations can run. This includes both successful and failed
139 : /// reconciliations.
140 : ///
141 : /// Incremented in [`crate::service::Service::process_result`], and reset to 0 when
142 : /// [`crate::service::Service::reconcile_all`] determines no reconciliation is needed for this shard.
143 : pub(crate) consecutive_reconciles_count: usize,
144 :
145 : /// If we have a pending compute notification that for some reason we weren't able to send,
146 : /// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
147 : /// and trigger a Reconciler run. This is the mechanism by which compute notifications are included in the scope
148 : /// of state that we publish externally in an eventually consistent way.
149 : pub(crate) pending_compute_notification: bool,
150 :
151 : /// To do a graceful migration, set this field to the destination pageserver, and optimization
152 : /// functions will consider this node the best location and react appropriately.
153 : preferred_node: Option<NodeId>,
154 :
155 : // Support/debug tool: if something is going wrong or flapping with scheduling, this may
156 : // be set to a non-active state to avoid making changes while the issue is fixed.
157 : scheduling_policy: ShardSchedulingPolicy,
158 : }
159 :
160 : #[derive(Clone, Debug, Serialize)]
161 : pub(crate) struct IntentState {
162 : attached: Option<NodeId>,
163 : secondary: Vec<NodeId>,
164 :
165 : // We should attempt to schedule this shard in the provided AZ to
166 : // decrease chances of cross-AZ compute.
167 : preferred_az_id: Option<AvailabilityZone>,
168 : }
169 :
170 : impl IntentState {
171 12860 : pub(crate) fn new(preferred_az_id: Option<AvailabilityZone>) -> Self {
172 12860 : Self {
173 12860 : attached: None,
174 12860 : secondary: vec![],
175 12860 : preferred_az_id,
176 12860 : }
177 12860 : }
178 0 : pub(crate) fn single(
179 0 : scheduler: &mut Scheduler,
180 0 : node_id: Option<NodeId>,
181 0 : preferred_az_id: Option<AvailabilityZone>,
182 0 : ) -> Self {
183 0 : if let Some(node_id) = node_id {
184 0 : scheduler.update_node_ref_counts(
185 0 : node_id,
186 0 : preferred_az_id.as_ref(),
187 0 : RefCountUpdate::Attach,
188 0 : );
189 0 : }
190 0 : Self {
191 0 : attached: node_id,
192 0 : secondary: vec![],
193 0 : preferred_az_id,
194 0 : }
195 0 : }
196 :
197 12859 : pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
198 12859 : if self.attached != new_attached {
199 12859 : if let Some(old_attached) = self.attached.take() {
200 0 : scheduler.update_node_ref_counts(
201 0 : old_attached,
202 0 : self.preferred_az_id.as_ref(),
203 0 : RefCountUpdate::Detach,
204 0 : );
205 12859 : }
206 12859 : if let Some(new_attached) = &new_attached {
207 12859 : scheduler.update_node_ref_counts(
208 12859 : *new_attached,
209 12859 : self.preferred_az_id.as_ref(),
210 12859 : RefCountUpdate::Attach,
211 12859 : );
212 12859 : }
213 12859 : self.attached = new_attached;
214 0 : }
215 :
216 12859 : if let Some(new_attached) = &new_attached {
217 12859 : assert!(!self.secondary.contains(new_attached));
218 0 : }
219 12859 : }
220 :
221 : /// Like set_attached, but the node is from [`Self::secondary`]. This swaps the node from
222 : /// secondary to attached while maintaining the scheduler's reference counts.
223 7 : pub(crate) fn promote_attached(
224 7 : &mut self,
225 7 : scheduler: &mut Scheduler,
226 7 : promote_secondary: NodeId,
227 7 : ) {
228 : // If we call this with a node that isn't in secondary, it would cause incorrect
229 : // scheduler reference counting, since we assume the node is already referenced as a secondary.
230 7 : debug_assert!(self.secondary.contains(&promote_secondary));
231 :
232 16 : self.secondary.retain(|n| n != &promote_secondary);
233 :
234 7 : let demoted = self.attached;
235 7 : self.attached = Some(promote_secondary);
236 :
237 7 : scheduler.update_node_ref_counts(
238 7 : promote_secondary,
239 7 : self.preferred_az_id.as_ref(),
240 7 : RefCountUpdate::PromoteSecondary,
241 : );
242 7 : if let Some(demoted) = demoted {
243 0 : scheduler.update_node_ref_counts(
244 0 : demoted,
245 0 : self.preferred_az_id.as_ref(),
246 0 : RefCountUpdate::DemoteAttached,
247 0 : );
248 7 : }
249 7 : }
250 :
251 12848 : pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
252 : // Every assertion here should probably have a corresponding check in
253 : // `validate_optimization` unless it is an invariant that should never be violated. Note
254 : // that the lock is not held between planning optimizations and applying them, so you have to
255 : // assume that any valid state transition of the intent state may have occurred.
256 12848 : assert!(!self.secondary.contains(&new_secondary));
257 12848 : assert!(self.attached != Some(new_secondary));
258 12848 : scheduler.update_node_ref_counts(
259 12848 : new_secondary,
260 12848 : self.preferred_az_id.as_ref(),
261 12848 : RefCountUpdate::AddSecondary,
262 : );
263 12848 : self.secondary.push(new_secondary);
264 12848 : }
265 :
266 : /// It is legal to call this with a node that is not currently a secondary: that is a no-op
267 9 : pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
268 13 : let index = self.secondary.iter().position(|n| *n == node_id);
269 9 : if let Some(index) = index {
270 9 : scheduler.update_node_ref_counts(
271 9 : node_id,
272 9 : self.preferred_az_id.as_ref(),
273 9 : RefCountUpdate::RemoveSecondary,
274 9 : );
275 9 : self.secondary.remove(index);
276 9 : }
277 9 : }
278 :
279 12859 : pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
280 12859 : for secondary in self.secondary.drain(..) {
281 12840 : scheduler.update_node_ref_counts(
282 12840 : secondary,
283 12840 : self.preferred_az_id.as_ref(),
284 12840 : RefCountUpdate::RemoveSecondary,
285 12840 : );
286 12840 : }
287 12859 : }
288 :
289 : /// Remove the last secondary node from the list of secondaries
290 0 : pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
291 0 : if let Some(node_id) = self.secondary.pop() {
292 0 : scheduler.update_node_ref_counts(
293 0 : node_id,
294 0 : self.preferred_az_id.as_ref(),
295 0 : RefCountUpdate::RemoveSecondary,
296 0 : );
297 0 : }
298 0 : }
299 :
300 12859 : pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
301 12859 : if let Some(old_attached) = self.attached.take() {
302 12857 : scheduler.update_node_ref_counts(
303 12857 : old_attached,
304 12857 : self.preferred_az_id.as_ref(),
305 12857 : RefCountUpdate::Detach,
306 12857 : );
307 12857 : }
308 :
309 12859 : self.clear_secondary(scheduler);
310 12859 : }
311 :
312 12901 : pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
313 12901 : let mut result = Vec::new();
314 12901 : if let Some(p) = self.attached {
315 12896 : result.push(p)
316 5 : }
317 :
318 12901 : result.extend(self.secondary.iter().copied());
319 :
320 12901 : result
321 12901 : }
322 :
323 12624 : pub(crate) fn get_attached(&self) -> &Option<NodeId> {
324 12624 : &self.attached
325 12624 : }
326 :
327 12706 : pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
328 12706 : &self.secondary
329 12706 : }
330 :
331 : /// If the node is in use as the attached location, demote it into
332 : /// the list of secondary locations. This is used when a node goes offline,
333 : /// and we want to use a different node for attachment, but not permanently
334 : /// forget the location on the offline node.
335 : ///
336 : /// Returns true if a change was made
337 8 : pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
338 8 : if self.attached == Some(node_id) {
339 8 : self.attached = None;
340 8 : self.secondary.push(node_id);
341 8 : scheduler.update_node_ref_counts(
342 8 : node_id,
343 8 : self.preferred_az_id.as_ref(),
344 8 : RefCountUpdate::DemoteAttached,
345 : );
346 8 : true
347 : } else {
348 0 : false
349 : }
350 8 : }
351 :
352 2 : pub(crate) fn set_preferred_az(
353 2 : &mut self,
354 2 : scheduler: &mut Scheduler,
355 2 : preferred_az: Option<AvailabilityZone>,
356 2 : ) {
357 2 : let new_az = preferred_az.as_ref();
358 2 : let old_az = self.preferred_az_id.as_ref();
359 :
360 2 : if old_az != new_az {
361 2 : if let Some(node_id) = self.attached {
362 2 : scheduler.update_node_ref_counts(
363 2 : node_id,
364 2 : new_az,
365 2 : RefCountUpdate::ChangePreferredAzFrom(old_az),
366 2 : );
367 2 : }
368 4 : for node_id in &self.secondary {
369 2 : scheduler.update_node_ref_counts(
370 2 : *node_id,
371 2 : new_az,
372 2 : RefCountUpdate::ChangePreferredAzFrom(old_az),
373 2 : );
374 2 : }
375 2 : self.preferred_az_id = preferred_az;
376 0 : }
377 2 : }
378 :
379 12508 : pub(crate) fn get_preferred_az(&self) -> Option<&AvailabilityZone> {
380 12508 : self.preferred_az_id.as_ref()
381 12508 : }
382 : }
383 :
384 : impl Drop for IntentState {
385 12860 : fn drop(&mut self) {
386 : // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
387 : // We do not check this while panicking, to avoid polluting unit test failures or
388 : // other assertions with this assertion's output. It's still wrong to leak these,
389 : // but if we already have a panic then we don't need to independently flag this case.
390 12860 : if !(std::thread::panicking()) {
391 12860 : debug_assert!(self.attached.is_none() && self.secondary.is_empty());
392 0 : }
393 12859 : }
394 : }
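
The Drop guard above enforces a teardown contract: callers must release the scheduler reference counts (via clear, remove_secondary, etc.) before an IntentState is dropped, and the check stays quiet if the thread is already panicking. A self-contained sketch of the same pattern, using a toy type rather than the real IntentState/Scheduler pair:

    /// Toy stand-in: holds "references" that must be released explicitly before drop.
    struct RefHolder {
        refs: Vec<u64>,
    }

    impl RefHolder {
        fn clear(&mut self) {
            // In IntentState this also calls Scheduler::update_node_ref_counts per node.
            self.refs.clear();
        }
    }

    impl Drop for RefHolder {
        fn drop(&mut self) {
            // Flag leaked references in debug builds, but don't stack a second
            // assertion failure on top of a panic that is already unwinding.
            if !std::thread::panicking() {
                debug_assert!(self.refs.is_empty(), "dropped without calling clear()");
            }
        }
    }

    fn demo() {
        let mut holder = RefHolder { refs: vec![1, 2] };
        holder.clear(); // required before drop, mirroring IntentState::clear(scheduler)
    }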
395 :
396 0 : #[derive(Default, Clone, Serialize, Deserialize, Debug)]
397 : pub(crate) struct ObservedState {
398 : pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
399 : }
400 :
401 : /// Our latest knowledge of how this tenant is configured in the outside world.
402 : ///
403 : /// Meaning:
404 : /// * No instance of this type exists for a node: we are certain that we have nothing configured on that
405 : /// node for this shard.
406 : /// * Instance exists with conf==None: we *might* have some state on that node, but we don't know
407 : /// what it is (e.g. we failed partway through configuring it)
408 : /// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
409 : /// and that configuration will still be present unless something external interfered.
410 0 : #[derive(Clone, Serialize, Deserialize, Debug)]
411 : pub(crate) struct ObservedStateLocation {
412 : /// If None, it means we do not know the status of this shard's location on this node, but
413 : /// we know that we might have some state on this node.
414 : pub(crate) conf: Option<LocationConfig>,
415 : }
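
A small sketch of how a caller might branch on the three cases described above (illustrative only; the authoritative interpretation lives in the reconciler and in TenantShard::dirty further down):

    fn describe_node_state(observed: &ObservedState, node: NodeId) -> &'static str {
        match observed.locations.get(&node) {
            // No entry: we are certain nothing is configured for this shard on that node.
            None => "nothing configured on this node",
            // Entry with conf == None: unknown state, e.g. a configuration call failed partway.
            Some(ObservedStateLocation { conf: None }) => "state unknown, must be reconciled",
            // Entry with conf == Some: the last configuration we successfully applied.
            Some(ObservedStateLocation { conf: Some(_) }) => "last successfully applied config",
        }
    }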
416 :
417 : pub(crate) struct ReconcilerWaiter {
418 : // For observability purposes, remember the ID of the shard we're
419 : // waiting for.
420 : pub(crate) tenant_shard_id: TenantShardId,
421 :
422 : seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
423 : error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
424 : error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
425 : seq: Sequence,
426 : }
427 :
428 : pub(crate) enum ReconcilerStatus {
429 : Done,
430 : Failed,
431 : InProgress,
432 : }
433 :
434 : #[derive(thiserror::Error, Debug)]
435 : pub(crate) enum ReconcileWaitError {
436 : #[error("Timeout waiting for shard {0}")]
437 : Timeout(TenantShardId),
438 : #[error("shutting down")]
439 : Shutdown,
440 : #[error("Reconcile error on shard {0}: {1}")]
441 : Failed(TenantShardId, Arc<ReconcileError>),
442 : }
443 :
444 : #[derive(Eq, PartialEq, Debug, Clone)]
445 : pub(crate) struct ReplaceSecondary {
446 : old_node_id: NodeId,
447 : new_node_id: NodeId,
448 : }
449 :
450 : #[derive(Eq, PartialEq, Debug, Clone)]
451 : pub(crate) struct MigrateAttachment {
452 : pub(crate) old_attached_node_id: NodeId,
453 : pub(crate) new_attached_node_id: NodeId,
454 : }
455 :
456 : #[derive(Eq, PartialEq, Debug, Clone)]
457 : pub(crate) enum ScheduleOptimizationAction {
458 : // Replace one of our secondary locations with a different node
459 : ReplaceSecondary(ReplaceSecondary),
460 : // Migrate attachment to an existing secondary location
461 : MigrateAttachment(MigrateAttachment),
462 : // Create a secondary location, with the intent of later migrating to it
463 : CreateSecondary(NodeId),
464 : // Remove a secondary location that we previously created to facilitate a migration
465 : RemoveSecondary(NodeId),
466 : }
467 :
468 : #[derive(Eq, PartialEq, Debug, Clone)]
469 : pub(crate) struct ScheduleOptimization {
470 : // What was the reconcile sequence when we generated this optimization? The optimization
471 : // should only be applied if the shard's sequence is still at this value, in case other changes
472 : // happened between planning the optimization and applying it.
473 : sequence: Sequence,
474 :
475 : pub(crate) action: ScheduleOptimizationAction,
476 : }
477 :
478 : impl ReconcilerWaiter {
479 0 : pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
480 0 : tokio::select! {
481 0 : result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
482 0 : result.map_err(|e| match e {
483 0 : SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
484 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
485 0 : })?;
486 : },
487 0 : result = self.error_seq_wait.wait_for(self.seq) => {
488 0 : result.map_err(|e| match e {
489 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
490 0 : SeqWaitError::Timeout => unreachable!()
491 0 : })?;
492 :
493 0 : return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
494 0 : self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
495 : }
496 : }
497 :
498 0 : Ok(())
499 0 : }
500 :
501 0 : pub(crate) fn get_status(&self) -> ReconcilerStatus {
502 0 : if self.seq_wait.would_wait_for(self.seq).is_ok() {
503 0 : ReconcilerStatus::Done
504 0 : } else if self.error_seq_wait.would_wait_for(self.seq).is_ok() {
505 0 : ReconcilerStatus::Failed
506 : } else {
507 0 : ReconcilerStatus::InProgress
508 : }
509 0 : }
510 : }
511 :
512 : /// Having spawned a reconciler task, the tenant shard's state will carry enough
513 : /// information to optionally cancel & await it later.
514 : pub(crate) struct ReconcilerHandle {
515 : sequence: Sequence,
516 : handle: JoinHandle<()>,
517 : cancel: CancellationToken,
518 : }
519 :
520 : pub(crate) enum ReconcileNeeded {
521 : /// shard either doesn't need reconciliation, or is forbidden from spawning a reconciler
522 : /// in its current state (e.g. shard split in progress, or ShardSchedulingPolicy forbids it)
523 : No,
524 : /// shard has a reconciler running, and its intent hasn't changed since that one was
525 : /// spawned: wait for the existing reconciler rather than spawning a new one.
526 : WaitExisting(ReconcilerWaiter),
527 : /// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
528 : Yes(ReconcileReason),
529 : }
530 :
531 : #[derive(Debug)]
532 : pub(crate) enum ReconcileReason {
533 : ActiveNodesDirty,
534 : UnknownLocation,
535 : PendingComputeNotification,
536 : }
537 :
538 : /// Pending modification to the observed state of a tenant shard.
539 : /// Produced by [`Reconciler::observed_deltas`] and applied in [`crate::service::Service::process_result`].
540 : pub(crate) enum ObservedStateDelta {
541 : Upsert(Box<(NodeId, ObservedStateLocation)>),
542 : Delete(NodeId),
543 : }
544 :
545 : impl ObservedStateDelta {
546 0 : pub(crate) fn node_id(&self) -> &NodeId {
547 0 : match self {
548 0 : Self::Upsert(up) => &up.0,
549 0 : Self::Delete(nid) => nid,
550 : }
551 0 : }
552 : }
553 :
554 : /// When a reconcile task completes, it sends this result object
555 : /// to be applied to the primary TenantShard.
556 : pub(crate) struct ReconcileResult {
557 : pub(crate) sequence: Sequence,
558 : /// On errors, `observed` should be treated as an incomplete description
559 : /// of state (i.e. any nodes present in the result should override nodes
560 : /// present in the parent tenant state, but any unmentioned nodes should
561 : /// not be removed from parent tenant state)
562 : pub(crate) result: Result<(), ReconcileError>,
563 :
564 : pub(crate) tenant_shard_id: TenantShardId,
565 : pub(crate) generation: Option<Generation>,
566 : pub(crate) observed_deltas: Vec<ObservedStateDelta>,
567 :
568 : /// Set [`TenantShard::pending_compute_notification`] from this flag
569 : pub(crate) pending_compute_notification: bool,
570 : }
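
A sketch of the merge rule described in the comment above, under the assumption that this mirrors what crate::service::Service::process_result does with the deltas: nodes mentioned in a delta are overridden or removed, and nodes not mentioned keep whatever observed state the parent already had.

    fn apply_observed_deltas(
        observed: &mut ObservedState,
        deltas: impl Iterator<Item = ObservedStateDelta>,
    ) {
        for delta in deltas {
            match delta {
                // A node mentioned in the result overrides our previous knowledge of it.
                ObservedStateDelta::Upsert(boxed) => {
                    let (node_id, location) = *boxed;
                    observed.locations.insert(node_id, location);
                }
                // A deleted node is forgotten entirely.
                ObservedStateDelta::Delete(node_id) => {
                    observed.locations.remove(&node_id);
                }
            }
            // Nodes that appear in no delta are intentionally left untouched.
        }
    }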
571 :
572 : impl ObservedState {
573 0 : pub(crate) fn new() -> Self {
574 0 : Self {
575 0 : locations: HashMap::new(),
576 0 : }
577 0 : }
578 :
579 0 : pub(crate) fn is_empty(&self) -> bool {
580 0 : self.locations.is_empty()
581 0 : }
582 : }
583 :
584 : impl TenantShard {
585 12843 : pub(crate) fn new(
586 12843 : tenant_shard_id: TenantShardId,
587 12843 : shard: ShardIdentity,
588 12843 : policy: PlacementPolicy,
589 12843 : preferred_az_id: Option<AvailabilityZone>,
590 12843 : ) -> Self {
591 12843 : metrics::METRICS_REGISTRY
592 12843 : .metrics_group
593 12843 : .storage_controller_tenant_shards
594 12843 : .inc();
595 :
596 12843 : Self {
597 12843 : tenant_shard_id,
598 12843 : policy,
599 12843 : intent: IntentState::new(preferred_az_id),
600 12843 : generation: Some(Generation::new(0)),
601 12843 : shard,
602 12843 : observed: ObservedState::default(),
603 12843 : config: TenantConfig::default(),
604 12843 : reconciler: None,
605 12843 : splitting: SplitState::Idle,
606 12843 : importing: TimelineImportState::Idle,
607 12843 : sequence: Sequence(1),
608 12843 : delayed_reconcile: false,
609 12843 : waiter: Arc::new(SeqWait::new(Sequence(0))),
610 12843 : error_waiter: Arc::new(SeqWait::new(Sequence(0))),
611 12843 : last_error: Arc::default(),
612 12843 : consecutive_reconciles_count: 0,
613 12843 : pending_compute_notification: false,
614 12843 : scheduling_policy: ShardSchedulingPolicy::default(),
615 12843 : preferred_node: None,
616 12843 : }
617 12843 : }
618 :
619 : /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
620 : /// [`ObservedState`], even if it violates my [`PlacementPolicy`]. Call [`Self::schedule`] next,
621 : /// to get an intent state that complies with placement policy. The overall goal is to do scheduling
622 : /// in a way that makes use of any configured locations that already exist in the outside world.
623 1 : pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
624 : // Choose an attached location by filtering observed locations, and then sorting to get the highest
625 : // generation
626 1 : let mut attached_locs = self
627 1 : .observed
628 1 : .locations
629 1 : .iter()
630 2 : .filter_map(|(node_id, l)| {
631 2 : if let Some(conf) = &l.conf {
632 2 : if conf.mode == LocationConfigMode::AttachedMulti
633 1 : || conf.mode == LocationConfigMode::AttachedSingle
634 1 : || conf.mode == LocationConfigMode::AttachedStale
635 : {
636 2 : Some((node_id, conf.generation))
637 : } else {
638 0 : None
639 : }
640 : } else {
641 0 : None
642 : }
643 2 : })
644 1 : .collect::<Vec<_>>();
645 :
646 1 : attached_locs.sort_by_key(|i| i.1);
647 1 : if let Some((node_id, _gen)) = attached_locs.into_iter().next_back() {
648 1 : self.intent.set_attached(scheduler, Some(*node_id));
649 1 : }
650 :
651 : // All remaining observed locations generate secondary intents. This includes None
652 : // observations, as these may well have some local content on disk that is usable (this
653 : // is an edge case that might occur if we restarted during a migration or other change)
654 : //
655 : // We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
656 : // will take care of promoting one of these secondaries to be attached.
657 2 : self.observed.locations.keys().for_each(|node_id| {
658 2 : if Some(*node_id) != self.intent.attached {
659 1 : self.intent.push_secondary(scheduler, *node_id);
660 1 : }
661 2 : });
662 1 : }
663 :
664 : /// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
665 : /// attached pageserver for a shard.
666 : ///
667 : /// Returns whether we modified it, and the NodeId selected.
668 12832 : fn schedule_attached(
669 12832 : &mut self,
670 12832 : scheduler: &mut Scheduler,
671 12832 : context: &ScheduleContext,
672 12832 : ) -> Result<(bool, NodeId), ScheduleError> {
673 : // No work to do if we already have an attached tenant
674 12832 : if let Some(node_id) = self.intent.attached {
675 0 : return Ok((false, node_id));
676 12832 : }
677 :
678 12832 : if let Some(promote_secondary) = self.preferred_secondary(scheduler) {
679 : // Promote a secondary
680 2 : tracing::debug!("Promoted secondary {} to attached", promote_secondary);
681 2 : self.intent.promote_attached(scheduler, promote_secondary);
682 2 : Ok((true, promote_secondary))
683 : } else {
684 : // Pick a fresh node: either we had no secondaries or none were schedulable
685 12830 : let node_id = scheduler.schedule_shard::<AttachedShardTag>(
686 12830 : &self.intent.secondary,
687 12830 : &self.intent.preferred_az_id,
688 12830 : context,
689 0 : )?;
690 12830 : tracing::debug!("Selected {} as attached", node_id);
691 12830 : self.intent.set_attached(scheduler, Some(node_id));
692 12830 : Ok((true, node_id))
693 : }
694 12832 : }
695 :
696 : #[instrument(skip_all, fields(
697 : tenant_id=%self.tenant_shard_id.tenant_id,
698 : shard_id=%self.tenant_shard_id.shard_slug(),
699 : sequence=%self.sequence
700 : ))]
701 : pub(crate) fn schedule(
702 : &mut self,
703 : scheduler: &mut Scheduler,
704 : context: &mut ScheduleContext,
705 : ) -> Result<(), ScheduleError> {
706 : let r = self.do_schedule(scheduler, context);
707 :
708 : context.avoid(&self.intent.all_pageservers());
709 :
710 : r
711 : }
712 :
713 12836 : pub(crate) fn do_schedule(
714 12836 : &mut self,
715 12836 : scheduler: &mut Scheduler,
716 12836 : context: &ScheduleContext,
717 12836 : ) -> Result<(), ScheduleError> {
718 : // TODO: before scheduling new nodes, check if any existing content in
719 : // self.intent refers to pageservers that are offline, and pick other
720 : // pageservers if so.
721 :
722 : // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
723 : // change their attach location.
724 :
725 12836 : match self.scheduling_policy {
726 12835 : ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
727 : ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
728 : // Warn to make it obvious why other things aren't happening/working, if we skip scheduling
729 1 : tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
730 0 : "Scheduling is disabled by policy {:?}", self.scheduling_policy);
731 1 : return Ok(());
732 : }
733 : }
734 :
735 : // Build the set of pageservers already in use by this tenant, to avoid scheduling
736 : // more work on the same pageservers we're already using.
737 12835 : let mut modified = false;
738 :
739 : // Add/remove nodes to fulfil policy
740 : use PlacementPolicy::*;
741 12835 : match self.policy {
742 12832 : Attached(secondary_count) => {
743 : // Should have exactly one attached, and at least N secondaries
744 12832 : let (modified_attached, attached_node_id) =
745 12832 : self.schedule_attached(scheduler, context)?;
746 12832 : modified |= modified_attached;
747 :
748 12832 : let mut used_pageservers = vec![attached_node_id];
749 25663 : while self.intent.secondary.len() < secondary_count {
750 12831 : let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
751 12831 : &used_pageservers,
752 12831 : &self.intent.preferred_az_id,
753 12831 : context,
754 0 : )?;
755 12831 : self.intent.push_secondary(scheduler, node_id);
756 12831 : used_pageservers.push(node_id);
757 12831 : modified = true;
758 : }
759 : }
760 : Secondary => {
761 3 : if let Some(node_id) = self.intent.get_attached() {
762 2 : // Populate secondary by demoting the attached node
763 2 : self.intent.demote_attached(scheduler, *node_id);
764 2 :
765 2 : modified = true;
766 2 : } else if self.intent.secondary.is_empty() {
767 : // Populate secondary by scheduling a fresh node
768 : //
769 : // We use [`AttachedShardTag`] because when a secondary location is the only one
770 : // a shard has, we expect that its next use will be as an attached location: we want
771 : // the tenant to be ready to warm up and run fast in their preferred AZ.
772 1 : let node_id = scheduler.schedule_shard::<AttachedShardTag>(
773 1 : &[],
774 1 : &self.intent.preferred_az_id,
775 1 : context,
776 0 : )?;
777 1 : self.intent.push_secondary(scheduler, node_id);
778 1 : modified = true;
779 0 : }
780 4 : while self.intent.secondary.len() > 1 {
781 : // If we have multiple secondaries (e.g. when transitioning from Attached to Secondary and
782 : // having just demoted our attached location), then we should prefer to keep the location
783 : // in our preferred AZ. Tenants in Secondary mode want to be in the preferred AZ so that
784 : // they have a warm location to become attached when transitioning back into Attached.
785 :
786 1 : let mut candidates = self.intent.get_secondary().clone();
787 : // Sort to get secondaries outside preferred AZ last
788 1 : candidates
789 2 : .sort_by_key(|n| scheduler.get_node_az(n).as_ref() != self.preferred_az());
790 1 : let secondary_to_remove = candidates.pop().unwrap();
791 1 : self.intent.remove_secondary(scheduler, secondary_to_remove);
792 1 : modified = true;
793 : }
794 : }
795 : Detached => {
796 : // Never add locations in this mode
797 0 : if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
798 0 : self.intent.clear(scheduler);
799 0 : modified = true;
800 0 : }
801 : }
802 : }
803 :
804 12835 : if modified {
805 12835 : self.sequence.0 += 1;
806 12835 : }
807 :
808 12835 : Ok(())
809 12836 : }
810 :
811 : /// Reschedule this tenant shard to one of its secondary locations. Returns a scheduling error
812 : /// if the swap is not possible and leaves the intent state in its original state.
813 : ///
814 : /// Arguments:
815 : /// `attached_to`: the currently attached location matching the intent state (may be None if the
816 : /// shard is not attached)
817 : /// `promote_to`: an optional secondary location of this tenant shard. If set to None, we ask
818 : /// the scheduler to recommend a node
819 0 : pub(crate) fn reschedule_to_secondary(
820 0 : &mut self,
821 0 : promote_to: Option<NodeId>,
822 0 : scheduler: &mut Scheduler,
823 0 : ) -> Result<(), ScheduleError> {
824 0 : let promote_to = match promote_to {
825 0 : Some(node) => node,
826 0 : None => match self.preferred_secondary(scheduler) {
827 0 : Some(node) => node,
828 : None => {
829 0 : return Err(ScheduleError::ImpossibleConstraint);
830 : }
831 : },
832 : };
833 :
834 0 : assert!(self.intent.get_secondary().contains(&promote_to));
835 :
836 0 : if let Some(node) = self.intent.get_attached() {
837 0 : let demoted = self.intent.demote_attached(scheduler, *node);
838 0 : if !demoted {
839 0 : return Err(ScheduleError::ImpossibleConstraint);
840 0 : }
841 0 : }
842 :
843 0 : self.intent.promote_attached(scheduler, promote_to);
844 :
845 : // Increment the sequence number for the edge case where a
846 : // reconciler is already running to avoid waiting on the
847 : // current reconcile instead of spawning a new one.
848 0 : self.sequence = self.sequence.next();
849 :
850 0 : Ok(())
851 0 : }
852 :
853 : /// Returns None if the current location's score is unavailable, i.e. we cannot draw a conclusion
854 68 : fn is_better_location<T: ShardTag>(
855 68 : &self,
856 68 : scheduler: &mut Scheduler,
857 68 : schedule_context: &ScheduleContext,
858 68 : current: NodeId,
859 68 : candidate: NodeId,
860 68 : ) -> Option<bool> {
861 68 : let Some(candidate_score) = scheduler.compute_node_score::<T::Score>(
862 68 : candidate,
863 68 : &self.intent.preferred_az_id,
864 68 : schedule_context,
865 68 : ) else {
866 : // The candidate node is unavailable for scheduling or otherwise couldn't get a score
867 1 : return None;
868 : };
869 :
870 : // If the candidate is our preferred node, then it is better than the current location, as long
871 : // as it is online -- the online check is part of the score calculation we did above, so it's
872 : // important that this check comes after that one.
873 67 : if let Some(preferred) = self.preferred_node.as_ref() {
874 3 : if preferred == &candidate {
875 1 : return Some(true);
876 2 : }
877 64 : }
878 :
879 66 : match scheduler.compute_node_score::<T::Score>(
880 66 : current,
881 66 : &self.intent.preferred_az_id,
882 66 : schedule_context,
883 66 : ) {
884 66 : Some(current_score) => {
885 : // Ignore utilization components when comparing scores: we don't want to migrate
886 : // because of transient load variations, it risks making the system thrash, and
887 : // migrating for utilization requires a separate high level view of the system to
888 : // e.g. prioritize moving larger or smaller tenants, rather than arbitrarily
889 : // moving things around in the order that we hit this function.
890 66 : let candidate_score = candidate_score.for_optimization();
891 66 : let current_score = current_score.for_optimization();
892 :
893 66 : if candidate_score < current_score {
894 8 : tracing::info!(
895 0 : "Found a lower scoring location! {candidate} is better than {current} ({candidate_score:?} is better than {current_score:?})"
896 : );
897 8 : Some(true)
898 : } else {
899 : // The candidate node is no better than our current location, so don't migrate
900 58 : tracing::debug!(
901 0 : "Candidate node {candidate} is no better than our current location {current} (candidate {candidate_score:?} vs current {current_score:?})",
902 : );
903 58 : Some(false)
904 : }
905 : }
906 : None => {
907 : // The current node is unavailable for scheduling, so we can't make any sensible
908 : // decisions about optimisation. This should be a transient state -- if the node
909 : // is offline then it will get evacuated; if it is blocked by a scheduling mode
910 : // then we will respect that mode by doing nothing.
911 0 : tracing::debug!("Current node {current} is unavailable for scheduling");
912 0 : None
913 : }
914 : }
915 68 : }
916 :
917 48 : pub(crate) fn find_better_location<T: ShardTag>(
918 48 : &self,
919 48 : scheduler: &mut Scheduler,
920 48 : schedule_context: &ScheduleContext,
921 48 : current: NodeId,
922 48 : hard_exclude: &[NodeId],
923 48 : ) -> Option<NodeId> {
924 : // If we have a migration hint, then that is our better location
925 48 : if let Some(hint) = self.preferred_node.as_ref() {
926 1 : if hint == ¤t {
927 0 : return None;
928 1 : }
929 :
930 1 : return Some(*hint);
931 47 : }
932 :
933 : // Look for a lower-scoring location to attach to
934 47 : let Ok(candidate_node) = scheduler.schedule_shard::<T>(
935 47 : hard_exclude,
936 47 : &self.intent.preferred_az_id,
937 47 : schedule_context,
938 47 : ) else {
939 : // A scheduling error means we have no possible candidate replacements
940 0 : tracing::debug!("No candidate node found");
941 0 : return None;
942 : };
943 :
944 47 : if candidate_node == current {
945 : // We're already at the best possible location, so don't migrate
946 24 : tracing::debug!("Candidate node {candidate_node} is already in use");
947 24 : return None;
948 23 : }
949 :
950 23 : self.is_better_location::<T>(scheduler, schedule_context, current, candidate_node)
951 23 : .and_then(|better| if better { Some(candidate_node) } else { None })
952 48 : }
953 :
954 : /// This function is an optimization, used to avoid doing large numbers of scheduling operations
955 : /// when looking for optimizations. This function uses knowledge of how scores work to do some
956 : /// fast checks for whether it may be possible to improve a score.
957 : ///
958 : /// If we return true, it only means that optimization _might_ be possible, not that it necessarily is. If we
959 : /// return false, it definitely means that calling [`Self::optimize_attachment`] or [`Self::optimize_secondary`] would do no
960 : /// work.
961 3 : pub(crate) fn maybe_optimizable(
962 3 : &self,
963 3 : scheduler: &mut Scheduler,
964 3 : schedule_context: &ScheduleContext,
965 3 : ) -> bool {
966 : // Tenant with preferred node: check if it is not already at the preferred node
967 3 : if let Some(preferred) = self.preferred_node.as_ref() {
968 0 : if Some(preferred) != self.intent.get_attached().as_ref() {
969 0 : return true;
970 0 : }
971 3 : }
972 :
973 : // Sharded tenant: check if any locations have a nonzero affinity score
974 3 : if self.shard.count >= ShardCount(1) {
975 3 : let schedule_context = schedule_context.project_detach(self);
976 5 : for node in self.intent.all_pageservers() {
977 5 : if let Some(af) = schedule_context.nodes.get(&node) {
978 5 : if *af > AffinityScore(0) {
979 3 : return true;
980 2 : }
981 0 : }
982 : }
983 0 : }
984 :
985 : // Attached tenant: check if the attachment is outside the preferred AZ
986 0 : if let PlacementPolicy::Attached(_) = self.policy {
987 0 : if let Some(attached) = self.intent.get_attached() {
988 0 : if scheduler.get_node_az(attached) != self.intent.preferred_az_id {
989 0 : return true;
990 0 : }
991 0 : }
992 0 : }
993 :
994 : // Tenant with secondary locations: check if any are within the preferred AZ
995 0 : for secondary in self.intent.get_secondary() {
996 0 : if scheduler.get_node_az(secondary) == self.intent.preferred_az_id {
997 0 : return true;
998 0 : }
999 : }
1000 :
1001 : // Does the tenant have excess secondaries?
1002 0 : if self.intent.get_secondary().len() > self.policy.want_secondaries() {
1003 0 : return true;
1004 0 : }
1005 :
1006 : // Fall through: no optimizations possible
1007 0 : false
1008 3 : }
1009 :
1010 : /// Optimize attachments: if a shard has a secondary location that is preferable to
1011 : /// its primary location based on soft constraints, switch that secondary location
1012 : /// to be attached.
1013 : ///
1014 : /// `schedule_context` should have been populated with all shards in the tenant, including
1015 : /// the one we're trying to optimize (this function will subtract its own contribution before making scoring decisions)
1016 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1017 : pub(crate) fn optimize_attachment(
1018 : &self,
1019 : scheduler: &mut Scheduler,
1020 : schedule_context: &ScheduleContext,
1021 : ) -> Option<ScheduleOptimization> {
1022 : let attached = (*self.intent.get_attached())?;
1023 :
1024 : let schedule_context = schedule_context.project_detach(self);
1025 :
1026 : // If we already have a secondary that is higher-scoring than our current location,
1027 : // then simply migrate to it.
1028 : for secondary in self.intent.get_secondary() {
1029 : if let Some(true) = self.is_better_location::<AttachedShardTag>(
1030 : scheduler,
1031 : &schedule_context,
1032 : attached,
1033 : *secondary,
1034 : ) {
1035 : return Some(ScheduleOptimization {
1036 : sequence: self.sequence,
1037 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
1038 : old_attached_node_id: attached,
1039 : new_attached_node_id: *secondary,
1040 : }),
1041 : });
1042 : }
1043 : }
1044 :
1045 : // Given that none of our current secondaries is a better location than our current
1046 : // attached location (checked above), we may trim any secondaries that are not needed
1047 : // for the placement policy.
1048 : if self.intent.get_secondary().len() > self.policy.want_secondaries() {
1049 : // This code path cleans up extra secondaries after migrating, and/or
1050 : // trims extra secondaries after a PlacementPolicy::Attached(N) was
1051 : // modified to decrease N.
1052 :
1053 : let secondary_scores = self
1054 : .intent
1055 : .get_secondary()
1056 : .iter()
1057 12 : .map(|node_id| {
1058 12 : (
1059 12 : *node_id,
1060 12 : scheduler.compute_node_score::<NodeSecondarySchedulingScore>(
1061 12 : *node_id,
1062 12 : &self.intent.preferred_az_id,
1063 12 : &schedule_context,
1064 12 : ),
1065 12 : )
1066 12 : })
1067 : .collect::<HashMap<_, _>>();
1068 :
1069 12 : if secondary_scores.iter().any(|score| score.1.is_none()) {
1070 : // Trivial case: if we only have one secondary, drop that one
1071 : if self.intent.get_secondary().len() == 1 {
1072 : return Some(ScheduleOptimization {
1073 : sequence: self.sequence,
1074 : action: ScheduleOptimizationAction::RemoveSecondary(
1075 : *self.intent.get_secondary().first().unwrap(),
1076 : ),
1077 : });
1078 : }
1079 :
1080 : // Try to find a "good" secondary to keep, without relying on scores (one or more nodes is in a state
1081 : // where its score can't be calculated), and drop the others. This enables us to make progress in
1082 : // most cases, even if some nodes are offline or have scheduling=pause set.
1083 :
1084 : debug_assert!(self.intent.attached.is_some()); // We should not make it here unless attached -- this
1085 : // logic presumes we are in a mode where we want secondaries to be in non-home AZ
1086 1 : if let Some(retain_secondary) = self.intent.get_secondary().iter().find(|n| {
1087 1 : let in_home_az = scheduler.get_node_az(n) == self.intent.preferred_az_id;
1088 1 : let is_available = secondary_scores
1089 1 : .get(n)
1090 1 : .expect("Built from same list of nodes")
1091 1 : .is_some();
1092 1 : is_available && !in_home_az
1093 1 : }) {
1094 : // Great, we found one to retain. Pick some other to drop.
1095 : if let Some(victim) = self
1096 : .intent
1097 : .get_secondary()
1098 : .iter()
1099 2 : .find(|n| n != &retain_secondary)
1100 : {
1101 : return Some(ScheduleOptimization {
1102 : sequence: self.sequence,
1103 : action: ScheduleOptimizationAction::RemoveSecondary(*victim),
1104 : });
1105 : }
1106 : }
1107 :
1108 : // Fall through: we didn't identify one to remove. This ought to be rare.
1109 : tracing::warn!(
1110 : "Keeping extra secondaries: can't determine which of {:?} to remove (some nodes offline?)",
1111 : self.intent.get_secondary()
1112 : );
1113 : } else {
1114 : let victim = secondary_scores
1115 : .iter()
1116 10 : .max_by_key(|score| score.1.unwrap())
1117 : .unwrap()
1118 : .0;
1119 : return Some(ScheduleOptimization {
1120 : sequence: self.sequence,
1121 : action: ScheduleOptimizationAction::RemoveSecondary(*victim),
1122 : });
1123 : }
1124 : }
1125 :
1126 : let replacement = self.find_better_location::<AttachedShardTag>(
1127 : scheduler,
1128 : &schedule_context,
1129 : attached,
1130 : &[], // Don't exclude secondaries: our preferred attachment location may be a secondary
1131 : );
1132 :
1133 : // We have found a candidate and confirmed that its score is preferable
1134 : // to our current location. See if we have a secondary location in the preferred location already: if not,
1135 : // then create one.
1136 : if let Some(replacement) = replacement {
1137 : // If we are currently in a non-preferred AZ, then the scheduler might suggest a location that is better, but still
1138 : // not in our preferred AZ. Migration has a cost in resources and an impact on the workload, so we want to avoid doing
1139 : // multiple hops where we might go to some other AZ before eventually finding a suitable location in our preferred
1140 : // AZ: skip this optimization if it is not in our final, preferred AZ.
1141 : //
1142 : // This should be a transient state, there should always be capacity eventually in our preferred AZ (even if nodes
1143 : // there are too overloaded for scheduler to suggest them, more should be provisioned eventually).
1144 : if self.preferred_node.is_none()
1145 : && self.intent.preferred_az_id.is_some()
1146 : && scheduler.get_node_az(&replacement) != self.intent.preferred_az_id
1147 : {
1148 : tracing::debug!(
1149 : "Candidate node {replacement} is not in preferred AZ {:?}",
1150 : self.intent.preferred_az_id
1151 : );
1152 :
1153 : // This should only happen if our current location is not in the preferred AZ, otherwise
1154 : // [`Self::find_better_location`] should have rejected any other location outside the preferred AZ, because
1155 : // AZ is the highest priority part of NodeAttachmentSchedulingScore.
1156 : debug_assert!(scheduler.get_node_az(&attached) != self.intent.preferred_az_id);
1157 :
1158 : return None;
1159 : }
1160 :
1161 : if !self.intent.get_secondary().contains(&replacement) {
1162 : Some(ScheduleOptimization {
1163 : sequence: self.sequence,
1164 : action: ScheduleOptimizationAction::CreateSecondary(replacement),
1165 : })
1166 : } else {
1167 : // We already have a secondary in the preferred location, let's try migrating to it. Our caller
1168 : // will check the warmth of the destination before deciding whether to really execute this.
1169 : Some(ScheduleOptimization {
1170 : sequence: self.sequence,
1171 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
1172 : old_attached_node_id: attached,
1173 : new_attached_node_id: replacement,
1174 : }),
1175 : })
1176 : }
1177 : } else {
1178 : // We didn't find somewhere we'd rather be, and we don't have any excess secondaries
1179 : // to clean up: no action required.
1180 : None
1181 : }
1182 : }
1183 :
1184 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1185 : pub(crate) fn optimize_secondary(
1186 : &self,
1187 : scheduler: &mut Scheduler,
1188 : schedule_context: &ScheduleContext,
1189 : ) -> Option<ScheduleOptimization> {
1190 : if self.intent.get_secondary().len() > self.policy.want_secondaries() {
1191 : // We have extra secondaries, perhaps to facilitate a migration of the attached location:
1192 : // do nothing, it is up to [`Self::optimize_attachment`] to clean them up. When that's done,
1193 : // and we are called again, we will proceed.
1194 : tracing::debug!("Too many secondaries: skipping");
1195 : return None;
1196 : }
1197 :
1198 : let schedule_context = schedule_context.project_detach(self);
1199 :
1200 : for secondary in self.intent.get_secondary() {
1201 : // Make sure we don't try to migrate a secondary to our attached location: this case happens
1202 : // easily in environments without multiple AZs.
1203 : let mut exclude = match self.intent.attached {
1204 : Some(attached) => vec![attached],
1205 : None => vec![],
1206 : };
1207 :
1208 : // Exclude all other secondaries from the scheduling process to avoid replacing
1209 : // one existing secondary with another existing secondary.
1210 : for another_secondary in self.intent.secondary.iter() {
1211 : if another_secondary != secondary {
1212 : exclude.push(*another_secondary);
1213 : }
1214 : }
1215 :
1216 : let replacement = match &self.policy {
1217 : PlacementPolicy::Attached(_) => {
1218 : // Secondaries for an attached shard should be scheduled using `SecondaryShardTag`
1219 : // to avoid placing them in the preferred AZ.
1220 : self.find_better_location::<SecondaryShardTag>(
1221 : scheduler,
1222 : &schedule_context,
1223 : *secondary,
1224 : &exclude,
1225 : )
1226 : }
1227 : PlacementPolicy::Secondary => {
1228 : // In secondary-only mode, we want our secondary locations in the preferred AZ,
1229 : // so that they're ready to take over as an attached location when we transition
1230 : // into PlacementPolicy::Attached.
1231 : self.find_better_location::<AttachedShardTag>(
1232 : scheduler,
1233 : &schedule_context,
1234 : *secondary,
1235 : &exclude,
1236 : )
1237 : }
1238 : PlacementPolicy::Detached => None,
1239 : };
1240 :
1241 : assert!(replacement != Some(*secondary));
1242 : if let Some(replacement) = replacement {
1243 : // We have found a candidate and confirmed that its score is preferable
1244 : // to this secondary's current location: replace the existing secondary
1245 : // with the candidate.
1246 : return Some(ScheduleOptimization {
1247 : sequence: self.sequence,
1248 : action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
1249 : old_node_id: *secondary,
1250 : new_node_id: replacement,
1251 : }),
1252 : });
1253 : }
1254 : }
1255 :
1256 : None
1257 : }
1258 :
1259 : /// Start or abort a graceful migration of this shard to another pageserver. This works on top of the
1260 : /// other optimisation functions, to bias them to move to the destination node.
1261 0 : pub(crate) fn set_preferred_node(&mut self, node: Option<NodeId>) {
1262 0 : if let Some(hint) = self.preferred_node.as_ref() {
1263 0 : if Some(hint) != node.as_ref() {
1264 : // This is legal but a bit surprising: we expect that administrators wouldn't usually
1265 : // change their mind about where to migrate something.
1266 0 : tracing::warn!(
1267 0 : "Changing migration destination from {hint} to {node:?} (current intent {:?})",
1268 : self.intent
1269 : );
1270 0 : }
1271 0 : }
1272 :
1273 0 : self.preferred_node = node;
1274 0 : }
1275 :
1276 0 : pub(crate) fn get_preferred_node(&self) -> Option<NodeId> {
1277 0 : self.preferred_node
1278 0 : }
1279 :
1280 : /// Return true if the optimization was really applied: it will not be applied if the optimization's
1281 : /// sequence is behind this tenant shard's or if the intent state proposed by the optimization
1282 : /// is not compatible with the current intent state. The latter may happen when the background
1283 : /// reconcile loop runs concurrently with HTTP-driven optimisations.
1284 17 : pub(crate) fn apply_optimization(
1285 17 : &mut self,
1286 17 : scheduler: &mut Scheduler,
1287 17 : optimization: ScheduleOptimization,
1288 17 : ) -> bool {
1289 17 : if optimization.sequence != self.sequence {
1290 0 : return false;
1291 17 : }
1292 :
1293 17 : if !self.validate_optimization(&optimization) {
1294 0 : tracing::info!(
1295 0 : "Skipping optimization for {} because it does not match current intent: {:?}",
1296 : self.tenant_shard_id,
1297 : optimization,
1298 : );
1299 0 : return false;
1300 17 : }
1301 :
1302 17 : metrics::METRICS_REGISTRY
1303 17 : .metrics_group
1304 17 : .storage_controller_schedule_optimization
1305 17 : .inc();
1306 :
1307 17 : match optimization.action {
1308 : ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
1309 5 : old_attached_node_id,
1310 5 : new_attached_node_id,
1311 : }) => {
1312 5 : self.intent.demote_attached(scheduler, old_attached_node_id);
1313 5 : self.intent
1314 5 : .promote_attached(scheduler, new_attached_node_id);
1315 :
1316 5 : if let Some(hint) = self.preferred_node.as_ref() {
1317 1 : if hint == &new_attached_node_id {
1318 : // The migration target is not a long term pin: once we are done with the migration, clear it.
1319 1 : tracing::info!("Graceful migration to {hint} complete");
1320 1 : self.preferred_node = None;
1321 0 : }
1322 4 : }
1323 : }
1324 : ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
1325 1 : old_node_id,
1326 1 : new_node_id,
1327 1 : }) => {
1328 1 : self.intent.remove_secondary(scheduler, old_node_id);
1329 1 : self.intent.push_secondary(scheduler, new_node_id);
1330 1 : }
1331 4 : ScheduleOptimizationAction::CreateSecondary(new_node_id) => {
1332 4 : self.intent.push_secondary(scheduler, new_node_id);
1333 4 : }
1334 7 : ScheduleOptimizationAction::RemoveSecondary(old_secondary) => {
1335 7 : self.intent.remove_secondary(scheduler, old_secondary);
1336 7 : }
1337 : }
1338 :
1339 17 : true
1340 17 : }
1341 :
1342 : /// Check that the desired modifications to the intent state are compatible with the current
1343 : /// intent state. Note that the lock is not held between planning optimizations and applying
1344 : /// them, so any valid state transition of the intent state may have occurred.
1345 17 : fn validate_optimization(&self, optimization: &ScheduleOptimization) -> bool {
1346 17 : match optimization.action {
1347 : ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
1348 5 : old_attached_node_id,
1349 5 : new_attached_node_id,
1350 : }) => {
1351 5 : self.intent.attached == Some(old_attached_node_id)
1352 5 : && self.intent.secondary.contains(&new_attached_node_id)
1353 : }
1354 : ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
1355 : old_node_id: _,
1356 1 : new_node_id,
1357 : }) => {
1358 : // It's legal to remove a secondary that is not present in the intent state
1359 1 : !self.intent.secondary.contains(&new_node_id)
1360 : // Ensure the secondary hasn't already been promoted to attached by a concurrent
1361 : // optimization/migration.
1362 1 : && self.intent.attached != Some(new_node_id)
1363 : }
1364 4 : ScheduleOptimizationAction::CreateSecondary(new_node_id) => {
1365 4 : !self.intent.secondary.contains(&new_node_id)
1366 : }
1367 : ScheduleOptimizationAction::RemoveSecondary(_) => {
1368 : // It's legal to remove a secondary that is not present in the intent state
1369 7 : true
1370 : }
1371 : }
1372 17 : }
1373 :
1374 : /// When a shard has several secondary locations, we need to pick one in situations where
1375 : /// we promote one of them to an attached location:
1376 : /// - When draining a node for restart
1377 : /// - When responding to a node failure
1378 : ///
1379 : /// In this context, 'preferred' does not mean the node with the best scheduling score: instead
1380 : /// we want to pick the node which is best for use _temporarily_ while the previous attached location
1381 : /// is unavailable (e.g. because it's down or deploying). That means we prefer to use secondary
1382 : /// locations in a non-preferred AZ, as they're more likely to have awarm cache than a temporary
1383 : /// secondary in the preferred AZ (which are usually only created for migrations, and if they exist
1384 : /// they're probably not warmed up yet). The latter behavior is based oni
1385 : ///
1386 : /// If the input is empty, or all the nodes are not elegible for scheduling, return None: the
1387 : /// caller needs to a pick a node some other way.
1388 12832 : pub(crate) fn preferred_secondary(&self, scheduler: &Scheduler) -> Option<NodeId> {
1389 12832 : let candidates = scheduler.filter_usable_nodes(&self.intent.secondary);
1390 :
1391 : // We will sort candidates to prefer nodes which are _not_ in our preferred AZ, i.e. we prefer
1392 : // to migrate to a long-lived secondary location (which would have been scheduled in a non-preferred AZ),
1393 : // rather than a short-lived secondary location being used for optimization/migration (which would have
1394 : // been scheduled in our preferred AZ).
1395 12832 : let mut candidates = candidates
1396 12832 : .iter()
1397 12832 : .map(|(node_id, node_az)| {
1398 2 : if node_az == &self.intent.preferred_az_id {
1399 1 : (1, *node_id)
1400 : } else {
1401 1 : (0, *node_id)
1402 : }
1403 2 : })
1404 12832 : .collect::<Vec<_>>();
1405 :
1406 12832 : candidates.sort();
1407 :
1408 12832 : candidates.first().map(|i| i.1)
1409 12832 : }
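 : // A minimal sketch of the ordering above, assuming two hypothetical usable secondaries:
 : //   candidates       = [(NodeId(1), preferred AZ), (NodeId(2), other AZ)]
 : //   mapped sort keys = [(1, NodeId(1)), (0, NodeId(2))]
 : // After sort() the first element is (0, NodeId(2)), so Some(NodeId(2)) is returned: the
 : // long-lived secondary outside the preferred AZ wins over the in-AZ spare.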
1410 :
1411 : /// Query whether the tenant's observed state for attached node matches its intent state, and if so,
1412 : /// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that
1413 : /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
1414 : ///
1415 : /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
1416 : /// function should not be used to decide whether to reconcile.
1417 0 : pub(crate) fn stably_attached(&self) -> Option<NodeId> {
1418 : // We have an intent to attach for this node
1419 0 : let attach_intent = self.intent.attached?;
1420 : // We have an observed state for this node
1421 0 : let location = self.observed.locations.get(&attach_intent)?;
1422 : // Our observed state is not None, i.e. not in flux
1423 0 : let location_config = location.conf.as_ref()?;
1424 :
1425 : // Check if our intent and observed state agree that this node is in an attached state.
1426 0 : match location_config.mode {
1427 : LocationConfigMode::AttachedMulti
1428 : | LocationConfigMode::AttachedSingle
1429 0 : | LocationConfigMode::AttachedStale => Some(attach_intent),
1430 0 : _ => None,
1431 : }
1432 0 : }
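 : // Sketch of the possible outcomes, for a hypothetical intent/observed pair:
 : //   intent.attached == None                       -> None (nothing to notify about)
 : //   no observed entry for the intended node       -> None (attach not yet observed)
 : //   observed conf is None, or a non-attached mode -> None (in flux / only a secondary)
 : //   observed mode is AttachedSingle/Multi/Stale
 : //   on the intended node                          -> Some(node_id), safe to notify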
1433 :
1434 0 : fn dirty(&self, nodes: &Arc<HashMap<NodeId, Node>>) -> bool {
1435 0 : let mut dirty_nodes = HashSet::new();
1436 :
1437 0 : if let Some(node_id) = self.intent.attached {
1438 : // Maybe panic: it is a severe bug if we try to attach while generation is null.
1439 0 : let generation = self
1440 0 : .generation
1441 0 : .expect("Attempted to enter attached state without a generation");
1442 :
1443 0 : let wanted_conf = attached_location_conf(
1444 0 : generation,
1445 0 : &self.shard,
1446 0 : &self.config,
1447 0 : &self.policy,
1448 0 : self.intent.get_secondary().len(),
1449 : );
1450 0 : match self.observed.locations.get(&node_id) {
1451 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
1452 0 : Some(_) | None => {
1453 0 : dirty_nodes.insert(node_id);
1454 0 : }
1455 : }
1456 0 : }
1457 :
1458 0 : for node_id in &self.intent.secondary {
1459 0 : let wanted_conf = secondary_location_conf(&self.shard, &self.config);
1460 0 : match self.observed.locations.get(node_id) {
1461 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
1462 0 : Some(_) | None => {
1463 0 : dirty_nodes.insert(*node_id);
1464 0 : }
1465 : }
1466 : }
1467 :
1468 0 : for node_id in self.observed.locations.keys() {
1469 0 : if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
1470 0 : // We have observed state that isn't part of our intent: need to clean it up.
1471 0 : dirty_nodes.insert(*node_id);
1472 0 : }
1473 : }
1474 :
1475 0 : dirty_nodes.retain(|node_id| {
1476 0 : nodes
1477 0 : .get(node_id)
1478 0 : .map(|n| n.is_available())
1479 0 : .unwrap_or(false)
1480 0 : });
1481 :
1482 0 : !dirty_nodes.is_empty()
1483 0 : }
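 : // Sketch (hypothetical state): intent = { attached: NodeId(1), secondary: [NodeId(2)] },
 : // observed has the wanted conf on node 1, a stale conf on node 2 and a leftover conf on
 : // node 3 => dirty_nodes = {2, 3}. If both of those nodes are currently unavailable, the
 : // retain() above drains the set and dirty() returns false until they come back online.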
1484 :
1485 : #[allow(clippy::too_many_arguments)]
1486 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1487 : pub(crate) fn get_reconcile_needed(
1488 : &mut self,
1489 : pageservers: &Arc<HashMap<NodeId, Node>>,
1490 : ) -> ReconcileNeeded {
1491 : // If there are any ambiguous observed states, and the nodes they refer to are available,
1492 : // we should reconcile to clean them up.
1493 : let mut dirty_observed = false;
1494 : for (node_id, observed_loc) in &self.observed.locations {
1495 : let node = pageservers
1496 : .get(node_id)
1497 : .expect("Nodes may not be removed while referenced");
1498 : if observed_loc.conf.is_none() && node.is_available() {
1499 : dirty_observed = true;
1500 : break;
1501 : }
1502 : }
1503 :
1504 : let active_nodes_dirty = self.dirty(pageservers);
1505 :
1506 : let reconcile_needed = match (
1507 : active_nodes_dirty,
1508 : dirty_observed,
1509 : self.pending_compute_notification,
1510 : ) {
1511 : (true, _, _) => ReconcileNeeded::Yes(ReconcileReason::ActiveNodesDirty),
1512 : (_, true, _) => ReconcileNeeded::Yes(ReconcileReason::UnknownLocation),
1513 : (_, _, true) => ReconcileNeeded::Yes(ReconcileReason::PendingComputeNotification),
1514 : _ => ReconcileNeeded::No,
1515 : };
1516 :
1517 : if matches!(reconcile_needed, ReconcileNeeded::No) {
1518 : tracing::debug!("Not dirty, no reconciliation needed.");
1519 : return ReconcileNeeded::No;
1520 : }
1521 :
1522 : // If we are currently splitting, then never start a reconciler task: the splitting logic
1523 : // requires that shards are not interfered with while it runs. Do this check here rather than
1524 : // up top, so that we only log this message if we would otherwise have done a reconciliation.
1525 : if !matches!(self.splitting, SplitState::Idle) {
1526 : tracing::info!("Refusing to reconcile, splitting in progress");
1527 : return ReconcileNeeded::No;
1528 : }
1529 :
1530 : // Reconcile already in flight for the current sequence?
1531 : if let Some(handle) = &self.reconciler {
1532 : if handle.sequence == self.sequence {
1533 : tracing::info!(
1534 : "Reconciliation already in progress for sequence {:?}",
1535 : self.sequence,
1536 : );
1537 : return ReconcileNeeded::WaitExisting(ReconcilerWaiter {
1538 : tenant_shard_id: self.tenant_shard_id,
1539 : seq_wait: self.waiter.clone(),
1540 : error_seq_wait: self.error_waiter.clone(),
1541 : error: self.last_error.clone(),
1542 : seq: self.sequence,
1543 : });
1544 : }
1545 : }
1546 :
1547 : // Pre-checks done: finally check whether we may actually do the work
1548 : match self.scheduling_policy {
1549 : ShardSchedulingPolicy::Active
1550 : | ShardSchedulingPolicy::Essential
1551 : | ShardSchedulingPolicy::Pause => {}
1552 : ShardSchedulingPolicy::Stop => {
1553 : // We only reach this point if there is work to do and we're going to skip
1554 : // doing it: warn so it's obvious why this tenant isn't doing what it ought to.
1555 : tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
1556 : return ReconcileNeeded::No;
1557 : }
1558 : }
1559 :
1560 : reconcile_needed
1561 : }
1562 :
1563 : /// Ensure the sequence number is advanced such that waiting on it will wait for the next
1564 : /// reconcile: i.e. it is ahead of all completed or running reconcilers.
1565 : ///
1566 : /// Constructing a ReconcilerWaiter with the resulting sequence number gives the property
1567 : /// that the waiter will not complete until some future Reconciler is constructed and run.
1568 0 : fn ensure_sequence_ahead(&mut self) {
1569 : // Find the highest sequence for which a Reconciler has previously run or is currently
1570 : // running
1571 0 : let max_seen = std::cmp::max(
1572 0 : self.reconciler
1573 0 : .as_ref()
1574 0 : .map(|r| r.sequence)
1575 0 : .unwrap_or(Sequence(0)),
1576 0 : std::cmp::max(self.waiter.load(), self.error_waiter.load()),
1577 : );
1578 :
1579 0 : if self.sequence <= max_seen {
1580 0 : self.sequence = max_seen.next();
1581 0 : }
1582 0 : }
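 : // Worked example (hypothetical values): self.sequence == 5, a reconciler is running at
 : // sequence 5, and waiter/error_waiter have advanced to 4. Then max_seen == 5, so the
 : // sequence is bumped to 6 and a waiter built now only completes once some future
 : // reconciler runs at sequence >= 6.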
1583 :
1584 : /// Create a waiter that will wait for some future Reconciler that hasn't been spawned yet.
1585 : ///
1586 : /// This is appropriate when you can't spawn a reconciler (e.g. due to resource limits), but
1587 : /// you would like to wait on the next reconciler that gets spawned in the background.
1588 0 : pub(crate) fn future_reconcile_waiter(&mut self) -> ReconcilerWaiter {
1589 0 : self.ensure_sequence_ahead();
1590 :
1591 0 : ReconcilerWaiter {
1592 0 : tenant_shard_id: self.tenant_shard_id,
1593 0 : seq_wait: self.waiter.clone(),
1594 0 : error_seq_wait: self.error_waiter.clone(),
1595 0 : error: self.last_error.clone(),
1596 0 : seq: self.sequence,
1597 0 : }
1598 0 : }
1599 :
1600 0 : async fn reconcile(
1601 0 : sequence: Sequence,
1602 0 : mut reconciler: Reconciler,
1603 0 : must_notify: bool,
1604 0 : ) -> ReconcileResult {
1605 : // Attempt to make observed state match intent state
1606 0 : let result = reconciler.reconcile().await;
1607 :
1608 : // If we know we had a pending compute notification from some previous action, send a notification irrespective
1609 : // of whether the above reconcile() did any work. It has to be Ok() though, because otherwise we might be
1610 : // sending a notification of a location that isn't really attached.
1611 0 : if result.is_ok() && must_notify {
1612 : // If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
1613 0 : reconciler.compute_notify().await.ok();
1614 0 : } else if must_notify {
1615 0 : // Carry this flag so that the reconciler's result will indicate that it still needs to retry
1616 0 : // the compute hook notification eventually.
1617 0 : reconciler.compute_notify_failure = true;
1618 0 : }
1619 :
1620 : // Update result counter
1621 0 : let outcome_label = match &result {
1622 : Ok(_) => {
1623 0 : if reconciler.compute_notify_failure {
1624 0 : ReconcileOutcome::SuccessNoNotify
1625 : } else {
1626 0 : ReconcileOutcome::Success
1627 : }
1628 : }
1629 0 : Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
1630 0 : Err(_) => ReconcileOutcome::Error,
1631 : };
1632 :
1633 0 : metrics::METRICS_REGISTRY
1634 0 : .metrics_group
1635 0 : .storage_controller_reconcile_complete
1636 0 : .inc(ReconcileCompleteLabelGroup {
1637 0 : status: outcome_label,
1638 0 : });
1639 :
1640 : // Constructing result implicitly drops Reconciler, freeing any ReconcileUnits before the Service might
1641 : // try and schedule more work in response to our result.
1642 0 : ReconcileResult {
1643 0 : sequence,
1644 0 : result,
1645 0 : tenant_shard_id: reconciler.tenant_shard_id,
1646 0 : generation: reconciler.generation,
1647 0 : observed_deltas: reconciler.observed_deltas(),
1648 0 : pending_compute_notification: reconciler.compute_notify_failure,
1649 0 : }
1650 0 : }
1651 :
1652 : #[allow(clippy::too_many_arguments)]
1653 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1654 : pub(crate) fn spawn_reconciler(
1655 : &mut self,
1656 : reason: ReconcileReason,
1657 : result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
1658 : pageservers: &Arc<HashMap<NodeId, Node>>,
1659 : compute_hook: &Arc<ComputeHook>,
1660 : reconciler_config: ReconcilerConfig,
1661 : service_config: &service::Config,
1662 : persistence: &Arc<Persistence>,
1663 : units: ReconcileUnits,
1664 : gate_guard: GateGuard,
1665 : cancel: &CancellationToken,
1666 : http_client: reqwest::Client,
1667 : ) -> Option<ReconcilerWaiter> {
1668 : // Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
1669 : // doing our sequence's work.
1670 : let old_handle = self.reconciler.take();
1671 :
1672 : // Build list of nodes from which the reconciler should detach
1673 : let mut detach = Vec::new();
1674 : for node_id in self.observed.locations.keys() {
1675 : if self.intent.get_attached() != &Some(*node_id)
1676 : && !self.intent.secondary.contains(node_id)
1677 : {
1678 : detach.push(
1679 : pageservers
1680 : .get(node_id)
1681 : .expect("Intent references non-existent pageserver")
1682 : .clone(),
1683 : )
1684 : }
1685 : }
1686 :
1687 : // Advance the sequence before spawning a reconciler, so that sequence waiters
1688 : // can distinguish between before+after the reconcile completes.
1689 : self.ensure_sequence_ahead();
1690 :
1691 : let reconciler_cancel = cancel.child_token();
1692 : let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
1693 : let reconciler = Reconciler {
1694 : tenant_shard_id: self.tenant_shard_id,
1695 : shard: self.shard,
1696 : placement_policy: self.policy.clone(),
1697 : generation: self.generation,
1698 : intent: reconciler_intent,
1699 : detach,
1700 : reconciler_config,
1701 : config: self.config.clone(),
1702 : preferred_az: self.intent.preferred_az_id.clone(),
1703 : observed: self.observed.clone(),
1704 : original_observed: self.observed.clone(),
1705 : compute_hook: compute_hook.clone(),
1706 : service_config: service_config.clone(),
1707 : _gate_guard: gate_guard,
1708 : _resource_units: units,
1709 : cancel: reconciler_cancel.clone(),
1710 : persistence: persistence.clone(),
1711 : compute_notify_failure: false,
1712 : http_client,
1713 : };
1714 :
1715 : let reconcile_seq = self.sequence;
1716 : let long_reconcile_threshold = service_config.long_reconcile_threshold;
1717 :
1718 : tracing::info!(seq=%reconcile_seq, "Spawning Reconciler ({reason:?})");
1719 : let must_notify = self.pending_compute_notification;
1720 : let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
1721 : tenant_id=%reconciler.tenant_shard_id.tenant_id,
1722 : shard_id=%reconciler.tenant_shard_id.shard_slug());
1723 : metrics::METRICS_REGISTRY
1724 : .metrics_group
1725 : .storage_controller_reconcile_spawn
1726 : .inc();
1727 : let result_tx = result_tx.clone();
1728 : let join_handle = tokio::task::spawn(
1729 0 : async move {
1730 : // Wait for any previous reconcile task to complete before we start
1731 0 : if let Some(old_handle) = old_handle {
1732 0 : old_handle.cancel.cancel();
1733 0 : if let Err(e) = old_handle.handle.await {
1734 : // We can't do much with this other than log it: the task is done, so
1735 : // we may proceed with our work.
1736 0 : tracing::error!("Unexpected join error waiting for reconcile task: {e}");
1737 0 : }
1738 0 : }
1739 :
1740 : // Early check for cancellation before doing any work
1741 : // TODO: wrap all remote API operations in cancellation check
1742 : // as well.
1743 0 : if reconciler.cancel.is_cancelled() {
1744 0 : metrics::METRICS_REGISTRY
1745 0 : .metrics_group
1746 0 : .storage_controller_reconcile_complete
1747 0 : .inc(ReconcileCompleteLabelGroup {
1748 0 : status: ReconcileOutcome::Cancel,
1749 0 : });
1750 0 : return;
1751 0 : }
1752 :
1753 0 : let (tenant_id_label, shard_number_label, sequence_label) = {
1754 0 : (
1755 0 : reconciler.tenant_shard_id.tenant_id.to_string(),
1756 0 : reconciler.tenant_shard_id.shard_number.0.to_string(),
1757 0 : reconcile_seq.to_string(),
1758 0 : )
1759 0 : };
1760 :
1761 0 : let label_group = ReconcileLongRunningLabelGroup {
1762 0 : tenant_id: &tenant_id_label,
1763 0 : shard_number: &shard_number_label,
1764 0 : sequence: &sequence_label,
1765 0 : };
1766 :
1767 0 : let reconcile_fut = Self::reconcile(reconcile_seq, reconciler, must_notify);
1768 0 : let long_reconcile_fut = {
1769 0 : let label_group = label_group.clone();
1770 0 : async move {
1771 0 : tokio::time::sleep(long_reconcile_threshold).await;
1772 :
1773 0 : tracing::warn!("Reconcile passed the long running threshold of {long_reconcile_threshold:?}");
1774 :
1775 0 : metrics::METRICS_REGISTRY
1776 0 : .metrics_group
1777 0 : .storage_controller_reconcile_long_running
1778 0 : .inc(label_group);
1779 0 : }
1780 : };
1781 :
1782 0 : let reconcile_fut = std::pin::pin!(reconcile_fut);
1783 0 : let long_reconcile_fut = std::pin::pin!(long_reconcile_fut);
1784 :
1785 0 : let (was_long, result) =
1786 0 : match future::select(reconcile_fut, long_reconcile_fut).await {
1787 0 : Either::Left((reconcile_result, _)) => (false, reconcile_result),
1788 0 : Either::Right((_, reconcile_fut)) => (true, reconcile_fut.await),
1789 : };
1790 :
1791 0 : if was_long {
1792 0 : let id = metrics::METRICS_REGISTRY
1793 0 : .metrics_group
1794 0 : .storage_controller_reconcile_long_running
1795 0 : .with_labels(label_group);
1796 0 : metrics::METRICS_REGISTRY
1797 0 : .metrics_group
1798 0 : .storage_controller_reconcile_long_running
1799 0 : .remove_metric(id);
1800 0 : }
1801 :
1802 0 : result_tx
1803 0 : .send(ReconcileResultRequest::ReconcileResult(result))
1804 0 : .ok();
1805 0 : }
1806 : .instrument(reconciler_span),
1807 : );
1808 :
1809 : self.reconciler = Some(ReconcilerHandle {
1810 : sequence: self.sequence,
1811 : handle: join_handle,
1812 : cancel: reconciler_cancel,
1813 : });
1814 :
1815 : Some(ReconcilerWaiter {
1816 : tenant_shard_id: self.tenant_shard_id,
1817 : seq_wait: self.waiter.clone(),
1818 : error_seq_wait: self.error_waiter.clone(),
1819 : error: self.last_error.clone(),
1820 : seq: self.sequence,
1821 : })
1822 : }
1823 :
1824 0 : pub(crate) fn cancel_reconciler(&self) {
1825 0 : if let Some(handle) = self.reconciler.as_ref() {
1826 0 : handle.cancel.cancel()
1827 0 : }
1828 0 : }
1829 :
1830 : /// Get a waiter for any reconciliation in flight, but do not start reconciliation
1831 : /// if it is not already running
1832 0 : pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
1833 0 : if self.reconciler.is_some() {
1834 0 : Some(ReconcilerWaiter {
1835 0 : tenant_shard_id: self.tenant_shard_id,
1836 0 : seq_wait: self.waiter.clone(),
1837 0 : error_seq_wait: self.error_waiter.clone(),
1838 0 : error: self.last_error.clone(),
1839 0 : seq: self.sequence,
1840 0 : })
1841 : } else {
1842 0 : None
1843 : }
1844 0 : }
1845 :
1846 : /// Called when a ReconcileResult has been emitted and the service is updating
1847 : /// our state: if the result is from a sequence >= my ReconcileHandle, then drop
1848 : /// the handle to indicate there is no longer a reconciliation in progress.
1849 0 : pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
1850 0 : if let Some(reconcile_handle) = &self.reconciler {
1851 0 : if reconcile_handle.sequence <= sequence {
1852 0 : self.reconciler = None;
1853 0 : }
1854 0 : }
1855 0 : }
1856 :
1857 : /// If we had any state at all referring to this node ID, drop it. Does not
1858 : /// attempt to reschedule.
1859 : ///
1860 : /// Returns true if we modified the node's intent state.
1861 0 : pub(crate) fn deref_node(&mut self, node_id: NodeId) -> bool {
1862 0 : let mut intent_modified = false;
1863 :
1864 : // Drop if this node was our attached intent
1865 0 : if self.intent.attached == Some(node_id) {
1866 0 : self.intent.attached = None;
1867 0 : intent_modified = true;
1868 0 : }
1869 :
1870 : // Drop from the list of secondaries, and check if we modified it
1871 0 : let had_secondaries = self.intent.secondary.len();
1872 0 : self.intent.secondary.retain(|n| n != &node_id);
1873 0 : intent_modified |= self.intent.secondary.len() != had_secondaries;
1874 :
1875 0 : debug_assert!(!self.intent.all_pageservers().contains(&node_id));
1876 :
1877 0 : if self.preferred_node == Some(node_id) {
1878 0 : self.preferred_node = None;
1879 0 : }
1880 :
1881 0 : intent_modified
1882 0 : }
1883 :
1884 0 : pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
1885 0 : self.scheduling_policy = p;
1886 0 : }
1887 :
1888 0 : pub(crate) fn get_scheduling_policy(&self) -> ShardSchedulingPolicy {
1889 0 : self.scheduling_policy
1890 0 : }
1891 :
1892 0 : pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
1893 : // Ordering: always set last_error before advancing sequence, so that sequence
1894 : // waiters are guaranteed to see a Some value when they see an error.
1895 0 : *(self.last_error.lock().unwrap()) = Some(Arc::new(error));
1896 0 : self.error_waiter.advance(sequence);
1897 0 : }
1898 :
1899 0 : pub(crate) fn from_persistent(
1900 0 : tsp: TenantShardPersistence,
1901 0 : intent: IntentState,
1902 0 : ) -> anyhow::Result<Self> {
1903 0 : let tenant_shard_id = tsp.get_tenant_shard_id()?;
1904 0 : let shard_identity = tsp.get_shard_identity()?;
1905 :
1906 0 : metrics::METRICS_REGISTRY
1907 0 : .metrics_group
1908 0 : .storage_controller_tenant_shards
1909 0 : .inc();
1910 :
1911 : Ok(Self {
1912 0 : tenant_shard_id,
1913 0 : shard: shard_identity,
1914 0 : sequence: Sequence::initial(),
1915 0 : generation: tsp.generation.map(|g| Generation::new(g as u32)),
1916 0 : policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
1917 0 : intent,
1918 0 : observed: ObservedState::new(),
1919 0 : config: serde_json::from_str(&tsp.config).unwrap(),
1920 0 : reconciler: None,
1921 0 : splitting: tsp.splitting,
1922 : // Filled in during [`Service::startup_reconcile`]
1923 0 : importing: TimelineImportState::Idle,
1924 0 : waiter: Arc::new(SeqWait::new(Sequence::initial())),
1925 0 : error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
1926 0 : last_error: Arc::default(),
1927 : consecutive_reconciles_count: 0,
1928 : pending_compute_notification: false,
1929 : delayed_reconcile: false,
1930 0 : scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
1931 0 : preferred_node: None,
1932 : })
1933 0 : }
1934 :
1935 0 : pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
1936 : TenantShardPersistence {
1937 0 : tenant_id: self.tenant_shard_id.tenant_id.to_string(),
1938 0 : shard_number: self.tenant_shard_id.shard_number.0 as i32,
1939 0 : shard_count: self.tenant_shard_id.shard_count.literal() as i32,
1940 0 : shard_stripe_size: self.shard.stripe_size.0 as i32,
1941 0 : generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
1942 0 : generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
1943 0 : placement_policy: serde_json::to_string(&self.policy).unwrap(),
1944 0 : config: serde_json::to_string(&self.config).unwrap(),
1945 0 : splitting: SplitState::default(),
1946 0 : scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
1947 0 : preferred_az_id: self.intent.preferred_az_id.as_ref().map(|az| az.0.clone()),
1948 : }
1949 0 : }
1950 :
1951 12508 : pub(crate) fn preferred_az(&self) -> Option<&AvailabilityZone> {
1952 12508 : self.intent.get_preferred_az()
1953 12508 : }
1954 :
1955 2 : pub(crate) fn set_preferred_az(
1956 2 : &mut self,
1957 2 : scheduler: &mut Scheduler,
1958 2 : preferred_az_id: Option<AvailabilityZone>,
1959 2 : ) {
1960 2 : self.intent.set_preferred_az(scheduler, preferred_az_id);
1961 2 : }
1962 :
1963 : /// Returns all the nodes to which this tenant shard is attached according to the
1964 : /// observed state and the generations. Return vector is sorted from latest generation
1965 : /// to earliest.
1966 0 : pub(crate) fn attached_locations(&self) -> Vec<(NodeId, Generation)> {
1967 0 : self.observed
1968 0 : .locations
1969 0 : .iter()
1970 0 : .filter_map(|(node_id, observed)| {
1971 : use LocationConfigMode::{AttachedMulti, AttachedSingle, AttachedStale};
1972 :
1973 0 : let conf = observed.conf.as_ref()?;
1974 :
1975 0 : match (conf.generation, conf.mode) {
1976 0 : (Some(gen_), AttachedMulti | AttachedSingle | AttachedStale) => {
1977 0 : Some((*node_id, gen_))
1978 : }
1979 0 : _ => None,
1980 : }
1981 0 : })
1982 0 : .sorted_by(|(_lhs_node_id, lhs_gen), (_rhs_node_id, rhs_gen)| {
1983 0 : lhs_gen.cmp(rhs_gen).reverse()
1984 0 : })
1985 0 : .map(|(node_id, gen_)| (node_id, Generation::new(gen_)))
1986 0 : .collect()
1987 0 : }
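 : // Worked example (hypothetical observed state): node 1 = AttachedStale with generation 3,
 : // node 2 = Secondary, node 3 = AttachedSingle with generation 5. Secondary modes and
 : // entries without a generation are filtered out, and the result is sorted newest-first:
 : // vec![(NodeId(3), Generation::new(5)), (NodeId(1), Generation::new(3))].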
1988 :
1989 : /// Update the observed state of the tenant by applying incremental deltas
1990 : ///
1991 : /// Deltas are generated by reconcilers via [`Reconciler::observed_deltas`].
1992 : /// They are then filtered in [`crate::service::Service::process_result`].
1993 0 : pub(crate) fn apply_observed_deltas(
1994 0 : &mut self,
1995 0 : deltas: impl Iterator<Item = ObservedStateDelta>,
1996 0 : ) {
1997 0 : for delta in deltas {
1998 0 : match delta {
1999 0 : ObservedStateDelta::Upsert(ups) => {
2000 0 : let (node_id, loc) = *ups;
2001 :
2002 : // If the generation of the observed location in the delta is lagging
2003 : // behind the current one, then we have a race condition and cannot
2004 : // be certain about the true observed state. Set the observed state
2005 : // to None in order to reflect this.
2006 0 : let crnt_gen = self
2007 0 : .observed
2008 0 : .locations
2009 0 : .get(&node_id)
2010 0 : .and_then(|loc| loc.conf.as_ref())
2011 0 : .and_then(|conf| conf.generation);
2012 0 : let new_gen = loc.conf.as_ref().and_then(|conf| conf.generation);
2013 0 : match (crnt_gen, new_gen) {
2014 0 : (Some(crnt), Some(new)) if crnt_gen > new_gen => {
2015 0 : tracing::warn!(
2016 0 : "Skipping observed state update {}: {:?} and using None due to stale generation ({} > {})",
2017 : node_id,
2018 : loc,
2019 : crnt,
2020 : new
2021 : );
2022 :
2023 0 : self.observed
2024 0 : .locations
2025 0 : .insert(node_id, ObservedStateLocation { conf: None });
2026 :
2027 0 : continue;
2028 : }
2029 0 : _ => {}
2030 : }
2031 :
2032 0 : if let Some(conf) = &loc.conf {
2033 0 : tracing::info!("Updating observed location {}: {:?}", node_id, conf);
2034 : } else {
2035 0 : tracing::info!("Setting observed location {} to None", node_id,)
2036 : }
2037 :
2038 0 : self.observed.locations.insert(node_id, loc);
2039 : }
2040 0 : ObservedStateDelta::Delete(node_id) => {
2041 0 : tracing::info!("Deleting observed location {}", node_id);
2042 0 : self.observed.locations.remove(&node_id);
2043 : }
2044 : }
2045 : }
2046 0 : }
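 : // Worked example of the stale-generation guard above (hypothetical values): the observed
 : // conf for some node currently records generation 7, and an Upsert delta arrives carrying
 : // generation 5. The delta raced with a newer attach, so the location is recorded as
 : // ObservedStateLocation { conf: None } and a later reconcile re-probes the true state.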
2047 :
2048 : /// Returns true if the tenant shard is attached to a node that is outside the preferred AZ.
2049 : ///
2050 : /// If the shard does not have a preferred AZ, returns false.
2051 0 : pub(crate) fn is_attached_outside_preferred_az(&self, nodes: &HashMap<NodeId, Node>) -> bool {
2052 0 : self.intent
2053 0 : .get_attached()
2054 0 : .map(|node_id| {
2055 0 : Some(
2056 0 : nodes
2057 0 : .get(&node_id)
2058 0 : .expect("referenced node exists")
2059 0 : .get_availability_zone_id(),
2060 0 : ) != self.intent.preferred_az_id.as_ref()
2061 0 : })
2062 0 : .unwrap_or(false)
2063 0 : }
2064 : }
2065 :
2066 : impl Drop for TenantShard {
2067 12843 : fn drop(&mut self) {
2068 12843 : metrics::METRICS_REGISTRY
2069 12843 : .metrics_group
2070 12843 : .storage_controller_tenant_shards
2071 12843 : .dec();
2072 12843 : }
2073 : }
2074 :
2075 : #[cfg(test)]
2076 : pub(crate) mod tests {
2077 : use std::cell::RefCell;
2078 : use std::rc::Rc;
2079 :
2080 : use pageserver_api::controller_api::NodeAvailability;
2081 : use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardNumber};
2082 : use rand::SeedableRng;
2083 : use rand::rngs::StdRng;
2084 : use utils::id::TenantId;
2085 :
2086 : use super::*;
2087 : use crate::scheduler::test_utils::make_test_nodes;
2088 :
2089 12 : fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard {
2090 12 : let tenant_id = TenantId::generate();
2091 12 : let shard_number = ShardNumber(0);
2092 12 : let shard_count = ShardCount::new(1);
2093 12 : let stripe_size = DEFAULT_STRIPE_SIZE;
2094 :
2095 12 : let tenant_shard_id = TenantShardId {
2096 12 : tenant_id,
2097 12 : shard_number,
2098 12 : shard_count,
2099 12 : };
2100 12 : TenantShard::new(
2101 12 : tenant_shard_id,
2102 12 : ShardIdentity::new(shard_number, shard_count, stripe_size).unwrap(),
2103 12 : policy,
2104 12 : None,
2105 : )
2106 12 : }
2107 :
2108 5004 : pub(crate) fn make_test_tenant(
2109 5004 : policy: PlacementPolicy,
2110 5004 : shard_count: ShardCount,
2111 5004 : preferred_az: Option<AvailabilityZone>,
2112 5004 : ) -> Vec<TenantShard> {
2113 5004 : make_test_tenant_with_id(TenantId::generate(), policy, shard_count, preferred_az)
2114 5004 : }
2115 :
2116 5007 : pub(crate) fn make_test_tenant_with_id(
2117 5007 : tenant_id: TenantId,
2118 5007 : policy: PlacementPolicy,
2119 5007 : shard_count: ShardCount,
2120 5007 : preferred_az: Option<AvailabilityZone>,
2121 5007 : ) -> Vec<TenantShard> {
2122 5007 : let stripe_size = DEFAULT_STRIPE_SIZE;
2123 5007 : (0..shard_count.count())
2124 12522 : .map(|i| {
2125 12522 : let shard_number = ShardNumber(i);
2126 :
2127 12522 : let tenant_shard_id = TenantShardId {
2128 12522 : tenant_id,
2129 12522 : shard_number,
2130 12522 : shard_count,
2131 12522 : };
2132 12522 : TenantShard::new(
2133 12522 : tenant_shard_id,
2134 12522 : ShardIdentity::new(shard_number, shard_count, stripe_size).unwrap(),
2135 12522 : policy.clone(),
2136 12522 : preferred_az.clone(),
2137 : )
2138 12522 : })
2139 5007 : .collect()
2140 5007 : }
2141 :
2142 : /// Test the scheduling behaviors used when a tenant configured for HA is subject
2143 : /// to nodes being marked offline.
2144 : #[test]
2145 1 : fn tenant_ha_scheduling() -> anyhow::Result<()> {
2146 : // Start with three nodes. Our tenant will only use two. The third one is
2147 : // expected to remain unused.
2148 1 : let mut nodes = make_test_nodes(3, &[]);
2149 :
2150 1 : let mut scheduler = Scheduler::new(nodes.values());
2151 1 : let mut context = ScheduleContext::default();
2152 :
2153 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
2154 1 : tenant_shard
2155 1 : .schedule(&mut scheduler, &mut context)
2156 1 : .expect("we have enough nodes, scheduling should work");
2157 :
2158 : // Expect the attached and secondary locations to initially be scheduled onto different nodes
2159 1 : assert_eq!(tenant_shard.intent.secondary.len(), 1);
2160 1 : assert!(tenant_shard.intent.attached.is_some());
2161 :
2162 1 : let attached_node_id = tenant_shard.intent.attached.unwrap();
2163 1 : let secondary_node_id = *tenant_shard.intent.secondary.iter().last().unwrap();
2164 1 : assert_ne!(attached_node_id, secondary_node_id);
2165 :
2166 : // Notifying that the attached node is offline should demote it to a secondary
2167 1 : let changed = tenant_shard
2168 1 : .intent
2169 1 : .demote_attached(&mut scheduler, attached_node_id);
2170 1 : assert!(changed);
2171 1 : assert!(tenant_shard.intent.attached.is_none());
2172 1 : assert_eq!(tenant_shard.intent.secondary.len(), 2);
2173 :
2174 : // Update the scheduler state to indicate the node is offline
2175 1 : nodes
2176 1 : .get_mut(&attached_node_id)
2177 1 : .unwrap()
2178 1 : .set_availability(NodeAvailability::Offline);
2179 1 : scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
2180 :
2181 : // Re-scheduling the shard should promote the still-available secondary node to attached
2182 1 : tenant_shard
2183 1 : .schedule(&mut scheduler, &mut context)
2184 1 : .expect("active nodes are available");
2185 1 : assert_eq!(tenant_shard.intent.attached.unwrap(), secondary_node_id);
2186 :
2187 : // The original attached node should have been retained as a secondary
2188 1 : assert_eq!(
2189 1 : *tenant_shard.intent.secondary.iter().last().unwrap(),
2190 : attached_node_id
2191 : );
2192 :
2193 1 : tenant_shard.intent.clear(&mut scheduler);
2194 :
2195 1 : Ok(())
2196 1 : }
2197 :
2198 : #[test]
2199 1 : fn intent_from_observed() -> anyhow::Result<()> {
2200 1 : let nodes = make_test_nodes(3, &[]);
2201 1 : let mut scheduler = Scheduler::new(nodes.values());
2202 :
2203 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
2204 :
2205 1 : tenant_shard.observed.locations.insert(
2206 1 : NodeId(3),
2207 1 : ObservedStateLocation {
2208 1 : conf: Some(LocationConfig {
2209 1 : mode: LocationConfigMode::AttachedMulti,
2210 1 : generation: Some(2),
2211 1 : secondary_conf: None,
2212 1 : shard_number: tenant_shard.shard.number.0,
2213 1 : shard_count: tenant_shard.shard.count.literal(),
2214 1 : shard_stripe_size: tenant_shard.shard.stripe_size.0,
2215 1 : tenant_conf: TenantConfig::default(),
2216 1 : }),
2217 1 : },
2218 : );
2219 :
2220 1 : tenant_shard.observed.locations.insert(
2221 1 : NodeId(2),
2222 1 : ObservedStateLocation {
2223 1 : conf: Some(LocationConfig {
2224 1 : mode: LocationConfigMode::AttachedStale,
2225 1 : generation: Some(1),
2226 1 : secondary_conf: None,
2227 1 : shard_number: tenant_shard.shard.number.0,
2228 1 : shard_count: tenant_shard.shard.count.literal(),
2229 1 : shard_stripe_size: tenant_shard.shard.stripe_size.0,
2230 1 : tenant_conf: TenantConfig::default(),
2231 1 : }),
2232 1 : },
2233 : );
2234 :
2235 1 : tenant_shard.intent_from_observed(&mut scheduler);
2236 :
2237 : // The attached location with the highest generation gets used as attached
2238 1 : assert_eq!(tenant_shard.intent.attached, Some(NodeId(3)));
2239 : // Other locations get used as secondary
2240 1 : assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]);
2241 :
2242 1 : scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?;
2243 :
2244 1 : tenant_shard.intent.clear(&mut scheduler);
2245 1 : Ok(())
2246 1 : }
2247 :
2248 : #[test]
2249 1 : fn scheduling_mode() -> anyhow::Result<()> {
2250 1 : let nodes = make_test_nodes(3, &[]);
2251 1 : let mut scheduler = Scheduler::new(nodes.values());
2252 :
2253 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
2254 :
2255 : // In pause mode, schedule() shouldn't do anything
2256 1 : tenant_shard.scheduling_policy = ShardSchedulingPolicy::Pause;
2257 1 : assert!(
2258 1 : tenant_shard
2259 1 : .schedule(&mut scheduler, &mut ScheduleContext::default())
2260 1 : .is_ok()
2261 : );
2262 1 : assert!(tenant_shard.intent.all_pageservers().is_empty());
2263 :
2264 : // In active mode, schedule() works
2265 1 : tenant_shard.scheduling_policy = ShardSchedulingPolicy::Active;
2266 1 : assert!(
2267 1 : tenant_shard
2268 1 : .schedule(&mut scheduler, &mut ScheduleContext::default())
2269 1 : .is_ok()
2270 : );
2271 1 : assert!(!tenant_shard.intent.all_pageservers().is_empty());
2272 :
2273 1 : tenant_shard.intent.clear(&mut scheduler);
2274 1 : Ok(())
2275 1 : }
2276 :
2277 : #[test]
2278 : /// Simple case: moving attachment to somewhere better where we already have a secondary
2279 1 : fn optimize_attachment_simple() -> anyhow::Result<()> {
2280 1 : let nodes = make_test_nodes(
2281 : 3,
2282 1 : &[
2283 1 : AvailabilityZone("az-a".to_string()),
2284 1 : AvailabilityZone("az-b".to_string()),
2285 1 : AvailabilityZone("az-c".to_string()),
2286 1 : ],
2287 : );
2288 1 : let mut scheduler = Scheduler::new(nodes.values());
2289 :
2290 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
2291 1 : shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
2292 1 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
2293 1 : shard_b.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
2294 :
2295 : // Initially: shard A is attached on node 2 and shard B on node 1, and each has a
2296 : // secondary location on the other's attached node.
2297 1 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
2298 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(1));
2299 1 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
2300 1 : shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
2301 :
2302 1 : fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
2303 1 : let mut schedule_context = ScheduleContext::default();
2304 1 : schedule_context.avoid(&shard_a.intent.all_pageservers());
2305 1 : schedule_context.avoid(&shard_b.intent.all_pageservers());
2306 1 : schedule_context
2307 1 : }
2308 :
2309 1 : let schedule_context = make_schedule_context(&shard_a, &shard_b);
2310 1 : let optimization_a = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2311 1 : assert_eq!(
2312 : optimization_a,
2313 1 : Some(ScheduleOptimization {
2314 1 : sequence: shard_a.sequence,
2315 1 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
2316 1 : old_attached_node_id: NodeId(2),
2317 1 : new_attached_node_id: NodeId(1)
2318 1 : })
2319 1 : })
2320 : );
2321 1 : shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
2322 :
2323 : // // Either shard should recognize that it has the option to switch to a secondary location where there
2324 : // // would be no other shards from the same tenant, and request to do so.
2325 : // assert_eq!(
2326 : // optimization_a_prepare,
2327 : // Some(ScheduleOptimization {
2328 : // sequence: shard_a.sequence,
2329 : // action: ScheduleOptimizationAction::CreateSecondary(NodeId(2))
2330 : // })
2331 : // );
2332 : // shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
2333 :
2334 : // let schedule_context = make_schedule_context(&shard_a, &shard_b);
2335 : // let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2336 : // assert_eq!(
2337 : // optimization_a_migrate,
2338 : // Some(ScheduleOptimization {
2339 : // sequence: shard_a.sequence,
2340 : // action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
2341 : // old_attached_node_id: NodeId(1),
2342 : // new_attached_node_id: NodeId(2)
2343 : // })
2344 : // })
2345 : // );
2346 : // shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
2347 :
2348 : // let schedule_context = make_schedule_context(&shard_a, &shard_b);
2349 : // let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2350 : // assert_eq!(
2351 : // optimization_a_cleanup,
2352 : // Some(ScheduleOptimization {
2353 : // sequence: shard_a.sequence,
2354 : // action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1))
2355 : // })
2356 : // );
2357 : // shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
2358 :
2359 : // // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
2360 : // let schedule_context = make_schedule_context(&shard_a, &shard_b);
2361 : // assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
2362 :
2363 1 : shard_a.intent.clear(&mut scheduler);
2364 1 : shard_b.intent.clear(&mut scheduler);
2365 :
2366 1 : Ok(())
2367 1 : }
2368 :
2369 : #[test]
2370 : /// Complicated case: moving attachment to somewhere better where we do not have a secondary
2371 : /// already, creating one as needed.
2372 1 : fn optimize_attachment_multistep() -> anyhow::Result<()> {
2373 1 : let nodes = make_test_nodes(
2374 : 3,
2375 1 : &[
2376 1 : AvailabilityZone("az-a".to_string()),
2377 1 : AvailabilityZone("az-b".to_string()),
2378 1 : AvailabilityZone("az-c".to_string()),
2379 1 : ],
2380 : );
2381 1 : let mut scheduler = Scheduler::new(nodes.values());
2382 :
2383 : // Two shards of a tenant that wants to be in AZ A
2384 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
2385 1 : shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
2386 1 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
2387 1 : shard_b.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
2388 :
2389 : // Both shards are initially attached in non-home AZ _and_ have secondaries in non-home AZs
2390 1 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
2391 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
2392 1 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(3)));
2393 1 : shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
2394 :
2395 3 : fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
2396 3 : let mut schedule_context = ScheduleContext::default();
2397 3 : schedule_context.avoid(&shard_a.intent.all_pageservers());
2398 3 : schedule_context.avoid(&shard_b.intent.all_pageservers());
2399 3 : schedule_context
2400 3 : }
2401 :
2402 1 : let schedule_context = make_schedule_context(&shard_a, &shard_b);
2403 1 : let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2404 1 : assert_eq!(
2405 : optimization_a_prepare,
2406 1 : Some(ScheduleOptimization {
2407 1 : sequence: shard_a.sequence,
2408 1 : action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
2409 1 : })
2410 : );
2411 1 : shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
2412 :
2413 1 : let schedule_context = make_schedule_context(&shard_a, &shard_b);
2414 1 : let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2415 1 : assert_eq!(
2416 : optimization_a_migrate,
2417 1 : Some(ScheduleOptimization {
2418 1 : sequence: shard_a.sequence,
2419 1 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
2420 1 : old_attached_node_id: NodeId(2),
2421 1 : new_attached_node_id: NodeId(1)
2422 1 : })
2423 1 : })
2424 : );
2425 1 : shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
2426 :
2427 1 : let schedule_context = make_schedule_context(&shard_a, &shard_b);
2428 1 : let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2429 1 : assert_eq!(
2430 : optimization_a_cleanup,
2431 1 : Some(ScheduleOptimization {
2432 1 : sequence: shard_a.sequence,
2433 1 : action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
2434 1 : })
2435 : );
2436 1 : shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
2437 :
2438 : // // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
2439 : // let schedule_context = make_schedule_context(&shard_a, &shard_b);
2440 : // assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
2441 :
2442 1 : shard_a.intent.clear(&mut scheduler);
2443 1 : shard_b.intent.clear(&mut scheduler);
2444 :
2445 1 : Ok(())
2446 1 : }
2447 :
2448 : #[test]
2449 : /// How the optimisation code handles a shard with a preferred node set; this is an example
2450 : /// of the multi-step migration, but driven by a different input.
2451 1 : fn optimize_attachment_multi_preferred_node() -> anyhow::Result<()> {
2452 1 : let nodes = make_test_nodes(
2453 : 4,
2454 1 : &[
2455 1 : AvailabilityZone("az-a".to_string()),
2456 1 : AvailabilityZone("az-a".to_string()),
2457 1 : AvailabilityZone("az-b".to_string()),
2458 1 : AvailabilityZone("az-b".to_string()),
2459 1 : ],
2460 : );
2461 1 : let mut scheduler = Scheduler::new(nodes.values());
2462 :
2463 : // One shard of a tenant that wants to be in AZ A
2464 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
2465 1 : shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
2466 :
2467 : // Initially attached in a stable location
2468 1 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
2469 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
2470 :
2471 : // Set the preferred node to node 2, a node that scores equally well as its current location
2472 1 : shard_a.preferred_node = Some(NodeId(2));
2473 :
2474 3 : fn make_schedule_context(shard_a: &TenantShard) -> ScheduleContext {
2475 3 : let mut schedule_context = ScheduleContext::default();
2476 3 : schedule_context.avoid(&shard_a.intent.all_pageservers());
2477 3 : schedule_context
2478 3 : }
2479 :
2480 1 : let schedule_context = make_schedule_context(&shard_a);
2481 1 : let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2482 1 : assert_eq!(
2483 : optimization_a_prepare,
2484 1 : Some(ScheduleOptimization {
2485 1 : sequence: shard_a.sequence,
2486 1 : action: ScheduleOptimizationAction::CreateSecondary(NodeId(2))
2487 1 : })
2488 : );
2489 1 : shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
2490 :
2491 : // The first step of the optimisation should not have cleared the preferred node
2492 1 : assert_eq!(shard_a.preferred_node, Some(NodeId(2)));
2493 :
2494 1 : let schedule_context = make_schedule_context(&shard_a);
2495 1 : let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2496 1 : assert_eq!(
2497 : optimization_a_migrate,
2498 1 : Some(ScheduleOptimization {
2499 1 : sequence: shard_a.sequence,
2500 1 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
2501 1 : old_attached_node_id: NodeId(1),
2502 1 : new_attached_node_id: NodeId(2)
2503 1 : })
2504 1 : })
2505 : );
2506 1 : shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
2507 :
2508 : // The cutover step of the optimisation should have cleared the preferred node
2509 1 : assert_eq!(shard_a.preferred_node, None);
2510 :
2511 1 : let schedule_context = make_schedule_context(&shard_a);
2512 1 : let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2513 1 : assert_eq!(
2514 : optimization_a_cleanup,
2515 1 : Some(ScheduleOptimization {
2516 1 : sequence: shard_a.sequence,
2517 1 : action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1))
2518 1 : })
2519 : );
2520 1 : shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
2521 :
2522 1 : shard_a.intent.clear(&mut scheduler);
2523 :
2524 1 : Ok(())
2525 1 : }
2526 :
2527 : #[test]
2528 : /// Check that multi-step migration works when moving to somewhere that is only better by
2529 : /// 1 AffinityScore -- this ensures that we don't have a bug like the intermediate secondary
2530 : /// counting toward the affinity score such that it prevents the rest of the migration from happening.
2531 1 : fn optimize_attachment_marginal() -> anyhow::Result<()> {
2532 1 : let nodes = make_test_nodes(2, &[]);
2533 1 : let mut scheduler = Scheduler::new(nodes.values());
2534 :
2535 : // Multi-sharded tenant, we will craft a situation where affinity
2536 : // scores differ only slightly
2537 1 : let mut shards = make_test_tenant(PlacementPolicy::Attached(0), ShardCount::new(4), None);
2538 :
2539 : // 1 attached on node 1
2540 1 : shards[0]
2541 1 : .intent
2542 1 : .set_attached(&mut scheduler, Some(NodeId(1)));
2543 : // 3 attached on node 2
2544 1 : shards[1]
2545 1 : .intent
2546 1 : .set_attached(&mut scheduler, Some(NodeId(2)));
2547 1 : shards[2]
2548 1 : .intent
2549 1 : .set_attached(&mut scheduler, Some(NodeId(2)));
2550 1 : shards[3]
2551 1 : .intent
2552 1 : .set_attached(&mut scheduler, Some(NodeId(2)));
2553 :
2554 : // The scheduler should figure out that one of node 2's shards (the test uses shards[1]) needs:
2555 : // - A secondary created for it on node 1
2556 : // - Its attachment migrated to node 1
2557 : // - Its old location on node 2 removed
2558 :
2559 4 : fn make_schedule_context(shards: &Vec<TenantShard>) -> ScheduleContext {
2560 4 : let mut schedule_context = ScheduleContext::default();
2561 20 : for shard in shards {
2562 16 : schedule_context.avoid(&shard.intent.all_pageservers());
2563 16 : }
2564 4 : schedule_context
2565 4 : }
2566 :
2567 1 : let schedule_context = make_schedule_context(&shards);
2568 1 : let optimization_a_prepare =
2569 1 : shards[1].optimize_attachment(&mut scheduler, &schedule_context);
2570 1 : assert_eq!(
2571 : optimization_a_prepare,
2572 1 : Some(ScheduleOptimization {
2573 1 : sequence: shards[1].sequence,
2574 1 : action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
2575 1 : })
2576 : );
2577 1 : shards[1].apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
2578 :
2579 1 : let schedule_context = make_schedule_context(&shards);
2580 1 : let optimization_a_migrate =
2581 1 : shards[1].optimize_attachment(&mut scheduler, &schedule_context);
2582 1 : assert_eq!(
2583 : optimization_a_migrate,
2584 1 : Some(ScheduleOptimization {
2585 1 : sequence: shards[1].sequence,
2586 1 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
2587 1 : old_attached_node_id: NodeId(2),
2588 1 : new_attached_node_id: NodeId(1)
2589 1 : })
2590 1 : })
2591 : );
2592 1 : shards[1].apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
2593 :
2594 1 : let schedule_context = make_schedule_context(&shards);
2595 1 : let optimization_a_cleanup =
2596 1 : shards[1].optimize_attachment(&mut scheduler, &schedule_context);
2597 1 : assert_eq!(
2598 : optimization_a_cleanup,
2599 1 : Some(ScheduleOptimization {
2600 1 : sequence: shards[1].sequence,
2601 1 : action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2))
2602 1 : })
2603 : );
2604 1 : shards[1].apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
2605 :
2606 : // Everything should be stable now
2607 1 : let schedule_context = make_schedule_context(&shards);
2608 1 : assert_eq!(
2609 1 : shards[0].optimize_attachment(&mut scheduler, &schedule_context),
2610 : None
2611 : );
2612 1 : assert_eq!(
2613 1 : shards[1].optimize_attachment(&mut scheduler, &schedule_context),
2614 : None
2615 : );
2616 1 : assert_eq!(
2617 1 : shards[2].optimize_attachment(&mut scheduler, &schedule_context),
2618 : None
2619 : );
2620 1 : assert_eq!(
2621 1 : shards[3].optimize_attachment(&mut scheduler, &schedule_context),
2622 : None
2623 : );
2624 :
2625 5 : for mut shard in shards {
2626 4 : shard.intent.clear(&mut scheduler);
2627 4 : }
2628 :
2629 1 : Ok(())
2630 1 : }
2631 :
2632 : #[test]
2633 1 : fn optimize_secondary() -> anyhow::Result<()> {
2634 1 : let nodes = make_test_nodes(4, &[]);
2635 1 : let mut scheduler = Scheduler::new(nodes.values());
2636 :
2637 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
2638 1 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
2639 :
2640 : // Initially: shard A is attached on node 1 and shard B on node 2, and both have their
2641 : // secondary location on node 3.
2642 1 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
2643 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
2644 1 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
2645 1 : shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
2646 :
2647 1 : let mut schedule_context = ScheduleContext::default();
2648 1 : schedule_context.avoid(&shard_a.intent.all_pageservers());
2649 1 : schedule_context.avoid(&shard_b.intent.all_pageservers());
2650 :
2651 1 : let optimization_a = shard_a.optimize_secondary(&mut scheduler, &schedule_context);
2652 :
2653 : // Since there is a node with no locations available, having two locations for the same
2654 : // tenant on node 3 should generate an optimization to move one of them away
2655 1 : assert_eq!(
2656 : optimization_a,
2657 1 : Some(ScheduleOptimization {
2658 1 : sequence: shard_a.sequence,
2659 1 : action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
2660 1 : old_node_id: NodeId(3),
2661 1 : new_node_id: NodeId(4)
2662 1 : })
2663 1 : })
2664 : );
2665 :
2666 1 : shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
2667 1 : assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
2668 1 : assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);
2669 :
2670 1 : shard_a.intent.clear(&mut scheduler);
2671 1 : shard_b.intent.clear(&mut scheduler);
2672 :
2673 1 : Ok(())
2674 1 : }
2675 :
2676 : /// Test how the optimisation code behaves with an extra secondary
2677 : #[test]
2678 1 : fn optimize_removes_secondary() -> anyhow::Result<()> {
2679 1 : let az_a_tag = AvailabilityZone("az-a".to_string());
2680 1 : let az_b_tag = AvailabilityZone("az-b".to_string());
2681 1 : let mut nodes = make_test_nodes(
2682 : 4,
2683 1 : &[
2684 1 : az_a_tag.clone(),
2685 1 : az_b_tag.clone(),
2686 1 : az_a_tag.clone(),
2687 1 : az_b_tag.clone(),
2688 1 : ],
2689 : );
2690 1 : let mut scheduler = Scheduler::new(nodes.values());
2691 :
2692 1 : let mut schedule_context = ScheduleContext::default();
2693 :
2694 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
2695 1 : shard_a.intent.preferred_az_id = Some(az_a_tag.clone());
2696 1 : shard_a
2697 1 : .schedule(&mut scheduler, &mut schedule_context)
2698 1 : .unwrap();
2699 :
2700 : // Attached on node 1, secondary on node 2
2701 1 : assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
2702 1 : assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(2)]);
2703 :
2704 : // Initially optimiser is idle
2705 1 : assert_eq!(
2706 1 : shard_a.optimize_attachment(&mut scheduler, &schedule_context),
2707 : None
2708 : );
2709 1 : assert_eq!(
2710 1 : shard_a.optimize_secondary(&mut scheduler, &schedule_context),
2711 : None
2712 : );
2713 :
2714 : // A spare secondary in the home AZ: it should be removed -- this is the situation when we're midway through a graceful migration, after cutting over
2715 : // to our new location
2716 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
2717 1 : let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2718 1 : assert_eq!(
2719 : optimization,
2720 1 : Some(ScheduleOptimization {
2721 1 : sequence: shard_a.sequence,
2722 1 : action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
2723 1 : })
2724 : );
2725 1 : shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
2726 :
2727 : // A spare secondary in the non-home AZ, on a node that is offline: it should be removed
2728 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(4));
2729 1 : nodes
2730 1 : .get_mut(&NodeId(4))
2731 1 : .unwrap()
2732 1 : .set_availability(NodeAvailability::Offline);
2733 1 : scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
2734 1 : let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2735 1 : assert_eq!(
2736 : optimization,
2737 1 : Some(ScheduleOptimization {
2738 1 : sequence: shard_a.sequence,
2739 1 : action: ScheduleOptimizationAction::RemoveSecondary(NodeId(4))
2740 1 : })
2741 : );
2742 1 : shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
2743 :
2744 : // A spare secondary when we should have none
2745 1 : shard_a.policy = PlacementPolicy::Attached(0);
2746 1 : let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2747 1 : assert_eq!(
2748 : optimization,
2749 1 : Some(ScheduleOptimization {
2750 1 : sequence: shard_a.sequence,
2751 1 : action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2))
2752 1 : })
2753 : );
2754 1 : shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
2755 1 : assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
2756 1 : assert_eq!(shard_a.intent.get_secondary(), &vec![]);
2757 :
2758 : // Check that in secondary mode, we preserve the secondary in the preferred AZ
2759 1 : let mut schedule_context = ScheduleContext::default(); // Fresh context, we're about to call schedule()
2760 1 : shard_a.policy = PlacementPolicy::Secondary;
2761 1 : shard_a
2762 1 : .schedule(&mut scheduler, &mut schedule_context)
2763 1 : .unwrap();
2764 1 : assert_eq!(shard_a.intent.get_attached(), &None);
2765 1 : assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
2766 1 : assert_eq!(
2767 1 : shard_a.optimize_attachment(&mut scheduler, &schedule_context),
2768 : None
2769 : );
2770 1 : assert_eq!(
2771 1 : shard_a.optimize_secondary(&mut scheduler, &schedule_context),
2772 : None
2773 : );
2774 :
2775 1 : shard_a.intent.clear(&mut scheduler);
2776 :
2777 1 : Ok(())
2778 1 : }
2779 :
2780 : // Optimize til quiescent: this emulates what Service::optimize_all does, when
2781 : // called repeatedly in the background.
2782 : // Returns the applied optimizations
2783 3 : fn optimize_til_idle(
2784 3 : scheduler: &mut Scheduler,
2785 3 : shards: &mut [TenantShard],
2786 3 : ) -> Vec<ScheduleOptimization> {
2787 3 : let mut loop_n = 0;
2788 3 : let mut optimizations = Vec::default();
2789 : loop {
2790 6 : let mut schedule_context = ScheduleContext::default();
2791 6 : let mut any_changed = false;
2792 :
2793 24 : for shard in shards.iter() {
2794 24 : schedule_context.avoid(&shard.intent.all_pageservers());
2795 24 : }
2796 :
2797 15 : for shard in shards.iter_mut() {
2798 15 : let optimization = shard.optimize_attachment(scheduler, &schedule_context);
2799 15 : tracing::info!(
2800 0 : "optimize_attachment({})={:?}",
2801 : shard.tenant_shard_id,
2802 : optimization
2803 : );
2804 15 : if let Some(optimization) = optimization {
2805 : // Check that maybe_optimizable wouldn't have wrongly claimed this optimization didn't exist
2806 3 : assert!(shard.maybe_optimizable(scheduler, &schedule_context));
2807 3 : optimizations.push(optimization.clone());
2808 3 : shard.apply_optimization(scheduler, optimization);
2809 3 : any_changed = true;
2810 3 : break;
2811 12 : }
2812 :
2813 12 : let optimization = shard.optimize_secondary(scheduler, &schedule_context);
2814 12 : tracing::info!(
2815 0 : "optimize_secondary({})={:?}",
2816 : shard.tenant_shard_id,
2817 : optimization
2818 : );
2819 12 : if let Some(optimization) = optimization {
2820 : // Check that maybe_optimizable wouldn't have wrongly claimed this optimization didn't exist
2821 0 : assert!(shard.maybe_optimizable(scheduler, &schedule_context));
2822 :
2823 0 : optimizations.push(optimization.clone());
2824 0 : shard.apply_optimization(scheduler, optimization);
2825 0 : any_changed = true;
2826 0 : break;
2827 12 : }
2828 : }
2829 :
2830 6 : if !any_changed {
2831 3 : break;
2832 3 : }
2833 :
2834 : // Assert no infinite loop
2835 3 : loop_n += 1;
2836 3 : assert!(loop_n < 1000);
2837 : }
2838 :
2839 3 : optimizations
2840 3 : }
2841 :
2842 : /// Test the balancing behavior of shard scheduling: that it achieves a balance, and
2843 : /// that it converges.
2844 : #[test]
2845 1 : fn optimize_add_nodes() -> anyhow::Result<()> {
2846 1 : let nodes = make_test_nodes(
2847 : 9,
2848 1 : &[
2849 1 : // Initial 6 nodes
2850 1 : AvailabilityZone("az-a".to_string()),
2851 1 : AvailabilityZone("az-a".to_string()),
2852 1 : AvailabilityZone("az-b".to_string()),
2853 1 : AvailabilityZone("az-b".to_string()),
2854 1 : AvailabilityZone("az-c".to_string()),
2855 1 : AvailabilityZone("az-c".to_string()),
2856 1 : // Three we will add later
2857 1 : AvailabilityZone("az-a".to_string()),
2858 1 : AvailabilityZone("az-b".to_string()),
2859 1 : AvailabilityZone("az-c".to_string()),
2860 1 : ],
2861 : );
2862 :
2863 : // Only show the scheduler two nodes in each AZ to start with
2864 1 : let mut scheduler = Scheduler::new([].iter());
2865 7 : for i in 1..=6 {
2866 6 : scheduler.node_upsert(nodes.get(&NodeId(i)).unwrap());
2867 6 : }
2868 :
2869 1 : let mut shards = make_test_tenant(
2870 1 : PlacementPolicy::Attached(1),
2871 1 : ShardCount::new(4),
2872 1 : Some(AvailabilityZone("az-a".to_string())),
2873 : );
2874 1 : let mut schedule_context = ScheduleContext::default();
2875 5 : for shard in &mut shards {
2876 4 : assert!(
2877 4 : shard
2878 4 : .schedule(&mut scheduler, &mut schedule_context)
2879 4 : .is_ok()
2880 : );
2881 : }
2882 :
2883 : // Initial: attached locations land in the tenant's home AZ.
2884 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
2885 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2);
2886 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
2887 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
2888 :
2889 : // Initial: secondary locations in a remote AZ
2890 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
2891 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
2892 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
2893 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
2894 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
2895 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
2896 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
2897 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
2898 :
2899 : // Add another three nodes: we should see the shards spread out when their optimize
2900 : // methods are called
2901 1 : scheduler.node_upsert(nodes.get(&NodeId(7)).unwrap());
2902 1 : scheduler.node_upsert(nodes.get(&NodeId(8)).unwrap());
2903 1 : scheduler.node_upsert(nodes.get(&NodeId(9)).unwrap());
2904 1 : optimize_til_idle(&mut scheduler, &mut shards);
2905 :
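        // The home AZ (az-a) now has three nodes (1, 2 and 7) serving four attached shards,
        // so the best achievable attachment split there is 2/1/1.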
2906 : // We expect one attached location to have been moved to the new node in the tenant's home AZ
2907 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(7)), 1);
2908 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(7)), 1);
2909 : // The original node now has one fewer attached shard
2910 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
2911 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
2912 :
2913 : // One of the original nodes still has two attachments, since there is an odd number of nodes in the home AZ
2914 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
2915 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
2916 :
2917 : // None of our secondaries moved, since we already had enough nodes for those to be
2918 : // scheduled perfectly
2919 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
2920 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
2921 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
2922 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
2923 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
2924 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
2925 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
2926 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
2927 :
2928 4 : for shard in shards.iter_mut() {
2929 4 : shard.intent.clear(&mut scheduler);
2930 4 : }
2931 :
2932 1 : Ok(())
2933 1 : }
2934 :
2935 : /// Test that initial shard scheduling is optimal. By optimal we mean
2936 : /// that the optimizer cannot find a way to improve it.
2937 : ///
2938 : /// This test is an example of the scheduling issue described in
2939 : /// https://github.com/neondatabase/neon/issues/8969
2940 : #[test]
2941 1 : fn initial_scheduling_is_optimal() -> anyhow::Result<()> {
2942 : use itertools::Itertools;
2943 :
2944 1 : let nodes = make_test_nodes(2, &[]);
2945 :
2946 1 : let mut scheduler = Scheduler::new([].iter());
2947 1 : scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
2948 1 : scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
2949 :
2950 1 : let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
2951 1 : let a_context = Rc::new(RefCell::new(ScheduleContext::default()));
2952 :
2953 1 : let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
2954 1 : let b_context = Rc::new(RefCell::new(ScheduleContext::default()));
2955 :
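        // Pair each shard with its tenant's shared ScheduleContext, then interleave the two
        // tenants so their shards are scheduled alternately rather than tenant-by-tenant.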
2956 4 : let a_shards_with_context = a.iter_mut().map(|shard| (shard, a_context.clone()));
2957 4 : let b_shards_with_context = b.iter_mut().map(|shard| (shard, b_context.clone()));
2958 :
2959 1 : let schedule_order = a_shards_with_context.interleave(b_shards_with_context);
2960 :
2961 9 : for (shard, context) in schedule_order {
2962 8 : let context = &mut *context.borrow_mut();
2963 8 : shard.schedule(&mut scheduler, context).unwrap();
2964 8 : }
2965 :
2966 1 : let applied_to_a = optimize_til_idle(&mut scheduler, &mut a);
2967 1 : assert_eq!(applied_to_a, vec![]);
2968 :
2969 1 : let applied_to_b = optimize_til_idle(&mut scheduler, &mut b);
2970 1 : assert_eq!(applied_to_b, vec![]);
2971 :
2972 8 : for shard in a.iter_mut().chain(b.iter_mut()) {
2973 8 : shard.intent.clear(&mut scheduler);
2974 8 : }
2975 :
2976 1 : Ok(())
2977 1 : }
2978 :
2979 : #[test]
2980 1 : fn random_az_shard_scheduling() -> anyhow::Result<()> {
2981 : use rand::seq::SliceRandom;
2982 :
2983 51 : for seed in 0..50 {
2984 50 : eprintln!("Running test with seed {seed}");
2985 50 : let mut rng = StdRng::seed_from_u64(seed);
2986 :
2987 50 : let az_a_tag = AvailabilityZone("az-a".to_string());
2988 50 : let az_b_tag = AvailabilityZone("az-b".to_string());
2989 50 : let azs = [az_a_tag, az_b_tag];
2990 50 : let nodes = make_test_nodes(4, &azs);
2991 50 : let mut shards_per_az: HashMap<AvailabilityZone, u32> = HashMap::new();
2992 :
2993 50 : let mut scheduler = Scheduler::new([].iter());
2994 200 : for node in nodes.values() {
2995 200 : scheduler.node_upsert(node);
2996 200 : }
2997 :
2998 50 : let mut shards = Vec::default();
2999 50 : let mut contexts = Vec::default();
3000 50 : let mut az_picker = azs.iter().cycle().cloned();
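            // Create 100 tenants, alternating the preferred AZ and cycling shard counts through
            // 1..=4: per run this places 100 shards' preference in az-a and 150 in az-b.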
3001 5050 : for i in 0..100 {
3002 5000 : let az = az_picker.next().unwrap();
3003 5000 : let shard_count = i % 4 + 1;
3004 5000 : *shards_per_az.entry(az.clone()).or_default() += shard_count;
3005 :
3006 5000 : let tenant_shards = make_test_tenant(
3007 5000 : PlacementPolicy::Attached(1),
3008 5000 : ShardCount::new(shard_count.try_into().unwrap()),
3009 5000 : Some(az),
3010 : );
3011 5000 : let context = Rc::new(RefCell::new(ScheduleContext::default()));
3012 :
3013 5000 : contexts.push(context.clone());
3014 5000 : let with_ctx = tenant_shards
3015 5000 : .into_iter()
3016 12500 : .map(|shard| (shard, context.clone()));
3017 17500 : for shard_with_ctx in with_ctx {
3018 12500 : shards.push(shard_with_ctx);
3019 12500 : }
3020 : }
3021 :
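            // Schedule in a random order so the outcome does not depend on tenant creation order.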
3022 50 : shards.shuffle(&mut rng);
3023 :
3024 : #[derive(Default, Debug)]
3025 : struct NodeStats {
3026 : attachments: u32,
3027 : secondaries: u32,
3028 : }
3029 :
3030 50 : let mut node_stats: HashMap<NodeId, NodeStats> = HashMap::default();
3031 50 : let mut attachments_in_wrong_az = 0;
3032 50 : let mut secondaries_in_wrong_az = 0;
3033 :
3034 12550 : for (shard, context) in &mut shards {
3035 12500 : let context = &mut *context.borrow_mut();
3036 12500 : shard.schedule(&mut scheduler, context).unwrap();
3037 :
3038 12500 : let attached_node = shard.intent.get_attached().unwrap();
3039 12500 : let stats = node_stats.entry(attached_node).or_default();
3040 12500 : stats.attachments += 1;
3041 :
3042 12500 : let secondary_node = *shard.intent.get_secondary().first().unwrap();
3043 12500 : let stats = node_stats.entry(secondary_node).or_default();
3044 12500 : stats.secondaries += 1;
3045 :
3046 12500 : let attached_node_az = nodes
3047 12500 : .get(&attached_node)
3048 12500 : .unwrap()
3049 12500 : .get_availability_zone_id();
3050 12500 : let secondary_node_az = nodes
3051 12500 : .get(&secondary_node)
3052 12500 : .unwrap()
3053 12500 : .get_availability_zone_id();
3054 12500 : let preferred_az = shard.preferred_az().unwrap();
3055 :
3056 12500 : if attached_node_az != preferred_az {
3057 0 : eprintln!(
3058 0 : "{} attachment was scheduled in AZ {} but its preferred AZ is {}",
3059 0 : shard.tenant_shard_id, attached_node_az, preferred_az
3060 0 : );
3061 0 : attachments_in_wrong_az += 1;
3062 12500 : }
3063 :
3064 12500 : if secondary_node_az == preferred_az {
3065 0 : eprintln!(
3066 0 : "{} secondary was scheduled in AZ {} which matches preference",
3067 0 : shard.tenant_shard_id, secondary_node_az
3068 0 : );
3069 0 : secondaries_in_wrong_az += 1;
3070 12500 : }
3071 : }
3072 :
3073 50 : let mut violations = Vec::default();
3074 :
3075 50 : if attachments_in_wrong_az > 0 {
3076 0 : violations.push(format!(
3077 0 : "{attachments_in_wrong_az} attachments scheduled to the incorrect AZ"
3078 0 : ));
3079 50 : }
3080 :
3081 50 : if secondaries_in_wrong_az > 0 {
3082 0 : violations.push(format!(
3083 0 : "{secondaries_in_wrong_az} secondaries scheduled to the incorrect AZ"
3084 0 : ));
3085 50 : }
3086 :
3087 50 : eprintln!(
3088 50 : "attachments_in_wrong_az={attachments_in_wrong_az} secondaries_in_wrong_az={secondaries_in_wrong_az}"
3089 : );
3090 :
3091 250 : for (node_id, stats) in &node_stats {
3092 200 : let node_az = nodes.get(node_id).unwrap().get_availability_zone_id();
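                // Each AZ holds two of the four nodes and attachments should land in the
                // shard's preferred AZ, so an even split is half that AZ's shard count;
                // the check below tolerates a spread of one either side of that ideal.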
3093 200 : let ideal_attachment_load = shards_per_az.get(node_az).unwrap() / 2;
3094 200 : let allowed_attachment_load =
3095 200 : (ideal_attachment_load - 1)..(ideal_attachment_load + 2);
3096 :
3097 200 : if !allowed_attachment_load.contains(&stats.attachments) {
3098 0 : violations.push(format!(
3099 0 : "Found {} attachments on node {}, but expected approximately {}",
3100 0 : stats.attachments, node_id, ideal_attachment_load
3101 0 : ));
3102 200 : }
3103 :
3104 200 : eprintln!(
3105 200 : "{}: attachments={} secondaries={} ideal_attachment_load={}",
3106 : node_id, stats.attachments, stats.secondaries, ideal_attachment_load
3107 : );
3108 : }
3109 :
3110 50 : assert!(violations.is_empty(), "{violations:?}");
3111 :
3112 12550 : for (mut shard, _ctx) in shards {
3113 12500 : shard.intent.clear(&mut scheduler);
3114 12500 : }
3115 : }
3116 1 : Ok(())
3117 1 : }
3118 :
3119 : /// Check how the shard's scheduling behaves when in PlacementPolicy::Secondary mode.
3120 : #[test]
3121 1 : fn tenant_secondary_scheduling() -> anyhow::Result<()> {
3122 1 : let az_a = AvailabilityZone("az-a".to_string());
3123 1 : let nodes = make_test_nodes(
3124 : 3,
3125 1 : &[
3126 1 : az_a.clone(),
3127 1 : AvailabilityZone("az-b".to_string()),
3128 1 : AvailabilityZone("az-c".to_string()),
3129 1 : ],
3130 : );
3131 :
3132 1 : let mut scheduler = Scheduler::new(nodes.values());
3133 1 : let mut context = ScheduleContext::default();
3134 :
3135 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Secondary);
3136 1 : tenant_shard.intent.preferred_az_id = Some(az_a.clone());
3137 1 : tenant_shard
3138 1 : .schedule(&mut scheduler, &mut context)
3139 1 : .expect("we have enough nodes, scheduling should work");
3140 1 : assert_eq!(tenant_shard.intent.secondary.len(), 1);
3141 1 : assert!(tenant_shard.intent.attached.is_none());
3142 :
3143 : // Should have scheduled into the preferred AZ
3144 1 : assert_eq!(
3145 1 : scheduler
3146 1 : .get_node_az(&tenant_shard.intent.secondary[0])
3147 1 : .as_ref(),
3148 1 : tenant_shard.preferred_az()
3149 : );
3150 :
3151 : // Optimizer should agree
3152 1 : assert_eq!(
3153 1 : tenant_shard.optimize_attachment(&mut scheduler, &context),
3154 : None
3155 : );
3156 1 : assert_eq!(
3157 1 : tenant_shard.optimize_secondary(&mut scheduler, &context),
3158 : None
3159 : );
3160 :
3161 : // Switch to PlacementPolicy::Attached
3162 1 : tenant_shard.policy = PlacementPolicy::Attached(1);
3163 1 : tenant_shard
3164 1 : .schedule(&mut scheduler, &mut context)
3165 1 : .expect("we have enough nodes, scheduling should work");
3166 1 : assert_eq!(tenant_shard.intent.secondary.len(), 1);
3167 1 : assert!(tenant_shard.intent.attached.is_some());
3168 : // Secondary should now be in non-preferred AZ
3169 1 : assert_ne!(
3170 1 : scheduler
3171 1 : .get_node_az(&tenant_shard.intent.secondary[0])
3172 1 : .as_ref(),
3173 1 : tenant_shard.preferred_az()
3174 : );
3175 : // Attached should be in preferred AZ
3176 1 : assert_eq!(
3177 1 : scheduler
3178 1 : .get_node_az(&tenant_shard.intent.attached.unwrap())
3179 1 : .as_ref(),
3180 1 : tenant_shard.preferred_az()
3181 : );
3182 :
3183 : // Optimizer should agree
3184 1 : assert_eq!(
3185 1 : tenant_shard.optimize_attachment(&mut scheduler, &context),
3186 : None
3187 : );
3188 1 : assert_eq!(
3189 1 : tenant_shard.optimize_secondary(&mut scheduler, &context),
3190 : None
3191 : );
3192 :
3193 : // Switch back to PlacementPolicy::Secondary
3194 1 : tenant_shard.policy = PlacementPolicy::Secondary;
3195 1 : tenant_shard
3196 1 : .schedule(&mut scheduler, &mut context)
3197 1 : .expect("we have enough nodes, scheduling should work");
3198 1 : assert_eq!(tenant_shard.intent.secondary.len(), 1);
3199 1 : assert!(tenant_shard.intent.attached.is_none());
3200 : // When we picked a location to keep, we should have kept the one in the preferred AZ
3201 1 : assert_eq!(
3202 1 : scheduler
3203 1 : .get_node_az(&tenant_shard.intent.secondary[0])
3204 1 : .as_ref(),
3205 1 : tenant_shard.preferred_az()
3206 : );
3207 :
3208 : // Optimizer should agree
3209 1 : assert_eq!(
3210 1 : tenant_shard.optimize_attachment(&mut scheduler, &context),
3211 : None
3212 : );
3213 1 : assert_eq!(
3214 1 : tenant_shard.optimize_secondary(&mut scheduler, &context),
3215 : None
3216 : );
3217 :
3218 1 : tenant_shard.intent.clear(&mut scheduler);
3219 :
3220 1 : Ok(())
3221 1 : }
3222 : }