Line data Source code
1 : use std::collections::{HashMap, HashSet};
2 : use std::sync::Arc;
3 : use std::time::Duration;
4 :
5 : use futures::future::{self, Either};
6 : use itertools::Itertools;
7 : use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy, ShardSchedulingPolicy};
8 : use pageserver_api::models::{LocationConfig, LocationConfigMode, TenantConfig};
9 : use pageserver_api::shard::{ShardIdentity, TenantShardId};
10 : use serde::{Deserialize, Serialize};
11 : use tokio::task::JoinHandle;
12 : use tokio_util::sync::CancellationToken;
13 : use tracing::{Instrument, instrument};
14 : use utils::generation::Generation;
15 : use utils::id::NodeId;
16 : use utils::seqwait::{SeqWait, SeqWaitError};
17 : use utils::shard::ShardCount;
18 : use utils::sync::gate::GateGuard;
19 :
20 : use crate::compute_hook::ComputeHook;
21 : use crate::metrics::{
22 : self, ReconcileCompleteLabelGroup, ReconcileLongRunningLabelGroup, ReconcileOutcome,
23 : };
24 : use crate::node::Node;
25 : use crate::persistence::split_state::SplitState;
26 : use crate::persistence::{Persistence, TenantShardPersistence};
27 : use crate::reconciler::{
28 : ReconcileError, ReconcileUnits, Reconciler, ReconcilerConfig, TargetState,
29 : attached_location_conf, secondary_location_conf,
30 : };
31 : use crate::scheduler::{
32 : AffinityScore, AttachedShardTag, NodeSchedulingScore, NodeSecondarySchedulingScore,
33 : RefCountUpdate, ScheduleContext, ScheduleError, Scheduler, SecondaryShardTag, ShardTag,
34 : };
35 : use crate::service::ReconcileResultRequest;
36 : use crate::{Sequence, service};
37 :
38 : /// Serialization helper
39 0 : fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
40 0 : where
41 0 : S: serde::ser::Serializer,
42 0 : T: std::fmt::Display,
43 0 : {
44 0 : serializer.collect_str(
45 0 : &v.lock()
46 0 : .unwrap()
47 0 : .as_ref()
48 0 : .map(|e| format!("{e}"))
49 0 : .unwrap_or("".to_string()),
50 0 : )
51 0 : }
52 :
53 : /// In-memory state for a particular tenant shard.
54 : ///
55 : /// This struct implement Serialize for debugging purposes, but is _not_ persisted
56 : /// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
57 0 : #[derive(Serialize)]
58 : pub(crate) struct TenantShard {
59 : pub(crate) tenant_shard_id: TenantShardId,
60 :
61 : pub(crate) shard: ShardIdentity,
62 :
63 : // Runtime only: sequence used to coordinate when updating this object while
64 : // with background reconcilers may be running. A reconciler runs to a particular
65 : // sequence.
66 : pub(crate) sequence: Sequence,
67 :
68 : // Latest generation number: next time we attach, increment this
69 : // and use the incremented number when attaching.
70 : //
71 : // None represents an incompletely onboarded tenant via the [`Service::location_config`]
72 : // API, where this tenant may only run in PlacementPolicy::Secondary.
73 : pub(crate) generation: Option<Generation>,
74 :
75 : // High level description of how the tenant should be set up. Provided
76 : // externally.
77 : pub(crate) policy: PlacementPolicy,
78 :
79 : // Low level description of exactly which pageservers should fulfil
80 : // which role. Generated by `Self::schedule`.
81 : pub(crate) intent: IntentState,
82 :
83 : // Low level description of how the tenant is configured on pageservers:
84 : // if this does not match `Self::intent` then the tenant needs reconciliation
85 : // with `Self::reconcile`.
86 : pub(crate) observed: ObservedState,
87 :
88 : // Tenant configuration, passed through opaquely to the pageserver. Identical
89 : // for all shards in a tenant.
90 : pub(crate) config: TenantConfig,
91 :
92 : /// If a reconcile task is currently in flight, it may be joined here (it is
93 : /// only safe to join if either the result has been received or the reconciler's
94 : /// cancellation token has been fired)
95 : #[serde(skip)]
96 : pub(crate) reconciler: Option<ReconcilerHandle>,
97 :
98 : /// If a tenant is being split, then all shards with that TenantId will have a
99 : /// SplitState set, this acts as a guard against other operations such as background
100 : /// reconciliation, and timeline creation.
101 : pub(crate) splitting: SplitState,
102 :
103 : /// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag
104 : /// is set. This flag is cleared when the tenant is popped off the delay queue.
105 : pub(crate) delayed_reconcile: bool,
106 :
107 : /// Optionally wait for reconciliation to complete up to a particular
108 : /// sequence number.
109 : #[serde(skip)]
110 : pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
111 :
112 : /// Indicates sequence number for which we have encountered an error reconciling. If
113 : /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
114 : /// and callers should stop waiting for `waiter` and propagate the error.
115 : #[serde(skip)]
116 : pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
117 :
118 : /// The most recent error from a reconcile on this tenant. This is a nested Arc
119 : /// because:
120 : /// - ReconcileWaiters need to Arc-clone the overall object to read it later
121 : /// - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
122 : /// many waiters for one shard, and the underlying error types are not Clone.
123 : ///
124 : /// TODO: generalize to an array of recent events
125 : /// TOOD: use a ArcSwap instead of mutex for faster reads?
126 : #[serde(serialize_with = "read_last_error")]
127 : pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
128 :
129 : /// If we have a pending compute notification that for some reason we weren't able to send,
130 : /// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
131 : /// and trigger a Reconciler run. This is the mechanism by which compute notifications are included in the scope
132 : /// of state that we publish externally in an eventually consistent way.
133 : pub(crate) pending_compute_notification: bool,
134 :
135 : /// To do a graceful migration, set this field to the destination pageserver, and optimization
136 : /// functions will consider this node the best location and react appropriately.
137 : preferred_node: Option<NodeId>,
138 :
139 : // Support/debug tool: if something is going wrong or flapping with scheduling, this may
140 : // be set to a non-active state to avoid making changes while the issue is fixed.
141 : scheduling_policy: ShardSchedulingPolicy,
142 : }
143 :
144 : #[derive(Clone, Debug, Serialize)]
145 : pub(crate) struct IntentState {
146 : attached: Option<NodeId>,
147 : secondary: Vec<NodeId>,
148 :
149 : // We should attempt to schedule this shard in the provided AZ to
150 : // decrease chances of cross-AZ compute.
151 : preferred_az_id: Option<AvailabilityZone>,
152 : }
153 :
154 : impl IntentState {
155 12860 : pub(crate) fn new(preferred_az_id: Option<AvailabilityZone>) -> Self {
156 12860 : Self {
157 12860 : attached: None,
158 12860 : secondary: vec![],
159 12860 : preferred_az_id,
160 12860 : }
161 12860 : }
162 0 : pub(crate) fn single(
163 0 : scheduler: &mut Scheduler,
164 0 : node_id: Option<NodeId>,
165 0 : preferred_az_id: Option<AvailabilityZone>,
166 0 : ) -> Self {
167 0 : if let Some(node_id) = node_id {
168 0 : scheduler.update_node_ref_counts(
169 0 : node_id,
170 0 : preferred_az_id.as_ref(),
171 0 : RefCountUpdate::Attach,
172 0 : );
173 0 : }
174 0 : Self {
175 0 : attached: node_id,
176 0 : secondary: vec![],
177 0 : preferred_az_id,
178 0 : }
179 0 : }
180 :
181 12859 : pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
182 12859 : if self.attached != new_attached {
183 12859 : if let Some(old_attached) = self.attached.take() {
184 0 : scheduler.update_node_ref_counts(
185 0 : old_attached,
186 0 : self.preferred_az_id.as_ref(),
187 0 : RefCountUpdate::Detach,
188 0 : );
189 12859 : }
190 12859 : if let Some(new_attached) = &new_attached {
191 12859 : scheduler.update_node_ref_counts(
192 12859 : *new_attached,
193 12859 : self.preferred_az_id.as_ref(),
194 12859 : RefCountUpdate::Attach,
195 12859 : );
196 12859 : }
197 12859 : self.attached = new_attached;
198 0 : }
199 :
200 12859 : if let Some(new_attached) = &new_attached {
201 12859 : assert!(!self.secondary.contains(new_attached));
202 0 : }
203 12859 : }
204 :
205 : /// Like set_attached, but the node is from [`Self::secondary`]. This swaps the node from
206 : /// secondary to attached while maintaining the scheduler's reference counts.
207 7 : pub(crate) fn promote_attached(
208 7 : &mut self,
209 7 : scheduler: &mut Scheduler,
210 7 : promote_secondary: NodeId,
211 7 : ) {
212 7 : // If we call this with a node that isn't in secondary, it would cause incorrect
213 7 : // scheduler reference counting, since we assume the node is already referenced as a secondary.
214 7 : debug_assert!(self.secondary.contains(&promote_secondary));
215 :
216 16 : self.secondary.retain(|n| n != &promote_secondary);
217 7 :
218 7 : let demoted = self.attached;
219 7 : self.attached = Some(promote_secondary);
220 7 :
221 7 : scheduler.update_node_ref_counts(
222 7 : promote_secondary,
223 7 : self.preferred_az_id.as_ref(),
224 7 : RefCountUpdate::PromoteSecondary,
225 7 : );
226 7 : if let Some(demoted) = demoted {
227 0 : scheduler.update_node_ref_counts(
228 0 : demoted,
229 0 : self.preferred_az_id.as_ref(),
230 0 : RefCountUpdate::DemoteAttached,
231 0 : );
232 7 : }
233 7 : }
234 :
235 12848 : pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
236 12848 : assert!(!self.secondary.contains(&new_secondary));
237 12848 : assert!(self.attached != Some(new_secondary));
238 12848 : scheduler.update_node_ref_counts(
239 12848 : new_secondary,
240 12848 : self.preferred_az_id.as_ref(),
241 12848 : RefCountUpdate::AddSecondary,
242 12848 : );
243 12848 : self.secondary.push(new_secondary);
244 12848 : }
245 :
246 : /// It is legal to call this with a node that is not currently a secondary: that is a no-op
247 9 : pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
248 13 : let index = self.secondary.iter().position(|n| *n == node_id);
249 9 : if let Some(index) = index {
250 9 : scheduler.update_node_ref_counts(
251 9 : node_id,
252 9 : self.preferred_az_id.as_ref(),
253 9 : RefCountUpdate::RemoveSecondary,
254 9 : );
255 9 : self.secondary.remove(index);
256 9 : }
257 9 : }
258 :
259 12859 : pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
260 12859 : for secondary in self.secondary.drain(..) {
261 12840 : scheduler.update_node_ref_counts(
262 12840 : secondary,
263 12840 : self.preferred_az_id.as_ref(),
264 12840 : RefCountUpdate::RemoveSecondary,
265 12840 : );
266 12840 : }
267 12859 : }
268 :
269 : /// Remove the last secondary node from the list of secondaries
270 0 : pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
271 0 : if let Some(node_id) = self.secondary.pop() {
272 0 : scheduler.update_node_ref_counts(
273 0 : node_id,
274 0 : self.preferred_az_id.as_ref(),
275 0 : RefCountUpdate::RemoveSecondary,
276 0 : );
277 0 : }
278 0 : }
279 :
280 12859 : pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
281 12859 : if let Some(old_attached) = self.attached.take() {
282 12857 : scheduler.update_node_ref_counts(
283 12857 : old_attached,
284 12857 : self.preferred_az_id.as_ref(),
285 12857 : RefCountUpdate::Detach,
286 12857 : );
287 12857 : }
288 :
289 12859 : self.clear_secondary(scheduler);
290 12859 : }
291 :
292 12901 : pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
293 12901 : let mut result = Vec::new();
294 12901 : if let Some(p) = self.attached {
295 12896 : result.push(p)
296 5 : }
297 :
298 12901 : result.extend(self.secondary.iter().copied());
299 12901 :
300 12901 : result
301 12901 : }
302 :
303 12624 : pub(crate) fn get_attached(&self) -> &Option<NodeId> {
304 12624 : &self.attached
305 12624 : }
306 :
307 12706 : pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
308 12706 : &self.secondary
309 12706 : }
310 :
311 : /// If the node is in use as the attached location, demote it into
312 : /// the list of secondary locations. This is used when a node goes offline,
313 : /// and we want to use a different node for attachment, but not permanently
314 : /// forget the location on the offline node.
315 : ///
316 : /// Returns true if a change was made
317 8 : pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
318 8 : if self.attached == Some(node_id) {
319 8 : self.attached = None;
320 8 : self.secondary.push(node_id);
321 8 : scheduler.update_node_ref_counts(
322 8 : node_id,
323 8 : self.preferred_az_id.as_ref(),
324 8 : RefCountUpdate::DemoteAttached,
325 8 : );
326 8 : true
327 : } else {
328 0 : false
329 : }
330 8 : }
331 :
332 2 : pub(crate) fn set_preferred_az(
333 2 : &mut self,
334 2 : scheduler: &mut Scheduler,
335 2 : preferred_az: Option<AvailabilityZone>,
336 2 : ) {
337 2 : let new_az = preferred_az.as_ref();
338 2 : let old_az = self.preferred_az_id.as_ref();
339 2 :
340 2 : if old_az != new_az {
341 2 : if let Some(node_id) = self.attached {
342 2 : scheduler.update_node_ref_counts(
343 2 : node_id,
344 2 : new_az,
345 2 : RefCountUpdate::ChangePreferredAzFrom(old_az),
346 2 : );
347 2 : }
348 4 : for node_id in &self.secondary {
349 2 : scheduler.update_node_ref_counts(
350 2 : *node_id,
351 2 : new_az,
352 2 : RefCountUpdate::ChangePreferredAzFrom(old_az),
353 2 : );
354 2 : }
355 2 : self.preferred_az_id = preferred_az;
356 0 : }
357 2 : }
358 :
359 12508 : pub(crate) fn get_preferred_az(&self) -> Option<&AvailabilityZone> {
360 12508 : self.preferred_az_id.as_ref()
361 12508 : }
362 : }
363 :
364 : impl Drop for IntentState {
365 12860 : fn drop(&mut self) {
366 12860 : // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
367 12860 : // We do not check this while panicking, to avoid polluting unit test failures or
368 12860 : // other assertions with this assertion's output. It's still wrong to leak these,
369 12860 : // but if we already have a panic then we don't need to independently flag this case.
370 12860 : if !(std::thread::panicking()) {
371 12860 : debug_assert!(self.attached.is_none() && self.secondary.is_empty());
372 0 : }
373 12859 : }
374 : }
375 :
376 0 : #[derive(Default, Clone, Serialize, Deserialize, Debug)]
377 : pub(crate) struct ObservedState {
378 : pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
379 : }
380 :
381 : /// Our latest knowledge of how this tenant is configured in the outside world.
382 : ///
383 : /// Meaning:
384 : /// * No instance of this type exists for a node: we are certain that we have nothing configured on that
385 : /// node for this shard.
386 : /// * Instance exists with conf==None: we *might* have some state on that node, but we don't know
387 : /// what it is (e.g. we failed partway through configuring it)
388 : /// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
389 : /// and that configuration will still be present unless something external interfered.
390 0 : #[derive(Clone, Serialize, Deserialize, Debug)]
391 : pub(crate) struct ObservedStateLocation {
392 : /// If None, it means we do not know the status of this shard's location on this node, but
393 : /// we know that we might have some state on this node.
394 : pub(crate) conf: Option<LocationConfig>,
395 : }
396 :
397 : pub(crate) struct ReconcilerWaiter {
398 : // For observability purposes, remember the ID of the shard we're
399 : // waiting for.
400 : pub(crate) tenant_shard_id: TenantShardId,
401 :
402 : seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
403 : error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
404 : error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
405 : seq: Sequence,
406 : }
407 :
408 : pub(crate) enum ReconcilerStatus {
409 : Done,
410 : Failed,
411 : InProgress,
412 : }
413 :
414 : #[derive(thiserror::Error, Debug)]
415 : pub(crate) enum ReconcileWaitError {
416 : #[error("Timeout waiting for shard {0}")]
417 : Timeout(TenantShardId),
418 : #[error("shutting down")]
419 : Shutdown,
420 : #[error("Reconcile error on shard {0}: {1}")]
421 : Failed(TenantShardId, Arc<ReconcileError>),
422 : }
423 :
424 : #[derive(Eq, PartialEq, Debug, Clone)]
425 : pub(crate) struct ReplaceSecondary {
426 : old_node_id: NodeId,
427 : new_node_id: NodeId,
428 : }
429 :
430 : #[derive(Eq, PartialEq, Debug, Clone)]
431 : pub(crate) struct MigrateAttachment {
432 : pub(crate) old_attached_node_id: NodeId,
433 : pub(crate) new_attached_node_id: NodeId,
434 : }
435 :
436 : #[derive(Eq, PartialEq, Debug, Clone)]
437 : pub(crate) enum ScheduleOptimizationAction {
438 : // Replace one of our secondary locations with a different node
439 : ReplaceSecondary(ReplaceSecondary),
440 : // Migrate attachment to an existing secondary location
441 : MigrateAttachment(MigrateAttachment),
442 : // Create a secondary location, with the intent of later migrating to it
443 : CreateSecondary(NodeId),
444 : // Remove a secondary location that we previously created to facilitate a migration
445 : RemoveSecondary(NodeId),
446 : }
447 :
448 : #[derive(Eq, PartialEq, Debug, Clone)]
449 : pub(crate) struct ScheduleOptimization {
450 : // What was the reconcile sequence when we generated this optimization? The optimization
451 : // should only be applied if the shard's sequence is still at this value, in case other changes
452 : // happened between planning the optimization and applying it.
453 : sequence: Sequence,
454 :
455 : pub(crate) action: ScheduleOptimizationAction,
456 : }
457 :
458 : impl ReconcilerWaiter {
459 0 : pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
460 0 : tokio::select! {
461 0 : result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
462 0 : result.map_err(|e| match e {
463 0 : SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
464 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
465 0 : })?;
466 : },
467 0 : result = self.error_seq_wait.wait_for(self.seq) => {
468 0 : result.map_err(|e| match e {
469 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
470 0 : SeqWaitError::Timeout => unreachable!()
471 0 : })?;
472 :
473 0 : return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
474 0 : self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
475 : }
476 : }
477 :
478 0 : Ok(())
479 0 : }
480 :
481 0 : pub(crate) fn get_status(&self) -> ReconcilerStatus {
482 0 : if self.seq_wait.would_wait_for(self.seq).is_ok() {
483 0 : ReconcilerStatus::Done
484 0 : } else if self.error_seq_wait.would_wait_for(self.seq).is_ok() {
485 0 : ReconcilerStatus::Failed
486 : } else {
487 0 : ReconcilerStatus::InProgress
488 : }
489 0 : }
490 : }
491 :
492 : /// Having spawned a reconciler task, the tenant shard's state will carry enough
493 : /// information to optionally cancel & await it later.
494 : pub(crate) struct ReconcilerHandle {
495 : sequence: Sequence,
496 : handle: JoinHandle<()>,
497 : cancel: CancellationToken,
498 : }
499 :
500 : pub(crate) enum ReconcileNeeded {
501 : /// shard either doesn't need reconciliation, or is forbidden from spawning a reconciler
502 : /// in its current state (e.g. shard split in progress, or ShardSchedulingPolicy forbids it)
503 : No,
504 : /// shard has a reconciler running, and its intent hasn't changed since that one was
505 : /// spawned: wait for the existing reconciler rather than spawning a new one.
506 : WaitExisting(ReconcilerWaiter),
507 : /// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
508 : Yes(ReconcileReason),
509 : }
510 :
511 : #[derive(Debug)]
512 : pub(crate) enum ReconcileReason {
513 : ActiveNodesDirty,
514 : UnknownLocation,
515 : PendingComputeNotification,
516 : }
517 :
518 : /// Pending modification to the observed state of a tenant shard.
519 : /// Produced by [`Reconciler::observed_deltas`] and applied in [`crate::service::Service::process_result`].
520 : pub(crate) enum ObservedStateDelta {
521 : Upsert(Box<(NodeId, ObservedStateLocation)>),
522 : Delete(NodeId),
523 : }
524 :
525 : impl ObservedStateDelta {
526 0 : pub(crate) fn node_id(&self) -> &NodeId {
527 0 : match self {
528 0 : Self::Upsert(up) => &up.0,
529 0 : Self::Delete(nid) => nid,
530 : }
531 0 : }
532 : }
533 :
534 : /// When a reconcile task completes, it sends this result object
535 : /// to be applied to the primary TenantShard.
536 : pub(crate) struct ReconcileResult {
537 : pub(crate) sequence: Sequence,
538 : /// On errors, `observed` should be treated as an incompleted description
539 : /// of state (i.e. any nodes present in the result should override nodes
540 : /// present in the parent tenant state, but any unmentioned nodes should
541 : /// not be removed from parent tenant state)
542 : pub(crate) result: Result<(), ReconcileError>,
543 :
544 : pub(crate) tenant_shard_id: TenantShardId,
545 : pub(crate) generation: Option<Generation>,
546 : pub(crate) observed_deltas: Vec<ObservedStateDelta>,
547 :
548 : /// Set [`TenantShard::pending_compute_notification`] from this flag
549 : pub(crate) pending_compute_notification: bool,
550 : }
551 :
552 : impl ObservedState {
553 0 : pub(crate) fn new() -> Self {
554 0 : Self {
555 0 : locations: HashMap::new(),
556 0 : }
557 0 : }
558 :
559 0 : pub(crate) fn is_empty(&self) -> bool {
560 0 : self.locations.is_empty()
561 0 : }
562 : }
563 :
564 : impl TenantShard {
565 12843 : pub(crate) fn new(
566 12843 : tenant_shard_id: TenantShardId,
567 12843 : shard: ShardIdentity,
568 12843 : policy: PlacementPolicy,
569 12843 : preferred_az_id: Option<AvailabilityZone>,
570 12843 : ) -> Self {
571 12843 : metrics::METRICS_REGISTRY
572 12843 : .metrics_group
573 12843 : .storage_controller_tenant_shards
574 12843 : .inc();
575 12843 :
576 12843 : Self {
577 12843 : tenant_shard_id,
578 12843 : policy,
579 12843 : intent: IntentState::new(preferred_az_id),
580 12843 : generation: Some(Generation::new(0)),
581 12843 : shard,
582 12843 : observed: ObservedState::default(),
583 12843 : config: TenantConfig::default(),
584 12843 : reconciler: None,
585 12843 : splitting: SplitState::Idle,
586 12843 : sequence: Sequence(1),
587 12843 : delayed_reconcile: false,
588 12843 : waiter: Arc::new(SeqWait::new(Sequence(0))),
589 12843 : error_waiter: Arc::new(SeqWait::new(Sequence(0))),
590 12843 : last_error: Arc::default(),
591 12843 : pending_compute_notification: false,
592 12843 : scheduling_policy: ShardSchedulingPolicy::default(),
593 12843 : preferred_node: None,
594 12843 : }
595 12843 : }
596 :
597 : /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
598 : /// [`ObservedState`], even if it violates my [`PlacementPolicy`]. Call [`Self::schedule`] next,
599 : /// to get an intent state that complies with placement policy. The overall goal is to do scheduling
600 : /// in a way that makes use of any configured locations that already exist in the outside world.
601 1 : pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
602 1 : // Choose an attached location by filtering observed locations, and then sorting to get the highest
603 1 : // generation
604 1 : let mut attached_locs = self
605 1 : .observed
606 1 : .locations
607 1 : .iter()
608 2 : .filter_map(|(node_id, l)| {
609 2 : if let Some(conf) = &l.conf {
610 2 : if conf.mode == LocationConfigMode::AttachedMulti
611 1 : || conf.mode == LocationConfigMode::AttachedSingle
612 1 : || conf.mode == LocationConfigMode::AttachedStale
613 : {
614 2 : Some((node_id, conf.generation))
615 : } else {
616 0 : None
617 : }
618 : } else {
619 0 : None
620 : }
621 2 : })
622 1 : .collect::<Vec<_>>();
623 1 :
624 2 : attached_locs.sort_by_key(|i| i.1);
625 1 : if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
626 1 : self.intent.set_attached(scheduler, Some(*node_id));
627 1 : }
628 :
629 : // All remaining observed locations generate secondary intents. This includes None
630 : // observations, as these may well have some local content on disk that is usable (this
631 : // is an edge case that might occur if we restarted during a migration or other change)
632 : //
633 : // We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
634 : // will take care of promoting one of these secondaries to be attached.
635 2 : self.observed.locations.keys().for_each(|node_id| {
636 2 : if Some(*node_id) != self.intent.attached {
637 1 : self.intent.push_secondary(scheduler, *node_id);
638 1 : }
639 2 : });
640 1 : }
641 :
642 : /// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
643 : /// attached pageserver for a shard.
644 : ///
645 : /// Returns whether we modified it, and the NodeId selected.
646 12832 : fn schedule_attached(
647 12832 : &mut self,
648 12832 : scheduler: &mut Scheduler,
649 12832 : context: &ScheduleContext,
650 12832 : ) -> Result<(bool, NodeId), ScheduleError> {
651 : // No work to do if we already have an attached tenant
652 12832 : if let Some(node_id) = self.intent.attached {
653 0 : return Ok((false, node_id));
654 12832 : }
655 :
656 12832 : if let Some(promote_secondary) = self.preferred_secondary(scheduler) {
657 : // Promote a secondary
658 2 : tracing::debug!("Promoted secondary {} to attached", promote_secondary);
659 2 : self.intent.promote_attached(scheduler, promote_secondary);
660 2 : Ok((true, promote_secondary))
661 : } else {
662 : // Pick a fresh node: either we had no secondaries or none were schedulable
663 12830 : let node_id = scheduler.schedule_shard::<AttachedShardTag>(
664 12830 : &self.intent.secondary,
665 12830 : &self.intent.preferred_az_id,
666 12830 : context,
667 12830 : )?;
668 12830 : tracing::debug!("Selected {} as attached", node_id);
669 12830 : self.intent.set_attached(scheduler, Some(node_id));
670 12830 : Ok((true, node_id))
671 : }
672 12832 : }
673 :
674 : #[instrument(skip_all, fields(
675 : tenant_id=%self.tenant_shard_id.tenant_id,
676 : shard_id=%self.tenant_shard_id.shard_slug(),
677 : sequence=%self.sequence
678 : ))]
679 : pub(crate) fn schedule(
680 : &mut self,
681 : scheduler: &mut Scheduler,
682 : context: &mut ScheduleContext,
683 : ) -> Result<(), ScheduleError> {
684 : let r = self.do_schedule(scheduler, context);
685 :
686 : context.avoid(&self.intent.all_pageservers());
687 :
688 : r
689 : }
690 :
691 12836 : pub(crate) fn do_schedule(
692 12836 : &mut self,
693 12836 : scheduler: &mut Scheduler,
694 12836 : context: &ScheduleContext,
695 12836 : ) -> Result<(), ScheduleError> {
696 12836 : // TODO: before scheduling new nodes, check if any existing content in
697 12836 : // self.intent refers to pageservers that are offline, and pick other
698 12836 : // pageservers if so.
699 12836 :
700 12836 : // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
701 12836 : // change their attach location.
702 12836 :
703 12836 : match self.scheduling_policy {
704 12835 : ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
705 : ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
706 : // Warn to make it obvious why other things aren't happening/working, if we skip scheduling
707 1 : tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
708 0 : "Scheduling is disabled by policy {:?}", self.scheduling_policy);
709 1 : return Ok(());
710 : }
711 : }
712 :
713 : // Build the set of pageservers already in use by this tenant, to avoid scheduling
714 : // more work on the same pageservers we're already using.
715 12835 : let mut modified = false;
716 :
717 : // Add/remove nodes to fulfil policy
718 : use PlacementPolicy::*;
719 12835 : match self.policy {
720 12832 : Attached(secondary_count) => {
721 : // Should have exactly one attached, and at least N secondaries
722 12832 : let (modified_attached, attached_node_id) =
723 12832 : self.schedule_attached(scheduler, context)?;
724 12832 : modified |= modified_attached;
725 12832 :
726 12832 : let mut used_pageservers = vec![attached_node_id];
727 25663 : while self.intent.secondary.len() < secondary_count {
728 12831 : let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
729 12831 : &used_pageservers,
730 12831 : &self.intent.preferred_az_id,
731 12831 : context,
732 12831 : )?;
733 12831 : self.intent.push_secondary(scheduler, node_id);
734 12831 : used_pageservers.push(node_id);
735 12831 : modified = true;
736 : }
737 : }
738 : Secondary => {
739 3 : if let Some(node_id) = self.intent.get_attached() {
740 2 : // Populate secondary by demoting the attached node
741 2 : self.intent.demote_attached(scheduler, *node_id);
742 2 :
743 2 : modified = true;
744 2 : } else if self.intent.secondary.is_empty() {
745 : // Populate secondary by scheduling a fresh node
746 : //
747 : // We use [`AttachedShardTag`] because when a secondary location is the only one
748 : // a shard has, we expect that its next use will be as an attached location: we want
749 : // the tenant to be ready to warm up and run fast in their preferred AZ.
750 1 : let node_id = scheduler.schedule_shard::<AttachedShardTag>(
751 1 : &[],
752 1 : &self.intent.preferred_az_id,
753 1 : context,
754 1 : )?;
755 1 : self.intent.push_secondary(scheduler, node_id);
756 1 : modified = true;
757 0 : }
758 4 : while self.intent.secondary.len() > 1 {
759 1 : // If we have multiple secondaries (e.g. when transitioning from Attached to Secondary and
760 1 : // having just demoted our attached location), then we should prefer to keep the location
761 1 : // in our preferred AZ. Tenants in Secondary mode want to be in the preferred AZ so that
762 1 : // they have a warm location to become attached when transitioning back into Attached.
763 1 :
764 1 : let mut candidates = self.intent.get_secondary().clone();
765 1 : // Sort to get secondaries outside preferred AZ last
766 1 : candidates
767 2 : .sort_by_key(|n| scheduler.get_node_az(n).as_ref() != self.preferred_az());
768 1 : let secondary_to_remove = candidates.pop().unwrap();
769 1 : self.intent.remove_secondary(scheduler, secondary_to_remove);
770 1 : modified = true;
771 1 : }
772 : }
773 : Detached => {
774 : // Never add locations in this mode
775 0 : if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
776 0 : self.intent.clear(scheduler);
777 0 : modified = true;
778 0 : }
779 : }
780 : }
781 :
782 12835 : if modified {
783 12835 : self.sequence.0 += 1;
784 12835 : }
785 :
786 12835 : Ok(())
787 12836 : }
788 :
789 : /// Reschedule this tenant shard to one of its secondary locations. Returns a scheduling error
790 : /// if the swap is not possible and leaves the intent state in its original state.
791 : ///
792 : /// Arguments:
793 : /// `attached_to`: the currently attached location matching the intent state (may be None if the
794 : /// shard is not attached)
795 : /// `promote_to`: an optional secondary location of this tenant shard. If set to None, we ask
796 : /// the scheduler to recommend a node
797 0 : pub(crate) fn reschedule_to_secondary(
798 0 : &mut self,
799 0 : promote_to: Option<NodeId>,
800 0 : scheduler: &mut Scheduler,
801 0 : ) -> Result<(), ScheduleError> {
802 0 : let promote_to = match promote_to {
803 0 : Some(node) => node,
804 0 : None => match self.preferred_secondary(scheduler) {
805 0 : Some(node) => node,
806 : None => {
807 0 : return Err(ScheduleError::ImpossibleConstraint);
808 : }
809 : },
810 : };
811 :
812 0 : assert!(self.intent.get_secondary().contains(&promote_to));
813 :
814 0 : if let Some(node) = self.intent.get_attached() {
815 0 : let demoted = self.intent.demote_attached(scheduler, *node);
816 0 : if !demoted {
817 0 : return Err(ScheduleError::ImpossibleConstraint);
818 0 : }
819 0 : }
820 :
821 0 : self.intent.promote_attached(scheduler, promote_to);
822 0 :
823 0 : // Increment the sequence number for the edge case where a
824 0 : // reconciler is already running to avoid waiting on the
825 0 : // current reconcile instead of spawning a new one.
826 0 : self.sequence = self.sequence.next();
827 0 :
828 0 : Ok(())
829 0 : }
830 :
831 : /// Returns None if the current location's score is unavailable, i.e. cannot draw a conclusion
832 68 : fn is_better_location<T: ShardTag>(
833 68 : &self,
834 68 : scheduler: &mut Scheduler,
835 68 : schedule_context: &ScheduleContext,
836 68 : current: NodeId,
837 68 : candidate: NodeId,
838 68 : ) -> Option<bool> {
839 68 : let Some(candidate_score) = scheduler.compute_node_score::<T::Score>(
840 68 : candidate,
841 68 : &self.intent.preferred_az_id,
842 68 : schedule_context,
843 68 : ) else {
844 : // The candidate node is unavailable for scheduling or otherwise couldn't get a score
845 1 : return None;
846 : };
847 :
848 : // If the candidate is our preferred node, then it is better than the current location, as long
849 : // as it is online -- the online check is part of the score calculation we did above, so it's
850 : // important that this check comes after that one.
851 67 : if let Some(preferred) = self.preferred_node.as_ref() {
852 3 : if preferred == &candidate {
853 1 : return Some(true);
854 2 : }
855 64 : }
856 :
857 66 : match scheduler.compute_node_score::<T::Score>(
858 66 : current,
859 66 : &self.intent.preferred_az_id,
860 66 : schedule_context,
861 66 : ) {
862 66 : Some(current_score) => {
863 66 : // Ignore utilization components when comparing scores: we don't want to migrate
864 66 : // because of transient load variations, it risks making the system thrash, and
865 66 : // migrating for utilization requires a separate high level view of the system to
866 66 : // e.g. prioritize moving larger or smaller tenants, rather than arbitrarily
867 66 : // moving things around in the order that we hit this function.
868 66 : let candidate_score = candidate_score.for_optimization();
869 66 : let current_score = current_score.for_optimization();
870 66 :
871 66 : if candidate_score < current_score {
872 8 : tracing::info!(
873 0 : "Found a lower scoring location! {candidate} is better than {current} ({candidate_score:?} is better than {current_score:?})"
874 : );
875 8 : Some(true)
876 : } else {
877 : // The candidate node is no better than our current location, so don't migrate
878 58 : tracing::debug!(
879 0 : "Candidate node {candidate} is no better than our current location {current} (candidate {candidate_score:?} vs current {current_score:?})",
880 : );
881 58 : Some(false)
882 : }
883 : }
884 : None => {
885 : // The current node is unavailable for scheduling, so we can't make any sensible
886 : // decisions about optimisation. This should be a transient state -- if the node
887 : // is offline then it will get evacuated, if is blocked by a scheduling mode
888 : // then we will respect that mode by doing nothing.
889 0 : tracing::debug!("Current node {current} is unavailable for scheduling");
890 0 : None
891 : }
892 : }
893 68 : }
894 :
895 48 : pub(crate) fn find_better_location<T: ShardTag>(
896 48 : &self,
897 48 : scheduler: &mut Scheduler,
898 48 : schedule_context: &ScheduleContext,
899 48 : current: NodeId,
900 48 : hard_exclude: &[NodeId],
901 48 : ) -> Option<NodeId> {
902 : // If we have a migration hint, then that is our better location
903 48 : if let Some(hint) = self.preferred_node.as_ref() {
904 1 : if hint == ¤t {
905 0 : return None;
906 1 : }
907 1 :
908 1 : return Some(*hint);
909 47 : }
910 :
911 : // Look for a lower-scoring location to attach to
912 47 : let Ok(candidate_node) = scheduler.schedule_shard::<T>(
913 47 : hard_exclude,
914 47 : &self.intent.preferred_az_id,
915 47 : schedule_context,
916 47 : ) else {
917 : // A scheduling error means we have no possible candidate replacements
918 0 : tracing::debug!("No candidate node found");
919 0 : return None;
920 : };
921 :
922 47 : if candidate_node == current {
923 : // We're already at the best possible location, so don't migrate
924 24 : tracing::debug!("Candidate node {candidate_node} is already in use");
925 24 : return None;
926 23 : }
927 23 :
928 23 : self.is_better_location::<T>(scheduler, schedule_context, current, candidate_node)
929 23 : .and_then(|better| if better { Some(candidate_node) } else { None })
930 48 : }
931 :
932 : /// This function is an optimization, used to avoid doing large numbers of scheduling operations
933 : /// when looking for optimizations. This function uses knowledge of how scores work to do some
934 : /// fast checks for whether it may to be possible to improve a score.
935 : ///
936 : /// If we return true, it only means that optimization _might_ be possible, not that it necessarily is. If we
937 : /// return no, it definitely means that calling [`Self::optimize_attachment`] or [`Self::optimize_secondary`] would do no
938 : /// work.
939 3 : pub(crate) fn maybe_optimizable(
940 3 : &self,
941 3 : scheduler: &mut Scheduler,
942 3 : schedule_context: &ScheduleContext,
943 3 : ) -> bool {
944 : // Tenant with preferred node: check if it is not already at the preferred node
945 3 : if let Some(preferred) = self.preferred_node.as_ref() {
946 0 : if Some(preferred) != self.intent.get_attached().as_ref() {
947 0 : return true;
948 0 : }
949 3 : }
950 :
951 : // Sharded tenant: check if any locations have a nonzero affinity score
952 3 : if self.shard.count >= ShardCount(1) {
953 3 : let schedule_context = schedule_context.project_detach(self);
954 5 : for node in self.intent.all_pageservers() {
955 5 : if let Some(af) = schedule_context.nodes.get(&node) {
956 5 : if *af > AffinityScore(0) {
957 3 : return true;
958 2 : }
959 0 : }
960 : }
961 0 : }
962 :
963 : // Attached tenant: check if the attachment is outside the preferred AZ
964 0 : if let PlacementPolicy::Attached(_) = self.policy {
965 0 : if let Some(attached) = self.intent.get_attached() {
966 0 : if scheduler.get_node_az(attached) != self.intent.preferred_az_id {
967 0 : return true;
968 0 : }
969 0 : }
970 0 : }
971 :
972 : // Tenant with secondary locations: check if any are within the preferred AZ
973 0 : for secondary in self.intent.get_secondary() {
974 0 : if scheduler.get_node_az(secondary) == self.intent.preferred_az_id {
975 0 : return true;
976 0 : }
977 : }
978 :
979 : // Does the tenant have excess secondaries?
980 0 : if self.intent.get_secondary().len() > self.policy.want_secondaries() {
981 0 : return true;
982 0 : }
983 0 :
984 0 : // Fall through: no optimizations possible
985 0 : false
986 3 : }
987 :
988 : /// Optimize attachments: if a shard has a secondary location that is preferable to
989 : /// its primary location based on soft constraints, switch that secondary location
990 : /// to be attached.
991 : ///
992 : /// `schedule_context` should have been populated with all shards in the tenant, including
993 : /// the one we're trying to optimize (this function will subtract its own contribution before making scoring decisions)
994 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
995 : pub(crate) fn optimize_attachment(
996 : &self,
997 : scheduler: &mut Scheduler,
998 : schedule_context: &ScheduleContext,
999 : ) -> Option<ScheduleOptimization> {
1000 : let attached = (*self.intent.get_attached())?;
1001 :
1002 : let schedule_context = schedule_context.project_detach(self);
1003 :
1004 : // If we already have a secondary that is higher-scoring than out current location,
1005 : // then simply migrate to it.
1006 : for secondary in self.intent.get_secondary() {
1007 : if let Some(true) = self.is_better_location::<AttachedShardTag>(
1008 : scheduler,
1009 : &schedule_context,
1010 : attached,
1011 : *secondary,
1012 : ) {
1013 : return Some(ScheduleOptimization {
1014 : sequence: self.sequence,
1015 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
1016 : old_attached_node_id: attached,
1017 : new_attached_node_id: *secondary,
1018 : }),
1019 : });
1020 : }
1021 : }
1022 :
1023 : // Given that none of our current secondaries is a better location than our current
1024 : // attached location (checked above), we may trim any secondaries that are not needed
1025 : // for the placement policy.
1026 : if self.intent.get_secondary().len() > self.policy.want_secondaries() {
1027 : // This code path cleans up extra secondaries after migrating, and/or
1028 : // trims extra secondaries after a PlacementPolicy::Attached(N) was
1029 : // modified to decrease N.
1030 :
1031 : let secondary_scores = self
1032 : .intent
1033 : .get_secondary()
1034 : .iter()
1035 12 : .map(|node_id| {
1036 12 : (
1037 12 : *node_id,
1038 12 : scheduler.compute_node_score::<NodeSecondarySchedulingScore>(
1039 12 : *node_id,
1040 12 : &self.intent.preferred_az_id,
1041 12 : &schedule_context,
1042 12 : ),
1043 12 : )
1044 12 : })
1045 : .collect::<HashMap<_, _>>();
1046 :
1047 12 : if secondary_scores.iter().any(|score| score.1.is_none()) {
1048 : // Trivial case: if we only have one secondary, drop that one
1049 : if self.intent.get_secondary().len() == 1 {
1050 : return Some(ScheduleOptimization {
1051 : sequence: self.sequence,
1052 : action: ScheduleOptimizationAction::RemoveSecondary(
1053 : *self.intent.get_secondary().first().unwrap(),
1054 : ),
1055 : });
1056 : }
1057 :
1058 : // Try to find a "good" secondary to keep, without relying on scores (one or more nodes is in a state
1059 : // where its score can't be calculated), and drop the others. This enables us to make progress in
1060 : // most cases, even if some nodes are offline or have scheduling=pause set.
1061 :
1062 : debug_assert!(self.intent.attached.is_some()); // We should not make it here unless attached -- this
1063 : // logic presumes we are in a mode where we want secondaries to be in non-home AZ
1064 1 : if let Some(retain_secondary) = self.intent.get_secondary().iter().find(|n| {
1065 1 : let in_home_az = scheduler.get_node_az(n) == self.intent.preferred_az_id;
1066 1 : let is_available = secondary_scores
1067 1 : .get(n)
1068 1 : .expect("Built from same list of nodes")
1069 1 : .is_some();
1070 1 : is_available && !in_home_az
1071 1 : }) {
1072 : // Great, we found one to retain. Pick some other to drop.
1073 : if let Some(victim) = self
1074 : .intent
1075 : .get_secondary()
1076 : .iter()
1077 2 : .find(|n| n != &retain_secondary)
1078 : {
1079 : return Some(ScheduleOptimization {
1080 : sequence: self.sequence,
1081 : action: ScheduleOptimizationAction::RemoveSecondary(*victim),
1082 : });
1083 : }
1084 : }
1085 :
1086 : // Fall through: we didn't identify one to remove. This ought to be rare.
1087 : tracing::warn!(
1088 : "Keeping extra secondaries: can't determine which of {:?} to remove (some nodes offline?)",
1089 : self.intent.get_secondary()
1090 : );
1091 : } else {
1092 : let victim = secondary_scores
1093 : .iter()
1094 10 : .max_by_key(|score| score.1.unwrap())
1095 : .unwrap()
1096 : .0;
1097 : return Some(ScheduleOptimization {
1098 : sequence: self.sequence,
1099 : action: ScheduleOptimizationAction::RemoveSecondary(*victim),
1100 : });
1101 : }
1102 : }
1103 :
1104 : let replacement = self.find_better_location::<AttachedShardTag>(
1105 : scheduler,
1106 : &schedule_context,
1107 : attached,
1108 : &[], // Don't exclude secondaries: our preferred attachment location may be a secondary
1109 : );
1110 :
1111 : // We have found a candidate and confirmed that its score is preferable
1112 : // to our current location. See if we have a secondary location in the preferred location already: if not,
1113 : // then create one.
1114 : if let Some(replacement) = replacement {
1115 : // If we are currently in non-preferred AZ, then the scheduler might suggest a location that is better, but still
1116 : // not in our preferred AZ. Migration has a cost in resources an impact to the workload, so we want to avoid doing
1117 : // multiple hops where we might go to some other AZ before eventually finding a suitable location in our preferred
1118 : // AZ: skip this optimization if it is not in our final, preferred AZ.
1119 : //
1120 : // This should be a transient state, there should always be capacity eventually in our preferred AZ (even if nodes
1121 : // there are too overloaded for scheduler to suggest them, more should be provisioned eventually).
1122 : if self.preferred_node.is_none()
1123 : && self.intent.preferred_az_id.is_some()
1124 : && scheduler.get_node_az(&replacement) != self.intent.preferred_az_id
1125 : {
1126 : tracing::debug!(
1127 : "Candidate node {replacement} is not in preferred AZ {:?}",
1128 : self.intent.preferred_az_id
1129 : );
1130 :
1131 : // This should only happen if our current location is not in the preferred AZ, otherwise
1132 : // [`Self::find_better_location`]` should have rejected any other location outside the preferred Az, because
1133 : // AZ is the highest priority part of NodeAttachmentSchedulingScore.
1134 : debug_assert!(scheduler.get_node_az(&attached) != self.intent.preferred_az_id);
1135 :
1136 : return None;
1137 : }
1138 :
1139 : if !self.intent.get_secondary().contains(&replacement) {
1140 : Some(ScheduleOptimization {
1141 : sequence: self.sequence,
1142 : action: ScheduleOptimizationAction::CreateSecondary(replacement),
1143 : })
1144 : } else {
1145 : // We already have a secondary in the preferred location, let's try migrating to it. Our caller
1146 : // will check the warmth of the destination before deciding whether to really execute this.
1147 : Some(ScheduleOptimization {
1148 : sequence: self.sequence,
1149 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
1150 : old_attached_node_id: attached,
1151 : new_attached_node_id: replacement,
1152 : }),
1153 : })
1154 : }
1155 : } else {
1156 : // We didn't find somewhere we'd rather be, and we don't have any excess secondaries
1157 : // to clean up: no action required.
1158 : None
1159 : }
1160 : }
1161 :
1162 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1163 : pub(crate) fn optimize_secondary(
1164 : &self,
1165 : scheduler: &mut Scheduler,
1166 : schedule_context: &ScheduleContext,
1167 : ) -> Option<ScheduleOptimization> {
1168 : if self.intent.get_secondary().len() > self.policy.want_secondaries() {
1169 : // We have extra secondaries, perhaps to facilitate a migration of the attached location:
1170 : // do nothing, it is up to [`Self::optimize_attachment`] to clean them up. When that's done,
1171 : // and we are called again, we will proceed.
1172 : tracing::debug!("Too many secondaries: skipping");
1173 : return None;
1174 : }
1175 :
1176 : let schedule_context = schedule_context.project_detach(self);
1177 :
1178 : for secondary in self.intent.get_secondary() {
1179 : // Make sure we don't try to migrate a secondary to our attached location: this case happens
1180 : // easily in environments without multiple AZs.
1181 : let exclude = match self.intent.attached {
1182 : Some(attached) => vec![attached],
1183 : None => vec![],
1184 : };
1185 :
1186 : let replacement = match &self.policy {
1187 : PlacementPolicy::Attached(_) => {
1188 : // Secondaries for an attached shard should be scheduled using `SecondaryShardTag`
1189 : // to avoid placing them in the preferred AZ.
1190 : self.find_better_location::<SecondaryShardTag>(
1191 : scheduler,
1192 : &schedule_context,
1193 : *secondary,
1194 : &exclude,
1195 : )
1196 : }
1197 : PlacementPolicy::Secondary => {
1198 : // In secondary-only mode, we want our secondary locations in the preferred AZ,
1199 : // so that they're ready to take over as an attached location when we transition
1200 : // into PlacementPolicy::Attached.
1201 : self.find_better_location::<AttachedShardTag>(
1202 : scheduler,
1203 : &schedule_context,
1204 : *secondary,
1205 : &exclude,
1206 : )
1207 : }
1208 : PlacementPolicy::Detached => None,
1209 : };
1210 :
1211 : assert!(replacement != Some(*secondary));
1212 : if let Some(replacement) = replacement {
1213 : // We have found a candidate and confirmed that its score is preferable
1214 : // to our current location. See if we have a secondary location in the preferred location already: if not,
1215 : // then create one.
1216 : return Some(ScheduleOptimization {
1217 : sequence: self.sequence,
1218 : action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
1219 : old_node_id: *secondary,
1220 : new_node_id: replacement,
1221 : }),
1222 : });
1223 : }
1224 : }
1225 :
1226 : None
1227 : }
1228 :
1229 : /// Start or abort a graceful migration of this shard to another pageserver. This works on top of the
1230 : /// other optimisation functions, to bias them to move to the destination node.
1231 0 : pub(crate) fn set_preferred_node(&mut self, node: Option<NodeId>) {
1232 0 : if let Some(hint) = self.preferred_node.as_ref() {
1233 0 : if Some(hint) != node.as_ref() {
1234 : // This is legal but a bit surprising: we expect that administrators wouldn't usually
1235 : // change their mind about where to migrate something.
1236 0 : tracing::warn!(
1237 0 : "Changing migration destination from {hint} to {node:?} (current intent {:?})",
1238 : self.intent
1239 : );
1240 0 : }
1241 0 : }
1242 :
1243 0 : self.preferred_node = node;
1244 0 : }
1245 :
1246 0 : pub(crate) fn get_preferred_node(&self) -> Option<NodeId> {
1247 0 : self.preferred_node
1248 0 : }
1249 :
1250 : /// Return true if the optimization was really applied: it will not be applied if the optimization's
1251 : /// sequence is behind this tenant shard's
1252 17 : pub(crate) fn apply_optimization(
1253 17 : &mut self,
1254 17 : scheduler: &mut Scheduler,
1255 17 : optimization: ScheduleOptimization,
1256 17 : ) -> bool {
1257 17 : if optimization.sequence != self.sequence {
1258 0 : return false;
1259 17 : }
1260 17 :
1261 17 : metrics::METRICS_REGISTRY
1262 17 : .metrics_group
1263 17 : .storage_controller_schedule_optimization
1264 17 : .inc();
1265 17 :
1266 17 : match optimization.action {
1267 : ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
1268 5 : old_attached_node_id,
1269 5 : new_attached_node_id,
1270 5 : }) => {
1271 5 : self.intent.demote_attached(scheduler, old_attached_node_id);
1272 5 : self.intent
1273 5 : .promote_attached(scheduler, new_attached_node_id);
1274 :
1275 5 : if let Some(hint) = self.preferred_node.as_ref() {
1276 1 : if hint == &new_attached_node_id {
1277 : // The migration target is not a long term pin: once we are done with the migration, clear it.
1278 1 : tracing::info!("Graceful migration to {hint} complete");
1279 1 : self.preferred_node = None;
1280 0 : }
1281 4 : }
1282 : }
1283 : ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
1284 1 : old_node_id,
1285 1 : new_node_id,
1286 1 : }) => {
1287 1 : self.intent.remove_secondary(scheduler, old_node_id);
1288 1 : self.intent.push_secondary(scheduler, new_node_id);
1289 1 : }
1290 4 : ScheduleOptimizationAction::CreateSecondary(new_node_id) => {
1291 4 : self.intent.push_secondary(scheduler, new_node_id);
1292 4 : }
1293 7 : ScheduleOptimizationAction::RemoveSecondary(old_secondary) => {
1294 7 : self.intent.remove_secondary(scheduler, old_secondary);
1295 7 : }
1296 : }
1297 :
1298 17 : true
1299 17 : }
1300 :
1301 : /// When a shard has several secondary locations, we need to pick one in situations where
1302 : /// we promote one of them to an attached location:
1303 : /// - When draining a node for restart
1304 : /// - When responding to a node failure
1305 : ///
1306 : /// In this context, 'preferred' does not mean the node with the best scheduling score: instead
1307 : /// we want to pick the node which is best for use _temporarily_ while the previous attached location
1308 : /// is unavailable (e.g. because it's down or deploying). That means we prefer to use secondary
1309 : /// locations in a non-preferred AZ, as they're more likely to have awarm cache than a temporary
1310 : /// secondary in the preferred AZ (which are usually only created for migrations, and if they exist
1311 : /// they're probably not warmed up yet). The latter behavior is based oni
1312 : ///
1313 : /// If the input is empty, or all the nodes are not elegible for scheduling, return None: the
1314 : /// caller needs to a pick a node some other way.
1315 12832 : pub(crate) fn preferred_secondary(&self, scheduler: &Scheduler) -> Option<NodeId> {
1316 12832 : let candidates = scheduler.filter_usable_nodes(&self.intent.secondary);
1317 12832 :
1318 12832 : // We will sort candidates to prefer nodes which are _not_ in our preferred AZ, i.e. we prefer
1319 12832 : // to migrate to a long-lived secondary location (which would have been scheduled in a non-preferred AZ),
1320 12832 : // rather than a short-lived secondary location being used for optimization/migration (which would have
1321 12832 : // been scheduled in our preferred AZ).
1322 12832 : let mut candidates = candidates
1323 12832 : .iter()
1324 12832 : .map(|(node_id, node_az)| {
1325 2 : if node_az == &self.intent.preferred_az_id {
1326 1 : (1, *node_id)
1327 : } else {
1328 1 : (0, *node_id)
1329 : }
1330 12832 : })
1331 12832 : .collect::<Vec<_>>();
1332 12832 :
1333 12832 : candidates.sort();
1334 12832 :
1335 12832 : candidates.first().map(|i| i.1)
1336 12832 : }
1337 :
1338 : /// Query whether the tenant's observed state for attached node matches its intent state, and if so,
1339 : /// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that
1340 : /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
1341 : ///
1342 : /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
1343 : /// funciton should not be used to decide whether to reconcile.
1344 0 : pub(crate) fn stably_attached(&self) -> Option<NodeId> {
1345 0 : if let Some(attach_intent) = self.intent.attached {
1346 0 : match self.observed.locations.get(&attach_intent) {
1347 0 : Some(loc) => match &loc.conf {
1348 0 : Some(conf) => match conf.mode {
1349 : LocationConfigMode::AttachedMulti
1350 : | LocationConfigMode::AttachedSingle
1351 : | LocationConfigMode::AttachedStale => {
1352 : // Our intent and observed state agree that this node is in an attached state.
1353 0 : Some(attach_intent)
1354 : }
1355 : // Our observed config is not an attached state
1356 0 : _ => None,
1357 : },
1358 : // Our observed state is None, i.e. in flux
1359 0 : None => None,
1360 : },
1361 : // We have no observed state for this node
1362 0 : None => None,
1363 : }
1364 : } else {
1365 : // Our intent is not to attach
1366 0 : None
1367 : }
1368 0 : }
1369 :
1370 0 : fn dirty(&self, nodes: &Arc<HashMap<NodeId, Node>>) -> bool {
1371 0 : let mut dirty_nodes = HashSet::new();
1372 :
1373 0 : if let Some(node_id) = self.intent.attached {
1374 : // Maybe panic: it is a severe bug if we try to attach while generation is null.
1375 0 : let generation = self
1376 0 : .generation
1377 0 : .expect("Attempted to enter attached state without a generation");
1378 0 :
1379 0 : let wanted_conf =
1380 0 : attached_location_conf(generation, &self.shard, &self.config, &self.policy);
1381 0 : match self.observed.locations.get(&node_id) {
1382 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
1383 0 : Some(_) | None => {
1384 0 : dirty_nodes.insert(node_id);
1385 0 : }
1386 : }
1387 0 : }
1388 :
1389 0 : for node_id in &self.intent.secondary {
1390 0 : let wanted_conf = secondary_location_conf(&self.shard, &self.config);
1391 0 : match self.observed.locations.get(node_id) {
1392 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
1393 0 : Some(_) | None => {
1394 0 : dirty_nodes.insert(*node_id);
1395 0 : }
1396 : }
1397 : }
1398 :
1399 0 : for node_id in self.observed.locations.keys() {
1400 0 : if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
1401 0 : // We have observed state that isn't part of our intent: need to clean it up.
1402 0 : dirty_nodes.insert(*node_id);
1403 0 : }
1404 : }
1405 :
1406 0 : dirty_nodes.retain(|node_id| {
1407 0 : nodes
1408 0 : .get(node_id)
1409 0 : .map(|n| n.is_available())
1410 0 : .unwrap_or(false)
1411 0 : });
1412 0 :
1413 0 : !dirty_nodes.is_empty()
1414 0 : }
1415 :
1416 : #[allow(clippy::too_many_arguments)]
1417 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1418 : pub(crate) fn get_reconcile_needed(
1419 : &mut self,
1420 : pageservers: &Arc<HashMap<NodeId, Node>>,
1421 : ) -> ReconcileNeeded {
1422 : // If there are any ambiguous observed states, and the nodes they refer to are available,
1423 : // we should reconcile to clean them up.
1424 : let mut dirty_observed = false;
1425 : for (node_id, observed_loc) in &self.observed.locations {
1426 : let node = pageservers
1427 : .get(node_id)
1428 : .expect("Nodes may not be removed while referenced");
1429 : if observed_loc.conf.is_none() && node.is_available() {
1430 : dirty_observed = true;
1431 : break;
1432 : }
1433 : }
1434 :
1435 : let active_nodes_dirty = self.dirty(pageservers);
1436 :
1437 : let reconcile_needed = match (
1438 : active_nodes_dirty,
1439 : dirty_observed,
1440 : self.pending_compute_notification,
1441 : ) {
1442 : (true, _, _) => ReconcileNeeded::Yes(ReconcileReason::ActiveNodesDirty),
1443 : (_, true, _) => ReconcileNeeded::Yes(ReconcileReason::UnknownLocation),
1444 : (_, _, true) => ReconcileNeeded::Yes(ReconcileReason::PendingComputeNotification),
1445 : _ => ReconcileNeeded::No,
1446 : };
1447 :
1448 : if matches!(reconcile_needed, ReconcileNeeded::No) {
1449 : tracing::debug!("Not dirty, no reconciliation needed.");
1450 : return ReconcileNeeded::No;
1451 : }
1452 :
1453 : // If we are currently splitting, then never start a reconciler task: the splitting logic
1454 : // requires that shards are not interfered with while it runs. Do this check here rather than
1455 : // up top, so that we only log this message if we would otherwise have done a reconciliation.
1456 : if !matches!(self.splitting, SplitState::Idle) {
1457 : tracing::info!("Refusing to reconcile, splitting in progress");
1458 : return ReconcileNeeded::No;
1459 : }
1460 :
1461 : // Reconcile already in flight for the current sequence?
1462 : if let Some(handle) = &self.reconciler {
1463 : if handle.sequence == self.sequence {
1464 : tracing::info!(
1465 : "Reconciliation already in progress for sequence {:?}",
1466 : self.sequence,
1467 : );
1468 : return ReconcileNeeded::WaitExisting(ReconcilerWaiter {
1469 : tenant_shard_id: self.tenant_shard_id,
1470 : seq_wait: self.waiter.clone(),
1471 : error_seq_wait: self.error_waiter.clone(),
1472 : error: self.last_error.clone(),
1473 : seq: self.sequence,
1474 : });
1475 : }
1476 : }
1477 :
1478 : // Pre-checks done: finally check whether we may actually do the work
1479 : match self.scheduling_policy {
1480 : ShardSchedulingPolicy::Active
1481 : | ShardSchedulingPolicy::Essential
1482 : | ShardSchedulingPolicy::Pause => {}
1483 : ShardSchedulingPolicy::Stop => {
1484 : // We only reach this point if there is work to do and we're going to skip
1485 : // doing it: warn it obvious why this tenant isn't doing what it ought to.
1486 : tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
1487 : return ReconcileNeeded::No;
1488 : }
1489 : }
1490 :
1491 : reconcile_needed
1492 : }
1493 :
1494 : /// Ensure the sequence number is set to a value where waiting for this value will make us wait
1495 : /// for the next reconcile: i.e. it is ahead of all completed or running reconcilers.
1496 : ///
1497 : /// Constructing a ReconcilerWaiter with the resulting sequence number gives the property
1498 : /// that the waiter will not complete until some future Reconciler is constructed and run.
1499 0 : fn ensure_sequence_ahead(&mut self) {
1500 0 : // Find the highest sequence for which a Reconciler has previously run or is currently
1501 0 : // running
1502 0 : let max_seen = std::cmp::max(
1503 0 : self.reconciler
1504 0 : .as_ref()
1505 0 : .map(|r| r.sequence)
1506 0 : .unwrap_or(Sequence(0)),
1507 0 : std::cmp::max(self.waiter.load(), self.error_waiter.load()),
1508 0 : );
1509 0 :
1510 0 : if self.sequence <= max_seen {
1511 0 : self.sequence = max_seen.next();
1512 0 : }
1513 0 : }
1514 :
1515 : /// Create a waiter that will wait for some future Reconciler that hasn't been spawned yet.
1516 : ///
1517 : /// This is appropriate when you can't spawn a reconciler (e.g. due to resource limits), but
1518 : /// you would like to wait on the next reconciler that gets spawned in the background.
1519 0 : pub(crate) fn future_reconcile_waiter(&mut self) -> ReconcilerWaiter {
1520 0 : self.ensure_sequence_ahead();
1521 0 :
1522 0 : ReconcilerWaiter {
1523 0 : tenant_shard_id: self.tenant_shard_id,
1524 0 : seq_wait: self.waiter.clone(),
1525 0 : error_seq_wait: self.error_waiter.clone(),
1526 0 : error: self.last_error.clone(),
1527 0 : seq: self.sequence,
1528 0 : }
1529 0 : }
1530 :
1531 0 : async fn reconcile(
1532 0 : sequence: Sequence,
1533 0 : mut reconciler: Reconciler,
1534 0 : must_notify: bool,
1535 0 : ) -> ReconcileResult {
1536 : // Attempt to make observed state match intent state
1537 0 : let result = reconciler.reconcile().await;
1538 :
1539 : // If we know we had a pending compute notification from some previous action, send a notification irrespective
1540 : // of whether the above reconcile() did any work. It has to be Ok() though, because otherwise we might be
1541 : // sending a notification of a location that isn't really attached.
1542 0 : if result.is_ok() && must_notify {
1543 : // If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
1544 0 : reconciler.compute_notify().await.ok();
1545 0 : } else if must_notify {
1546 0 : // Carry this flag so that the reconciler's result will indicate that it still needs to retry
1547 0 : // the compute hook notification eventually.
1548 0 : reconciler.compute_notify_failure = true;
1549 0 : }
1550 :
1551 : // Update result counter
1552 0 : let outcome_label = match &result {
1553 0 : Ok(_) => ReconcileOutcome::Success,
1554 0 : Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
1555 0 : Err(_) => ReconcileOutcome::Error,
1556 : };
1557 :
1558 0 : metrics::METRICS_REGISTRY
1559 0 : .metrics_group
1560 0 : .storage_controller_reconcile_complete
1561 0 : .inc(ReconcileCompleteLabelGroup {
1562 0 : status: outcome_label,
1563 0 : });
1564 0 :
1565 0 : // Constructing result implicitly drops Reconciler, freeing any ReconcileUnits before the Service might
1566 0 : // try and schedule more work in response to our result.
1567 0 : ReconcileResult {
1568 0 : sequence,
1569 0 : result,
1570 0 : tenant_shard_id: reconciler.tenant_shard_id,
1571 0 : generation: reconciler.generation,
1572 0 : observed_deltas: reconciler.observed_deltas(),
1573 0 : pending_compute_notification: reconciler.compute_notify_failure,
1574 0 : }
1575 0 : }
1576 :
1577 : #[allow(clippy::too_many_arguments)]
1578 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1579 : pub(crate) fn spawn_reconciler(
1580 : &mut self,
1581 : reason: ReconcileReason,
1582 : result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
1583 : pageservers: &Arc<HashMap<NodeId, Node>>,
1584 : compute_hook: &Arc<ComputeHook>,
1585 : reconciler_config: ReconcilerConfig,
1586 : service_config: &service::Config,
1587 : persistence: &Arc<Persistence>,
1588 : units: ReconcileUnits,
1589 : gate_guard: GateGuard,
1590 : cancel: &CancellationToken,
1591 : ) -> Option<ReconcilerWaiter> {
1592 : // Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
1593 : // doing our sequence's work.
1594 : let old_handle = self.reconciler.take();
1595 :
1596 : // Build list of nodes from which the reconciler should detach
1597 : let mut detach = Vec::new();
1598 : for node_id in self.observed.locations.keys() {
1599 : if self.intent.get_attached() != &Some(*node_id)
1600 : && !self.intent.secondary.contains(node_id)
1601 : {
1602 : detach.push(
1603 : pageservers
1604 : .get(node_id)
1605 : .expect("Intent references non-existent pageserver")
1606 : .clone(),
1607 : )
1608 : }
1609 : }
1610 :
1611 : // Advance the sequence before spawning a reconciler, so that sequence waiters
1612 : // can distinguish between before+after the reconcile completes.
1613 : self.ensure_sequence_ahead();
1614 :
1615 : let reconciler_cancel = cancel.child_token();
1616 : let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
1617 : let reconciler = Reconciler {
1618 : tenant_shard_id: self.tenant_shard_id,
1619 : shard: self.shard,
1620 : placement_policy: self.policy.clone(),
1621 : generation: self.generation,
1622 : intent: reconciler_intent,
1623 : detach,
1624 : reconciler_config,
1625 : config: self.config.clone(),
1626 : preferred_az: self.intent.preferred_az_id.clone(),
1627 : observed: self.observed.clone(),
1628 : original_observed: self.observed.clone(),
1629 : compute_hook: compute_hook.clone(),
1630 : service_config: service_config.clone(),
1631 : _gate_guard: gate_guard,
1632 : _resource_units: units,
1633 : cancel: reconciler_cancel.clone(),
1634 : persistence: persistence.clone(),
1635 : compute_notify_failure: false,
1636 : };
1637 :
1638 : let reconcile_seq = self.sequence;
1639 : let long_reconcile_threshold = service_config.long_reconcile_threshold;
1640 :
1641 : tracing::info!(seq=%reconcile_seq, "Spawning Reconciler ({reason:?})");
1642 : let must_notify = self.pending_compute_notification;
1643 : let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
1644 : tenant_id=%reconciler.tenant_shard_id.tenant_id,
1645 : shard_id=%reconciler.tenant_shard_id.shard_slug());
1646 : metrics::METRICS_REGISTRY
1647 : .metrics_group
1648 : .storage_controller_reconcile_spawn
1649 : .inc();
1650 : let result_tx = result_tx.clone();
1651 : let join_handle = tokio::task::spawn(
1652 0 : async move {
1653 : // Wait for any previous reconcile task to complete before we start
1654 0 : if let Some(old_handle) = old_handle {
1655 0 : old_handle.cancel.cancel();
1656 0 : if let Err(e) = old_handle.handle.await {
1657 : // We can't do much with this other than log it: the task is done, so
1658 : // we may proceed with our work.
1659 0 : tracing::error!("Unexpected join error waiting for reconcile task: {e}");
1660 0 : }
1661 0 : }
1662 :
1663 : // Early check for cancellation before doing any work
1664 : // TODO: wrap all remote API operations in cancellation check
1665 : // as well.
1666 0 : if reconciler.cancel.is_cancelled() {
1667 0 : metrics::METRICS_REGISTRY
1668 0 : .metrics_group
1669 0 : .storage_controller_reconcile_complete
1670 0 : .inc(ReconcileCompleteLabelGroup {
1671 0 : status: ReconcileOutcome::Cancel,
1672 0 : });
1673 0 : return;
1674 0 : }
1675 0 :
1676 0 : let (tenant_id_label, shard_number_label, sequence_label) = {
1677 0 : (
1678 0 : reconciler.tenant_shard_id.tenant_id.to_string(),
1679 0 : reconciler.tenant_shard_id.shard_number.0.to_string(),
1680 0 : reconcile_seq.to_string(),
1681 0 : )
1682 0 : };
1683 0 :
1684 0 : let label_group = ReconcileLongRunningLabelGroup {
1685 0 : tenant_id: &tenant_id_label,
1686 0 : shard_number: &shard_number_label,
1687 0 : sequence: &sequence_label,
1688 0 : };
1689 0 :
1690 0 : let reconcile_fut = Self::reconcile(reconcile_seq, reconciler, must_notify);
1691 0 : let long_reconcile_fut = {
1692 0 : let label_group = label_group.clone();
1693 0 : async move {
1694 0 : tokio::time::sleep(long_reconcile_threshold).await;
1695 :
1696 0 : tracing::warn!("Reconcile passed the long running threshold of {long_reconcile_threshold:?}");
1697 :
1698 0 : metrics::METRICS_REGISTRY
1699 0 : .metrics_group
1700 0 : .storage_controller_reconcile_long_running
1701 0 : .inc(label_group);
1702 0 : }
1703 : };
1704 :
1705 0 : let reconcile_fut = std::pin::pin!(reconcile_fut);
1706 0 : let long_reconcile_fut = std::pin::pin!(long_reconcile_fut);
1707 :
1708 0 : let (was_long, result) =
1709 0 : match future::select(reconcile_fut, long_reconcile_fut).await {
1710 0 : Either::Left((reconcile_result, _)) => (false, reconcile_result),
1711 0 : Either::Right((_, reconcile_fut)) => (true, reconcile_fut.await),
1712 : };
1713 :
1714 0 : if was_long {
1715 0 : let id = metrics::METRICS_REGISTRY
1716 0 : .metrics_group
1717 0 : .storage_controller_reconcile_long_running
1718 0 : .with_labels(label_group);
1719 0 : metrics::METRICS_REGISTRY
1720 0 : .metrics_group
1721 0 : .storage_controller_reconcile_long_running
1722 0 : .remove_metric(id);
1723 0 : }
1724 :
1725 0 : result_tx
1726 0 : .send(ReconcileResultRequest::ReconcileResult(result))
1727 0 : .ok();
1728 0 : }
1729 : .instrument(reconciler_span),
1730 : );
1731 :
1732 : self.reconciler = Some(ReconcilerHandle {
1733 : sequence: self.sequence,
1734 : handle: join_handle,
1735 : cancel: reconciler_cancel,
1736 : });
1737 :
1738 : Some(ReconcilerWaiter {
1739 : tenant_shard_id: self.tenant_shard_id,
1740 : seq_wait: self.waiter.clone(),
1741 : error_seq_wait: self.error_waiter.clone(),
1742 : error: self.last_error.clone(),
1743 : seq: self.sequence,
1744 : })
1745 : }
1746 :
1747 0 : pub(crate) fn cancel_reconciler(&self) {
1748 0 : if let Some(handle) = self.reconciler.as_ref() {
1749 0 : handle.cancel.cancel()
1750 0 : }
1751 0 : }
1752 :
1753 : /// Get a waiter for any reconciliation in flight, but do not start reconciliation
1754 : /// if it is not already running
1755 0 : pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
1756 0 : if self.reconciler.is_some() {
1757 0 : Some(ReconcilerWaiter {
1758 0 : tenant_shard_id: self.tenant_shard_id,
1759 0 : seq_wait: self.waiter.clone(),
1760 0 : error_seq_wait: self.error_waiter.clone(),
1761 0 : error: self.last_error.clone(),
1762 0 : seq: self.sequence,
1763 0 : })
1764 : } else {
1765 0 : None
1766 : }
1767 0 : }
1768 :
1769 : /// Called when a ReconcileResult has been emitted and the service is updating
1770 : /// our state: if the result is from a sequence >= my ReconcileHandle, then drop
1771 : /// the handle to indicate there is no longer a reconciliation in progress.
1772 0 : pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
1773 0 : if let Some(reconcile_handle) = &self.reconciler {
1774 0 : if reconcile_handle.sequence <= sequence {
1775 0 : self.reconciler = None;
1776 0 : }
1777 0 : }
1778 0 : }
1779 :
1780 : /// If we had any state at all referring to this node ID, drop it. Does not
1781 : /// attempt to reschedule.
1782 : ///
1783 : /// Returns true if we modified the node's intent state.
1784 0 : pub(crate) fn deref_node(&mut self, node_id: NodeId) -> bool {
1785 0 : let mut intent_modified = false;
1786 0 :
1787 0 : // Drop if this node was our attached intent
1788 0 : if self.intent.attached == Some(node_id) {
1789 0 : self.intent.attached = None;
1790 0 : intent_modified = true;
1791 0 : }
1792 :
1793 : // Drop from the list of secondaries, and check if we modified it
1794 0 : let had_secondaries = self.intent.secondary.len();
1795 0 : self.intent.secondary.retain(|n| n != &node_id);
1796 0 : intent_modified |= self.intent.secondary.len() != had_secondaries;
1797 0 :
1798 0 : debug_assert!(!self.intent.all_pageservers().contains(&node_id));
1799 :
1800 0 : if self.preferred_node == Some(node_id) {
1801 0 : self.preferred_node = None;
1802 0 : }
1803 :
1804 0 : intent_modified
1805 0 : }
1806 :
1807 0 : pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
1808 0 : self.scheduling_policy = p;
1809 0 : }
1810 :
1811 0 : pub(crate) fn get_scheduling_policy(&self) -> ShardSchedulingPolicy {
1812 0 : self.scheduling_policy
1813 0 : }
1814 :
1815 0 : pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
1816 0 : // Ordering: always set last_error before advancing sequence, so that sequence
1817 0 : // waiters are guaranteed to see a Some value when they see an error.
1818 0 : *(self.last_error.lock().unwrap()) = Some(Arc::new(error));
1819 0 : self.error_waiter.advance(sequence);
1820 0 : }
1821 :
1822 0 : pub(crate) fn from_persistent(
1823 0 : tsp: TenantShardPersistence,
1824 0 : intent: IntentState,
1825 0 : ) -> anyhow::Result<Self> {
1826 0 : let tenant_shard_id = tsp.get_tenant_shard_id()?;
1827 0 : let shard_identity = tsp.get_shard_identity()?;
1828 :
1829 0 : metrics::METRICS_REGISTRY
1830 0 : .metrics_group
1831 0 : .storage_controller_tenant_shards
1832 0 : .inc();
1833 0 :
1834 0 : Ok(Self {
1835 0 : tenant_shard_id,
1836 0 : shard: shard_identity,
1837 0 : sequence: Sequence::initial(),
1838 0 : generation: tsp.generation.map(|g| Generation::new(g as u32)),
1839 0 : policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
1840 0 : intent,
1841 0 : observed: ObservedState::new(),
1842 0 : config: serde_json::from_str(&tsp.config).unwrap(),
1843 0 : reconciler: None,
1844 0 : splitting: tsp.splitting,
1845 0 : waiter: Arc::new(SeqWait::new(Sequence::initial())),
1846 0 : error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
1847 0 : last_error: Arc::default(),
1848 0 : pending_compute_notification: false,
1849 0 : delayed_reconcile: false,
1850 0 : scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
1851 0 : preferred_node: None,
1852 0 : })
1853 0 : }
1854 :
1855 0 : pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
1856 0 : TenantShardPersistence {
1857 0 : tenant_id: self.tenant_shard_id.tenant_id.to_string(),
1858 0 : shard_number: self.tenant_shard_id.shard_number.0 as i32,
1859 0 : shard_count: self.tenant_shard_id.shard_count.literal() as i32,
1860 0 : shard_stripe_size: self.shard.stripe_size.0 as i32,
1861 0 : generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
1862 0 : generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
1863 0 : placement_policy: serde_json::to_string(&self.policy).unwrap(),
1864 0 : config: serde_json::to_string(&self.config).unwrap(),
1865 0 : splitting: SplitState::default(),
1866 0 : scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
1867 0 : preferred_az_id: self.intent.preferred_az_id.as_ref().map(|az| az.0.clone()),
1868 0 : }
1869 0 : }
1870 :
1871 12508 : pub(crate) fn preferred_az(&self) -> Option<&AvailabilityZone> {
1872 12508 : self.intent.get_preferred_az()
1873 12508 : }
1874 :
1875 2 : pub(crate) fn set_preferred_az(
1876 2 : &mut self,
1877 2 : scheduler: &mut Scheduler,
1878 2 : preferred_az_id: Option<AvailabilityZone>,
1879 2 : ) {
1880 2 : self.intent.set_preferred_az(scheduler, preferred_az_id);
1881 2 : }
1882 :
1883 : /// Returns all the nodes to which this tenant shard is attached according to the
1884 : /// observed state and the generations. Return vector is sorted from latest generation
1885 : /// to earliest.
1886 0 : pub(crate) fn attached_locations(&self) -> Vec<(NodeId, Generation)> {
1887 0 : self.observed
1888 0 : .locations
1889 0 : .iter()
1890 0 : .filter_map(|(node_id, observed)| {
1891 : use LocationConfigMode::{AttachedMulti, AttachedSingle, AttachedStale};
1892 :
1893 0 : let conf = observed.conf.as_ref()?;
1894 :
1895 0 : match (conf.generation, conf.mode) {
1896 0 : (Some(gen_), AttachedMulti | AttachedSingle | AttachedStale) => {
1897 0 : Some((*node_id, gen_))
1898 : }
1899 0 : _ => None,
1900 : }
1901 0 : })
1902 0 : .sorted_by(|(_lhs_node_id, lhs_gen), (_rhs_node_id, rhs_gen)| {
1903 0 : lhs_gen.cmp(rhs_gen).reverse()
1904 0 : })
1905 0 : .map(|(node_id, gen_)| (node_id, Generation::new(gen_)))
1906 0 : .collect()
1907 0 : }
1908 :
1909 : /// Update the observed state of the tenant by applying incremental deltas
1910 : ///
1911 : /// Deltas are generated by reconcilers via [`Reconciler::observed_deltas`].
1912 : /// They are then filtered in [`crate::service::Service::process_result`].
1913 0 : pub(crate) fn apply_observed_deltas(
1914 0 : &mut self,
1915 0 : deltas: impl Iterator<Item = ObservedStateDelta>,
1916 0 : ) {
1917 0 : for delta in deltas {
1918 0 : match delta {
1919 0 : ObservedStateDelta::Upsert(ups) => {
1920 0 : let (node_id, loc) = *ups;
1921 0 :
1922 0 : // If the generation of the observed location in the delta is lagging
1923 0 : // behind the current one, then we have a race condition and cannot
1924 0 : // be certain about the true observed state. Set the observed state
1925 0 : // to None in order to reflect this.
1926 0 : let crnt_gen = self
1927 0 : .observed
1928 0 : .locations
1929 0 : .get(&node_id)
1930 0 : .and_then(|loc| loc.conf.as_ref())
1931 0 : .and_then(|conf| conf.generation);
1932 0 : let new_gen = loc.conf.as_ref().and_then(|conf| conf.generation);
1933 0 : match (crnt_gen, new_gen) {
1934 0 : (Some(crnt), Some(new)) if crnt_gen > new_gen => {
1935 0 : tracing::warn!(
1936 0 : "Skipping observed state update {}: {:?} and using None due to stale generation ({} > {})",
1937 : node_id,
1938 : loc,
1939 : crnt,
1940 : new
1941 : );
1942 :
1943 0 : self.observed
1944 0 : .locations
1945 0 : .insert(node_id, ObservedStateLocation { conf: None });
1946 0 :
1947 0 : continue;
1948 : }
1949 0 : _ => {}
1950 : }
1951 :
1952 0 : if let Some(conf) = &loc.conf {
1953 0 : tracing::info!("Updating observed location {}: {:?}", node_id, conf);
1954 : } else {
1955 0 : tracing::info!("Setting observed location {} to None", node_id,)
1956 : }
1957 :
1958 0 : self.observed.locations.insert(node_id, loc);
1959 : }
1960 0 : ObservedStateDelta::Delete(node_id) => {
1961 0 : tracing::info!("Deleting observed location {}", node_id);
1962 0 : self.observed.locations.remove(&node_id);
1963 : }
1964 : }
1965 : }
1966 0 : }
1967 :
1968 : /// Returns true if the tenant shard is attached to a node that is outside the preferred AZ.
1969 : ///
1970 : /// If the shard does not have a preferred AZ, returns false.
1971 0 : pub(crate) fn is_attached_outside_preferred_az(&self, nodes: &HashMap<NodeId, Node>) -> bool {
1972 0 : self.intent
1973 0 : .get_attached()
1974 0 : .map(|node_id| {
1975 0 : Some(
1976 0 : nodes
1977 0 : .get(&node_id)
1978 0 : .expect("referenced node exists")
1979 0 : .get_availability_zone_id(),
1980 0 : ) != self.intent.preferred_az_id.as_ref()
1981 0 : })
1982 0 : .unwrap_or(false)
1983 0 : }
1984 : }
1985 :
1986 : impl Drop for TenantShard {
1987 12843 : fn drop(&mut self) {
1988 12843 : metrics::METRICS_REGISTRY
1989 12843 : .metrics_group
1990 12843 : .storage_controller_tenant_shards
1991 12843 : .dec();
1992 12843 : }
1993 : }
1994 :
1995 : #[cfg(test)]
1996 : pub(crate) mod tests {
1997 : use std::cell::RefCell;
1998 : use std::rc::Rc;
1999 :
2000 : use pageserver_api::controller_api::NodeAvailability;
2001 : use pageserver_api::shard::{ShardCount, ShardNumber};
2002 : use rand::SeedableRng;
2003 : use rand::rngs::StdRng;
2004 : use utils::id::TenantId;
2005 :
2006 : use super::*;
2007 : use crate::scheduler::test_utils::make_test_nodes;
2008 :
2009 12 : fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard {
2010 12 : let tenant_id = TenantId::generate();
2011 12 : let shard_number = ShardNumber(0);
2012 12 : let shard_count = ShardCount::new(1);
2013 12 :
2014 12 : let tenant_shard_id = TenantShardId {
2015 12 : tenant_id,
2016 12 : shard_number,
2017 12 : shard_count,
2018 12 : };
2019 12 : TenantShard::new(
2020 12 : tenant_shard_id,
2021 12 : ShardIdentity::new(
2022 12 : shard_number,
2023 12 : shard_count,
2024 12 : pageserver_api::shard::ShardStripeSize(32768),
2025 12 : )
2026 12 : .unwrap(),
2027 12 : policy,
2028 12 : None,
2029 12 : )
2030 12 : }
2031 :
2032 5004 : pub(crate) fn make_test_tenant(
2033 5004 : policy: PlacementPolicy,
2034 5004 : shard_count: ShardCount,
2035 5004 : preferred_az: Option<AvailabilityZone>,
2036 5004 : ) -> Vec<TenantShard> {
2037 5004 : make_test_tenant_with_id(TenantId::generate(), policy, shard_count, preferred_az)
2038 5004 : }
2039 :
2040 5007 : pub(crate) fn make_test_tenant_with_id(
2041 5007 : tenant_id: TenantId,
2042 5007 : policy: PlacementPolicy,
2043 5007 : shard_count: ShardCount,
2044 5007 : preferred_az: Option<AvailabilityZone>,
2045 5007 : ) -> Vec<TenantShard> {
2046 5007 : (0..shard_count.count())
2047 12522 : .map(|i| {
2048 12522 : let shard_number = ShardNumber(i);
2049 12522 :
2050 12522 : let tenant_shard_id = TenantShardId {
2051 12522 : tenant_id,
2052 12522 : shard_number,
2053 12522 : shard_count,
2054 12522 : };
2055 12522 : TenantShard::new(
2056 12522 : tenant_shard_id,
2057 12522 : ShardIdentity::new(
2058 12522 : shard_number,
2059 12522 : shard_count,
2060 12522 : pageserver_api::shard::ShardStripeSize(32768),
2061 12522 : )
2062 12522 : .unwrap(),
2063 12522 : policy.clone(),
2064 12522 : preferred_az.clone(),
2065 12522 : )
2066 12522 : })
2067 5007 : .collect()
2068 5007 : }
2069 :
2070 : /// Test the scheduling behaviors used when a tenant configured for HA is subject
2071 : /// to nodes being marked offline.
2072 : #[test]
2073 1 : fn tenant_ha_scheduling() -> anyhow::Result<()> {
2074 1 : // Start with three nodes. Our tenant will only use two. The third one is
2075 1 : // expected to remain unused.
2076 1 : let mut nodes = make_test_nodes(3, &[]);
2077 1 :
2078 1 : let mut scheduler = Scheduler::new(nodes.values());
2079 1 : let mut context = ScheduleContext::default();
2080 1 :
2081 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
2082 1 : tenant_shard
2083 1 : .schedule(&mut scheduler, &mut context)
2084 1 : .expect("we have enough nodes, scheduling should work");
2085 1 :
2086 1 : // Expect to initially be schedule on to different nodes
2087 1 : assert_eq!(tenant_shard.intent.secondary.len(), 1);
2088 1 : assert!(tenant_shard.intent.attached.is_some());
2089 :
2090 1 : let attached_node_id = tenant_shard.intent.attached.unwrap();
2091 1 : let secondary_node_id = *tenant_shard.intent.secondary.iter().last().unwrap();
2092 1 : assert_ne!(attached_node_id, secondary_node_id);
2093 :
2094 : // Notifying the attached node is offline should demote it to a secondary
2095 1 : let changed = tenant_shard
2096 1 : .intent
2097 1 : .demote_attached(&mut scheduler, attached_node_id);
2098 1 : assert!(changed);
2099 1 : assert!(tenant_shard.intent.attached.is_none());
2100 1 : assert_eq!(tenant_shard.intent.secondary.len(), 2);
2101 :
2102 : // Update the scheduler state to indicate the node is offline
2103 1 : nodes
2104 1 : .get_mut(&attached_node_id)
2105 1 : .unwrap()
2106 1 : .set_availability(NodeAvailability::Offline);
2107 1 : scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
2108 1 :
2109 1 : // Scheduling the node should promote the still-available secondary node to attached
2110 1 : tenant_shard
2111 1 : .schedule(&mut scheduler, &mut context)
2112 1 : .expect("active nodes are available");
2113 1 : assert_eq!(tenant_shard.intent.attached.unwrap(), secondary_node_id);
2114 :
2115 : // The original attached node should have been retained as a secondary
2116 1 : assert_eq!(
2117 1 : *tenant_shard.intent.secondary.iter().last().unwrap(),
2118 1 : attached_node_id
2119 1 : );
2120 :
2121 1 : tenant_shard.intent.clear(&mut scheduler);
2122 1 :
2123 1 : Ok(())
2124 1 : }
2125 :
2126 : #[test]
2127 1 : fn intent_from_observed() -> anyhow::Result<()> {
2128 1 : let nodes = make_test_nodes(3, &[]);
2129 1 : let mut scheduler = Scheduler::new(nodes.values());
2130 1 :
2131 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
2132 1 :
2133 1 : tenant_shard.observed.locations.insert(
2134 1 : NodeId(3),
2135 1 : ObservedStateLocation {
2136 1 : conf: Some(LocationConfig {
2137 1 : mode: LocationConfigMode::AttachedMulti,
2138 1 : generation: Some(2),
2139 1 : secondary_conf: None,
2140 1 : shard_number: tenant_shard.shard.number.0,
2141 1 : shard_count: tenant_shard.shard.count.literal(),
2142 1 : shard_stripe_size: tenant_shard.shard.stripe_size.0,
2143 1 : tenant_conf: TenantConfig::default(),
2144 1 : }),
2145 1 : },
2146 1 : );
2147 1 :
2148 1 : tenant_shard.observed.locations.insert(
2149 1 : NodeId(2),
2150 1 : ObservedStateLocation {
2151 1 : conf: Some(LocationConfig {
2152 1 : mode: LocationConfigMode::AttachedStale,
2153 1 : generation: Some(1),
2154 1 : secondary_conf: None,
2155 1 : shard_number: tenant_shard.shard.number.0,
2156 1 : shard_count: tenant_shard.shard.count.literal(),
2157 1 : shard_stripe_size: tenant_shard.shard.stripe_size.0,
2158 1 : tenant_conf: TenantConfig::default(),
2159 1 : }),
2160 1 : },
2161 1 : );
2162 1 :
2163 1 : tenant_shard.intent_from_observed(&mut scheduler);
2164 1 :
2165 1 : // The highest generationed attached location gets used as attached
2166 1 : assert_eq!(tenant_shard.intent.attached, Some(NodeId(3)));
2167 : // Other locations get used as secondary
2168 1 : assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]);
2169 :
2170 1 : scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?;
2171 :
2172 1 : tenant_shard.intent.clear(&mut scheduler);
2173 1 : Ok(())
2174 1 : }
2175 :
2176 : #[test]
2177 1 : fn scheduling_mode() -> anyhow::Result<()> {
2178 1 : let nodes = make_test_nodes(3, &[]);
2179 1 : let mut scheduler = Scheduler::new(nodes.values());
2180 1 :
2181 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
2182 1 :
2183 1 : // In pause mode, schedule() shouldn't do anything
2184 1 : tenant_shard.scheduling_policy = ShardSchedulingPolicy::Pause;
2185 1 : assert!(
2186 1 : tenant_shard
2187 1 : .schedule(&mut scheduler, &mut ScheduleContext::default())
2188 1 : .is_ok()
2189 1 : );
2190 1 : assert!(tenant_shard.intent.all_pageservers().is_empty());
2191 :
2192 : // In active mode, schedule() works
2193 1 : tenant_shard.scheduling_policy = ShardSchedulingPolicy::Active;
2194 1 : assert!(
2195 1 : tenant_shard
2196 1 : .schedule(&mut scheduler, &mut ScheduleContext::default())
2197 1 : .is_ok()
2198 1 : );
2199 1 : assert!(!tenant_shard.intent.all_pageservers().is_empty());
2200 :
2201 1 : tenant_shard.intent.clear(&mut scheduler);
2202 1 : Ok(())
2203 1 : }
2204 :
2205 : #[test]
2206 : /// Simple case: moving attachment to somewhere better where we already have a secondary
2207 1 : fn optimize_attachment_simple() -> anyhow::Result<()> {
2208 1 : let nodes = make_test_nodes(
2209 1 : 3,
2210 1 : &[
2211 1 : AvailabilityZone("az-a".to_string()),
2212 1 : AvailabilityZone("az-b".to_string()),
2213 1 : AvailabilityZone("az-c".to_string()),
2214 1 : ],
2215 1 : );
2216 1 : let mut scheduler = Scheduler::new(nodes.values());
2217 1 :
2218 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
2219 1 : shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
2220 1 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
2221 1 : shard_b.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
2222 1 :
2223 1 : // Initially: both nodes attached on shard 1, and both have secondary locations
2224 1 : // on different nodes.
2225 1 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
2226 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(1));
2227 1 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
2228 1 : shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
2229 :
2230 1 : fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
2231 1 : let mut schedule_context = ScheduleContext::default();
2232 1 : schedule_context.avoid(&shard_a.intent.all_pageservers());
2233 1 : schedule_context.avoid(&shard_b.intent.all_pageservers());
2234 1 : schedule_context
2235 1 : }
2236 :
2237 1 : let schedule_context = make_schedule_context(&shard_a, &shard_b);
2238 1 : let optimization_a = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2239 1 : assert_eq!(
2240 1 : optimization_a,
2241 1 : Some(ScheduleOptimization {
2242 1 : sequence: shard_a.sequence,
2243 1 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
2244 1 : old_attached_node_id: NodeId(2),
2245 1 : new_attached_node_id: NodeId(1)
2246 1 : })
2247 1 : })
2248 1 : );
2249 1 : shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
2250 1 :
2251 1 : // // Either shard should recognize that it has the option to switch to a secondary location where there
2252 1 : // // would be no other shards from the same tenant, and request to do so.
2253 1 : // assert_eq!(
2254 1 : // optimization_a_prepare,
2255 1 : // Some(ScheduleOptimization {
2256 1 : // sequence: shard_a.sequence,
2257 1 : // action: ScheduleOptimizationAction::CreateSecondary(NodeId(2))
2258 1 : // })
2259 1 : // );
2260 1 : // shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
2261 1 :
2262 1 : // let schedule_context = make_schedule_context(&shard_a, &shard_b);
2263 1 : // let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2264 1 : // assert_eq!(
2265 1 : // optimization_a_migrate,
2266 1 : // Some(ScheduleOptimization {
2267 1 : // sequence: shard_a.sequence,
2268 1 : // action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
2269 1 : // old_attached_node_id: NodeId(1),
2270 1 : // new_attached_node_id: NodeId(2)
2271 1 : // })
2272 1 : // })
2273 1 : // );
2274 1 : // shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
2275 1 :
2276 1 : // let schedule_context = make_schedule_context(&shard_a, &shard_b);
2277 1 : // let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2278 1 : // assert_eq!(
2279 1 : // optimization_a_cleanup,
2280 1 : // Some(ScheduleOptimization {
2281 1 : // sequence: shard_a.sequence,
2282 1 : // action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1))
2283 1 : // })
2284 1 : // );
2285 1 : // shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
2286 1 :
2287 1 : // // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
2288 1 : // let schedule_context = make_schedule_context(&shard_a, &shard_b);
2289 1 : // assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
2290 1 :
2291 1 : shard_a.intent.clear(&mut scheduler);
2292 1 : shard_b.intent.clear(&mut scheduler);
2293 1 :
2294 1 : Ok(())
2295 1 : }
2296 :
2297 : #[test]
2298 : /// Complicated case: moving attachment to somewhere better where we do not have a secondary
2299 : /// already, creating one as needed.
2300 1 : fn optimize_attachment_multistep() -> anyhow::Result<()> {
2301 1 : let nodes = make_test_nodes(
2302 1 : 3,
2303 1 : &[
2304 1 : AvailabilityZone("az-a".to_string()),
2305 1 : AvailabilityZone("az-b".to_string()),
2306 1 : AvailabilityZone("az-c".to_string()),
2307 1 : ],
2308 1 : );
2309 1 : let mut scheduler = Scheduler::new(nodes.values());
2310 1 :
2311 1 : // Two shards of a tenant that wants to be in AZ A
2312 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
2313 1 : shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
2314 1 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
2315 1 : shard_b.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
2316 1 :
2317 1 : // Both shards are initially attached in non-home AZ _and_ have secondaries in non-home AZs
2318 1 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
2319 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
2320 1 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(3)));
2321 1 : shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
2322 :
2323 3 : fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
2324 3 : let mut schedule_context = ScheduleContext::default();
2325 3 : schedule_context.avoid(&shard_a.intent.all_pageservers());
2326 3 : schedule_context.avoid(&shard_b.intent.all_pageservers());
2327 3 : schedule_context
2328 3 : }
2329 :
2330 1 : let schedule_context = make_schedule_context(&shard_a, &shard_b);
2331 1 : let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2332 1 : assert_eq!(
2333 1 : optimization_a_prepare,
2334 1 : Some(ScheduleOptimization {
2335 1 : sequence: shard_a.sequence,
2336 1 : action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
2337 1 : })
2338 1 : );
2339 1 : shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
2340 1 :
2341 1 : let schedule_context = make_schedule_context(&shard_a, &shard_b);
2342 1 : let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2343 1 : assert_eq!(
2344 1 : optimization_a_migrate,
2345 1 : Some(ScheduleOptimization {
2346 1 : sequence: shard_a.sequence,
2347 1 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
2348 1 : old_attached_node_id: NodeId(2),
2349 1 : new_attached_node_id: NodeId(1)
2350 1 : })
2351 1 : })
2352 1 : );
2353 1 : shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
2354 1 :
2355 1 : let schedule_context = make_schedule_context(&shard_a, &shard_b);
2356 1 : let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2357 1 : assert_eq!(
2358 1 : optimization_a_cleanup,
2359 1 : Some(ScheduleOptimization {
2360 1 : sequence: shard_a.sequence,
2361 1 : action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
2362 1 : })
2363 1 : );
2364 1 : shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
2365 1 :
2366 1 : // // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
2367 1 : // let schedule_context = make_schedule_context(&shard_a, &shard_b);
2368 1 : // assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
2369 1 :
2370 1 : shard_a.intent.clear(&mut scheduler);
2371 1 : shard_b.intent.clear(&mut scheduler);
2372 1 :
2373 1 : Ok(())
2374 1 : }
2375 :
2376 : #[test]
2377 : /// How the optimisation code handles a shard with a preferred node set; this is an example
2378 : /// of the multi-step migration, but driven by a different input.
2379 1 : fn optimize_attachment_multi_preferred_node() -> anyhow::Result<()> {
2380 1 : let nodes = make_test_nodes(
2381 1 : 4,
2382 1 : &[
2383 1 : AvailabilityZone("az-a".to_string()),
2384 1 : AvailabilityZone("az-a".to_string()),
2385 1 : AvailabilityZone("az-b".to_string()),
2386 1 : AvailabilityZone("az-b".to_string()),
2387 1 : ],
2388 1 : );
2389 1 : let mut scheduler = Scheduler::new(nodes.values());
2390 1 :
2391 1 : // Two shards of a tenant that wants to be in AZ A
2392 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
2393 1 : shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
2394 1 :
2395 1 : // Initially attached in a stable location
2396 1 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
2397 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
2398 1 :
2399 1 : // Set the preferred node to node 2, an equally high scoring node to its current location
2400 1 : shard_a.preferred_node = Some(NodeId(2));
2401 :
2402 3 : fn make_schedule_context(shard_a: &TenantShard) -> ScheduleContext {
2403 3 : let mut schedule_context = ScheduleContext::default();
2404 3 : schedule_context.avoid(&shard_a.intent.all_pageservers());
2405 3 : schedule_context
2406 3 : }
2407 :
2408 1 : let schedule_context = make_schedule_context(&shard_a);
2409 1 : let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2410 1 : assert_eq!(
2411 1 : optimization_a_prepare,
2412 1 : Some(ScheduleOptimization {
2413 1 : sequence: shard_a.sequence,
2414 1 : action: ScheduleOptimizationAction::CreateSecondary(NodeId(2))
2415 1 : })
2416 1 : );
2417 1 : shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
2418 1 :
2419 1 : // The first step of the optimisation should not have cleared the preferred node
2420 1 : assert_eq!(shard_a.preferred_node, Some(NodeId(2)));
2421 :
2422 1 : let schedule_context = make_schedule_context(&shard_a);
2423 1 : let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2424 1 : assert_eq!(
2425 1 : optimization_a_migrate,
2426 1 : Some(ScheduleOptimization {
2427 1 : sequence: shard_a.sequence,
2428 1 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
2429 1 : old_attached_node_id: NodeId(1),
2430 1 : new_attached_node_id: NodeId(2)
2431 1 : })
2432 1 : })
2433 1 : );
2434 1 : shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
2435 1 :
2436 1 : // The cutover step of the optimisation should have cleared the preferred node
2437 1 : assert_eq!(shard_a.preferred_node, None);
2438 :
2439 1 : let schedule_context = make_schedule_context(&shard_a);
2440 1 : let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2441 1 : assert_eq!(
2442 1 : optimization_a_cleanup,
2443 1 : Some(ScheduleOptimization {
2444 1 : sequence: shard_a.sequence,
2445 1 : action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1))
2446 1 : })
2447 1 : );
2448 1 : shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
2449 1 :
2450 1 : shard_a.intent.clear(&mut scheduler);
2451 1 :
2452 1 : Ok(())
2453 1 : }
2454 :
2455 : #[test]
2456 : /// Check that multi-step migration works when moving to somewhere that is only better by
2457 : /// 1 AffinityScore -- this ensures that we don't have a bug like the intermediate secondary
2458 : /// counting toward the affinity score such that it prevents the rest of the migration from happening.
2459 1 : fn optimize_attachment_marginal() -> anyhow::Result<()> {
2460 1 : let nodes = make_test_nodes(2, &[]);
2461 1 : let mut scheduler = Scheduler::new(nodes.values());
2462 1 :
2463 1 : // Multi-sharded tenant, we will craft a situation where affinity
2464 1 : // scores differ only slightly
2465 1 : let mut shards = make_test_tenant(PlacementPolicy::Attached(0), ShardCount::new(4), None);
2466 1 :
2467 1 : // 1 attached on node 1
2468 1 : shards[0]
2469 1 : .intent
2470 1 : .set_attached(&mut scheduler, Some(NodeId(1)));
2471 1 : // 3 attached on node 2
2472 1 : shards[1]
2473 1 : .intent
2474 1 : .set_attached(&mut scheduler, Some(NodeId(2)));
2475 1 : shards[2]
2476 1 : .intent
2477 1 : .set_attached(&mut scheduler, Some(NodeId(2)));
2478 1 : shards[3]
2479 1 : .intent
2480 1 : .set_attached(&mut scheduler, Some(NodeId(2)));
2481 :
2482 : // The scheduler should figure out that we need to:
2483 : // - Create a secondary for shard 3 on node 1
2484 : // - Migrate shard 3 to node 1
2485 : // - Remove shard 3's location on node 2
2486 :
2487 4 : fn make_schedule_context(shards: &Vec<TenantShard>) -> ScheduleContext {
2488 4 : let mut schedule_context = ScheduleContext::default();
2489 20 : for shard in shards {
2490 16 : schedule_context.avoid(&shard.intent.all_pageservers());
2491 16 : }
2492 4 : schedule_context
2493 4 : }
2494 :
2495 1 : let schedule_context = make_schedule_context(&shards);
2496 1 : let optimization_a_prepare =
2497 1 : shards[1].optimize_attachment(&mut scheduler, &schedule_context);
2498 1 : assert_eq!(
2499 1 : optimization_a_prepare,
2500 1 : Some(ScheduleOptimization {
2501 1 : sequence: shards[1].sequence,
2502 1 : action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
2503 1 : })
2504 1 : );
2505 1 : shards[1].apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
2506 1 :
2507 1 : let schedule_context = make_schedule_context(&shards);
2508 1 : let optimization_a_migrate =
2509 1 : shards[1].optimize_attachment(&mut scheduler, &schedule_context);
2510 1 : assert_eq!(
2511 1 : optimization_a_migrate,
2512 1 : Some(ScheduleOptimization {
2513 1 : sequence: shards[1].sequence,
2514 1 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
2515 1 : old_attached_node_id: NodeId(2),
2516 1 : new_attached_node_id: NodeId(1)
2517 1 : })
2518 1 : })
2519 1 : );
2520 1 : shards[1].apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
2521 1 :
2522 1 : let schedule_context = make_schedule_context(&shards);
2523 1 : let optimization_a_cleanup =
2524 1 : shards[1].optimize_attachment(&mut scheduler, &schedule_context);
2525 1 : assert_eq!(
2526 1 : optimization_a_cleanup,
2527 1 : Some(ScheduleOptimization {
2528 1 : sequence: shards[1].sequence,
2529 1 : action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2))
2530 1 : })
2531 1 : );
2532 1 : shards[1].apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
2533 1 :
2534 1 : // Everything should be stable now
2535 1 : let schedule_context = make_schedule_context(&shards);
2536 1 : assert_eq!(
2537 1 : shards[0].optimize_attachment(&mut scheduler, &schedule_context),
2538 1 : None
2539 1 : );
2540 1 : assert_eq!(
2541 1 : shards[1].optimize_attachment(&mut scheduler, &schedule_context),
2542 1 : None
2543 1 : );
2544 1 : assert_eq!(
2545 1 : shards[2].optimize_attachment(&mut scheduler, &schedule_context),
2546 1 : None
2547 1 : );
2548 1 : assert_eq!(
2549 1 : shards[3].optimize_attachment(&mut scheduler, &schedule_context),
2550 1 : None
2551 1 : );
2552 :
2553 5 : for mut shard in shards {
2554 4 : shard.intent.clear(&mut scheduler);
2555 4 : }
2556 :
2557 1 : Ok(())
2558 1 : }
2559 :
2560 : #[test]
2561 1 : fn optimize_secondary() -> anyhow::Result<()> {
2562 1 : let nodes = make_test_nodes(4, &[]);
2563 1 : let mut scheduler = Scheduler::new(nodes.values());
2564 1 :
2565 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
2566 1 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
2567 1 :
2568 1 : // Initially: both nodes attached on shard 1, and both have secondary locations
2569 1 : // on different nodes.
2570 1 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
2571 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
2572 1 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
2573 1 : shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
2574 1 :
2575 1 : let mut schedule_context = ScheduleContext::default();
2576 1 : schedule_context.avoid(&shard_a.intent.all_pageservers());
2577 1 : schedule_context.avoid(&shard_b.intent.all_pageservers());
2578 1 :
2579 1 : let optimization_a = shard_a.optimize_secondary(&mut scheduler, &schedule_context);
2580 1 :
2581 1 : // Since there is a node with no locations available, the node with two locations for the
2582 1 : // same tenant should generate an optimization to move one away
2583 1 : assert_eq!(
2584 1 : optimization_a,
2585 1 : Some(ScheduleOptimization {
2586 1 : sequence: shard_a.sequence,
2587 1 : action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
2588 1 : old_node_id: NodeId(3),
2589 1 : new_node_id: NodeId(4)
2590 1 : })
2591 1 : })
2592 1 : );
2593 :
2594 1 : shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
2595 1 : assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
2596 1 : assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);
2597 :
2598 1 : shard_a.intent.clear(&mut scheduler);
2599 1 : shard_b.intent.clear(&mut scheduler);
2600 1 :
2601 1 : Ok(())
2602 1 : }
2603 :
2604 : /// Test how the optimisation code behaves with an extra secondary
2605 : #[test]
2606 1 : fn optimize_removes_secondary() -> anyhow::Result<()> {
2607 1 : let az_a_tag = AvailabilityZone("az-a".to_string());
2608 1 : let az_b_tag = AvailabilityZone("az-b".to_string());
2609 1 : let mut nodes = make_test_nodes(
2610 1 : 4,
2611 1 : &[
2612 1 : az_a_tag.clone(),
2613 1 : az_b_tag.clone(),
2614 1 : az_a_tag.clone(),
2615 1 : az_b_tag.clone(),
2616 1 : ],
2617 1 : );
2618 1 : let mut scheduler = Scheduler::new(nodes.values());
2619 1 :
2620 1 : let mut schedule_context = ScheduleContext::default();
2621 1 :
2622 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
2623 1 : shard_a.intent.preferred_az_id = Some(az_a_tag.clone());
2624 1 : shard_a
2625 1 : .schedule(&mut scheduler, &mut schedule_context)
2626 1 : .unwrap();
2627 1 :
2628 1 : // Attached on node 1, secondary on node 2
2629 1 : assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
2630 1 : assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(2)]);
2631 :
2632 : // Initially optimiser is idle
2633 1 : assert_eq!(
2634 1 : shard_a.optimize_attachment(&mut scheduler, &schedule_context),
2635 1 : None
2636 1 : );
2637 1 : assert_eq!(
2638 1 : shard_a.optimize_secondary(&mut scheduler, &schedule_context),
2639 1 : None
2640 1 : );
2641 :
2642 : // A spare secondary in the home AZ: it should be removed -- this is the situation when we're midway through a graceful migration, after cutting over
2643 : // to our new location
2644 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
2645 1 : let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2646 1 : assert_eq!(
2647 1 : optimization,
2648 1 : Some(ScheduleOptimization {
2649 1 : sequence: shard_a.sequence,
2650 1 : action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
2651 1 : })
2652 1 : );
2653 1 : shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
2654 1 :
2655 1 : // A spare secondary in the non-home AZ, and one of them is offline
2656 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(4));
2657 1 : nodes
2658 1 : .get_mut(&NodeId(4))
2659 1 : .unwrap()
2660 1 : .set_availability(NodeAvailability::Offline);
2661 1 : scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
2662 1 : let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2663 1 : assert_eq!(
2664 1 : optimization,
2665 1 : Some(ScheduleOptimization {
2666 1 : sequence: shard_a.sequence,
2667 1 : action: ScheduleOptimizationAction::RemoveSecondary(NodeId(4))
2668 1 : })
2669 1 : );
2670 1 : shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
2671 1 :
2672 1 : // A spare secondary when should have none
2673 1 : shard_a.policy = PlacementPolicy::Attached(0);
2674 1 : let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2675 1 : assert_eq!(
2676 1 : optimization,
2677 1 : Some(ScheduleOptimization {
2678 1 : sequence: shard_a.sequence,
2679 1 : action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2))
2680 1 : })
2681 1 : );
2682 1 : shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
2683 1 : assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
2684 1 : assert_eq!(shard_a.intent.get_secondary(), &vec![]);
2685 :
2686 : // Check that in secondary mode, we preserve the secondary in the preferred AZ
2687 1 : let mut schedule_context = ScheduleContext::default(); // Fresh context, we're about to call schedule()
2688 1 : shard_a.policy = PlacementPolicy::Secondary;
2689 1 : shard_a
2690 1 : .schedule(&mut scheduler, &mut schedule_context)
2691 1 : .unwrap();
2692 1 : assert_eq!(shard_a.intent.get_attached(), &None);
2693 1 : assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
2694 1 : assert_eq!(
2695 1 : shard_a.optimize_attachment(&mut scheduler, &schedule_context),
2696 1 : None
2697 1 : );
2698 1 : assert_eq!(
2699 1 : shard_a.optimize_secondary(&mut scheduler, &schedule_context),
2700 1 : None
2701 1 : );
2702 :
2703 1 : shard_a.intent.clear(&mut scheduler);
2704 1 :
2705 1 : Ok(())
2706 1 : }
2707 :
2708 : // Optimize til quiescent: this emulates what Service::optimize_all does, when
2709 : // called repeatedly in the background.
2710 : // Returns the applied optimizations
2711 3 : fn optimize_til_idle(
2712 3 : scheduler: &mut Scheduler,
2713 3 : shards: &mut [TenantShard],
2714 3 : ) -> Vec<ScheduleOptimization> {
2715 3 : let mut loop_n = 0;
2716 3 : let mut optimizations = Vec::default();
2717 : loop {
2718 6 : let mut schedule_context = ScheduleContext::default();
2719 6 : let mut any_changed = false;
2720 :
2721 24 : for shard in shards.iter() {
2722 24 : schedule_context.avoid(&shard.intent.all_pageservers());
2723 24 : }
2724 :
2725 15 : for shard in shards.iter_mut() {
2726 15 : let optimization = shard.optimize_attachment(scheduler, &schedule_context);
2727 15 : tracing::info!(
2728 0 : "optimize_attachment({})={:?}",
2729 : shard.tenant_shard_id,
2730 : optimization
2731 : );
2732 15 : if let Some(optimization) = optimization {
2733 : // Check that maybe_optimizable wouldn't have wrongly claimed this optimization didn't exist
2734 3 : assert!(shard.maybe_optimizable(scheduler, &schedule_context));
2735 3 : optimizations.push(optimization.clone());
2736 3 : shard.apply_optimization(scheduler, optimization);
2737 3 : any_changed = true;
2738 3 : break;
2739 12 : }
2740 12 :
2741 12 : let optimization = shard.optimize_secondary(scheduler, &schedule_context);
2742 12 : tracing::info!(
2743 0 : "optimize_secondary({})={:?}",
2744 : shard.tenant_shard_id,
2745 : optimization
2746 : );
2747 12 : if let Some(optimization) = optimization {
2748 : // Check that maybe_optimizable wouldn't have wrongly claimed this optimization didn't exist
2749 0 : assert!(shard.maybe_optimizable(scheduler, &schedule_context));
2750 :
2751 0 : optimizations.push(optimization.clone());
2752 0 : shard.apply_optimization(scheduler, optimization);
2753 0 : any_changed = true;
2754 0 : break;
2755 12 : }
2756 : }
2757 :
2758 6 : if !any_changed {
2759 3 : break;
2760 3 : }
2761 3 :
2762 3 : // Assert no infinite loop
2763 3 : loop_n += 1;
2764 3 : assert!(loop_n < 1000);
2765 : }
2766 :
2767 3 : optimizations
2768 3 : }
2769 :
2770 : /// Test the balancing behavior of shard scheduling: that it achieves a balance, and
2771 : /// that it converges.
2772 : #[test]
2773 1 : fn optimize_add_nodes() -> anyhow::Result<()> {
2774 1 : let nodes = make_test_nodes(
2775 1 : 9,
2776 1 : &[
2777 1 : // Initial 6 nodes
2778 1 : AvailabilityZone("az-a".to_string()),
2779 1 : AvailabilityZone("az-a".to_string()),
2780 1 : AvailabilityZone("az-b".to_string()),
2781 1 : AvailabilityZone("az-b".to_string()),
2782 1 : AvailabilityZone("az-c".to_string()),
2783 1 : AvailabilityZone("az-c".to_string()),
2784 1 : // Three we will add later
2785 1 : AvailabilityZone("az-a".to_string()),
2786 1 : AvailabilityZone("az-b".to_string()),
2787 1 : AvailabilityZone("az-c".to_string()),
2788 1 : ],
2789 1 : );
2790 1 :
2791 1 : // Only show the scheduler two nodes in each AZ to start with
2792 1 : let mut scheduler = Scheduler::new([].iter());
2793 7 : for i in 1..=6 {
2794 6 : scheduler.node_upsert(nodes.get(&NodeId(i)).unwrap());
2795 6 : }
2796 :
2797 1 : let mut shards = make_test_tenant(
2798 1 : PlacementPolicy::Attached(1),
2799 1 : ShardCount::new(4),
2800 1 : Some(AvailabilityZone("az-a".to_string())),
2801 1 : );
2802 1 : let mut schedule_context = ScheduleContext::default();
2803 5 : for shard in &mut shards {
2804 4 : assert!(
2805 4 : shard
2806 4 : .schedule(&mut scheduler, &mut schedule_context)
2807 4 : .is_ok()
2808 4 : );
2809 : }
2810 :
2811 : // Initial: attached locations land in the tenant's home AZ.
2812 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
2813 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2);
2814 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
2815 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
2816 :
2817 : // Initial: secondary locations in a remote AZ
2818 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
2819 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
2820 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
2821 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
2822 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
2823 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
2824 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
2825 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
2826 :
2827 : // Add another three nodes: we should see the shards spread out when their optimize
2828 : // methods are called
2829 1 : scheduler.node_upsert(nodes.get(&NodeId(7)).unwrap());
2830 1 : scheduler.node_upsert(nodes.get(&NodeId(8)).unwrap());
2831 1 : scheduler.node_upsert(nodes.get(&NodeId(9)).unwrap());
2832 1 : optimize_til_idle(&mut scheduler, &mut shards);
2833 1 :
2834 1 : // We expect one attached location was moved to the new node in the tenant's home AZ
2835 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(7)), 1);
2836 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(7)), 1);
2837 : // The original node has one less attached shard
2838 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
2839 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
2840 :
2841 : // One of the original nodes still has two attachments, since there are an odd number of nodes
2842 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
2843 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
2844 :
2845 : // None of our secondaries moved, since we already had enough nodes for those to be
2846 : // scheduled perfectly
2847 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
2848 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
2849 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
2850 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
2851 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
2852 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
2853 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
2854 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
2855 :
2856 4 : for shard in shards.iter_mut() {
2857 4 : shard.intent.clear(&mut scheduler);
2858 4 : }
2859 :
2860 1 : Ok(())
2861 1 : }
2862 :
2863 : /// Test that initial shard scheduling is optimal. By optimal we mean
2864 : /// that the optimizer cannot find a way to improve it.
2865 : ///
2866 : /// This test is an example of the scheduling issue described in
2867 : /// https://github.com/neondatabase/neon/issues/8969
2868 : #[test]
2869 1 : fn initial_scheduling_is_optimal() -> anyhow::Result<()> {
2870 : use itertools::Itertools;
2871 :
2872 1 : let nodes = make_test_nodes(2, &[]);
2873 1 :
2874 1 : let mut scheduler = Scheduler::new([].iter());
2875 1 : scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
2876 1 : scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
2877 1 :
2878 1 : let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
2879 1 : let a_context = Rc::new(RefCell::new(ScheduleContext::default()));
2880 1 :
2881 1 : let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
2882 1 : let b_context = Rc::new(RefCell::new(ScheduleContext::default()));
2883 1 :
2884 4 : let a_shards_with_context = a.iter_mut().map(|shard| (shard, a_context.clone()));
2885 4 : let b_shards_with_context = b.iter_mut().map(|shard| (shard, b_context.clone()));
2886 1 :
2887 1 : let schedule_order = a_shards_with_context.interleave(b_shards_with_context);
2888 :
2889 9 : for (shard, context) in schedule_order {
2890 8 : let context = &mut *context.borrow_mut();
2891 8 : shard.schedule(&mut scheduler, context).unwrap();
2892 8 : }
2893 :
2894 1 : let applied_to_a = optimize_til_idle(&mut scheduler, &mut a);
2895 1 : assert_eq!(applied_to_a, vec![]);
2896 :
2897 1 : let applied_to_b = optimize_til_idle(&mut scheduler, &mut b);
2898 1 : assert_eq!(applied_to_b, vec![]);
2899 :
2900 8 : for shard in a.iter_mut().chain(b.iter_mut()) {
2901 8 : shard.intent.clear(&mut scheduler);
2902 8 : }
2903 :
2904 1 : Ok(())
2905 1 : }
2906 :
2907 : #[test]
2908 1 : fn random_az_shard_scheduling() -> anyhow::Result<()> {
2909 : use rand::seq::SliceRandom;
2910 :
2911 51 : for seed in 0..50 {
2912 50 : eprintln!("Running test with seed {seed}");
2913 50 : let mut rng = StdRng::seed_from_u64(seed);
2914 50 :
2915 50 : let az_a_tag = AvailabilityZone("az-a".to_string());
2916 50 : let az_b_tag = AvailabilityZone("az-b".to_string());
2917 50 : let azs = [az_a_tag, az_b_tag];
2918 50 : let nodes = make_test_nodes(4, &azs);
2919 50 : let mut shards_per_az: HashMap<AvailabilityZone, u32> = HashMap::new();
2920 50 :
2921 50 : let mut scheduler = Scheduler::new([].iter());
2922 200 : for node in nodes.values() {
2923 200 : scheduler.node_upsert(node);
2924 200 : }
2925 :
2926 50 : let mut shards = Vec::default();
2927 50 : let mut contexts = Vec::default();
2928 50 : let mut az_picker = azs.iter().cycle().cloned();
2929 5050 : for i in 0..100 {
2930 5000 : let az = az_picker.next().unwrap();
2931 5000 : let shard_count = i % 4 + 1;
2932 5000 : *shards_per_az.entry(az.clone()).or_default() += shard_count;
2933 5000 :
2934 5000 : let tenant_shards = make_test_tenant(
2935 5000 : PlacementPolicy::Attached(1),
2936 5000 : ShardCount::new(shard_count.try_into().unwrap()),
2937 5000 : Some(az),
2938 5000 : );
2939 5000 : let context = Rc::new(RefCell::new(ScheduleContext::default()));
2940 5000 :
2941 5000 : contexts.push(context.clone());
2942 5000 : let with_ctx = tenant_shards
2943 5000 : .into_iter()
2944 12500 : .map(|shard| (shard, context.clone()));
2945 17500 : for shard_with_ctx in with_ctx {
2946 12500 : shards.push(shard_with_ctx);
2947 12500 : }
2948 : }
2949 :
2950 50 : shards.shuffle(&mut rng);
2951 :
2952 : #[derive(Default, Debug)]
2953 : struct NodeStats {
2954 : attachments: u32,
2955 : secondaries: u32,
2956 : }
2957 :
2958 50 : let mut node_stats: HashMap<NodeId, NodeStats> = HashMap::default();
2959 50 : let mut attachments_in_wrong_az = 0;
2960 50 : let mut secondaries_in_wrong_az = 0;
2961 :
2962 12550 : for (shard, context) in &mut shards {
2963 12500 : let context = &mut *context.borrow_mut();
2964 12500 : shard.schedule(&mut scheduler, context).unwrap();
2965 12500 :
2966 12500 : let attached_node = shard.intent.get_attached().unwrap();
2967 12500 : let stats = node_stats.entry(attached_node).or_default();
2968 12500 : stats.attachments += 1;
2969 12500 :
2970 12500 : let secondary_node = *shard.intent.get_secondary().first().unwrap();
2971 12500 : let stats = node_stats.entry(secondary_node).or_default();
2972 12500 : stats.secondaries += 1;
2973 12500 :
2974 12500 : let attached_node_az = nodes
2975 12500 : .get(&attached_node)
2976 12500 : .unwrap()
2977 12500 : .get_availability_zone_id();
2978 12500 : let secondary_node_az = nodes
2979 12500 : .get(&secondary_node)
2980 12500 : .unwrap()
2981 12500 : .get_availability_zone_id();
2982 12500 : let preferred_az = shard.preferred_az().unwrap();
2983 12500 :
2984 12500 : if attached_node_az != preferred_az {
2985 0 : eprintln!(
2986 0 : "{} attachment was scheduled in AZ {} but preferred AZ {}",
2987 0 : shard.tenant_shard_id, attached_node_az, preferred_az
2988 0 : );
2989 0 : attachments_in_wrong_az += 1;
2990 12500 : }
2991 :
2992 12500 : if secondary_node_az == preferred_az {
2993 0 : eprintln!(
2994 0 : "{} secondary was scheduled in AZ {} which matches preference",
2995 0 : shard.tenant_shard_id, attached_node_az
2996 0 : );
2997 0 : secondaries_in_wrong_az += 1;
2998 12500 : }
2999 : }
3000 :
3001 50 : let mut violations = Vec::default();
3002 50 :
3003 50 : if attachments_in_wrong_az > 0 {
3004 0 : violations.push(format!(
3005 0 : "{} attachments scheduled to the incorrect AZ",
3006 0 : attachments_in_wrong_az
3007 0 : ));
3008 50 : }
3009 :
3010 50 : if secondaries_in_wrong_az > 0 {
3011 0 : violations.push(format!(
3012 0 : "{} secondaries scheduled to the incorrect AZ",
3013 0 : secondaries_in_wrong_az
3014 0 : ));
3015 50 : }
3016 :
3017 50 : eprintln!(
3018 50 : "attachments_in_wrong_az={} secondaries_in_wrong_az={}",
3019 50 : attachments_in_wrong_az, secondaries_in_wrong_az
3020 50 : );
3021 :
3022 250 : for (node_id, stats) in &node_stats {
3023 200 : let node_az = nodes.get(node_id).unwrap().get_availability_zone_id();
3024 200 : let ideal_attachment_load = shards_per_az.get(node_az).unwrap() / 2;
3025 200 : let allowed_attachment_load =
3026 200 : (ideal_attachment_load - 1)..(ideal_attachment_load + 2);
3027 200 :
3028 200 : if !allowed_attachment_load.contains(&stats.attachments) {
3029 0 : violations.push(format!(
3030 0 : "Found {} attachments on node {}, but expected {}",
3031 0 : stats.attachments, node_id, ideal_attachment_load
3032 0 : ));
3033 200 : }
3034 :
3035 200 : eprintln!(
3036 200 : "{}: attachments={} secondaries={} ideal_attachment_load={}",
3037 200 : node_id, stats.attachments, stats.secondaries, ideal_attachment_load
3038 200 : );
3039 : }
3040 :
3041 50 : assert!(violations.is_empty(), "{violations:?}");
3042 :
3043 12550 : for (mut shard, _ctx) in shards {
3044 12500 : shard.intent.clear(&mut scheduler);
3045 12500 : }
3046 : }
3047 1 : Ok(())
3048 1 : }
3049 :
3050 : /// Check how the shard's scheduling behaves when in PlacementPolicy::Secondary mode.
3051 : #[test]
3052 1 : fn tenant_secondary_scheduling() -> anyhow::Result<()> {
3053 1 : let az_a = AvailabilityZone("az-a".to_string());
3054 1 : let nodes = make_test_nodes(
3055 1 : 3,
3056 1 : &[
3057 1 : az_a.clone(),
3058 1 : AvailabilityZone("az-b".to_string()),
3059 1 : AvailabilityZone("az-c".to_string()),
3060 1 : ],
3061 1 : );
3062 1 :
3063 1 : let mut scheduler = Scheduler::new(nodes.values());
3064 1 : let mut context = ScheduleContext::default();
3065 1 :
3066 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Secondary);
3067 1 : tenant_shard.intent.preferred_az_id = Some(az_a.clone());
3068 1 : tenant_shard
3069 1 : .schedule(&mut scheduler, &mut context)
3070 1 : .expect("we have enough nodes, scheduling should work");
3071 1 : assert_eq!(tenant_shard.intent.secondary.len(), 1);
3072 1 : assert!(tenant_shard.intent.attached.is_none());
3073 :
3074 : // Should have scheduled into the preferred AZ
3075 1 : assert_eq!(
3076 1 : scheduler
3077 1 : .get_node_az(&tenant_shard.intent.secondary[0])
3078 1 : .as_ref(),
3079 1 : tenant_shard.preferred_az()
3080 1 : );
3081 :
3082 : // Optimizer should agree
3083 1 : assert_eq!(
3084 1 : tenant_shard.optimize_attachment(&mut scheduler, &context),
3085 1 : None
3086 1 : );
3087 1 : assert_eq!(
3088 1 : tenant_shard.optimize_secondary(&mut scheduler, &context),
3089 1 : None
3090 1 : );
3091 :
3092 : // Switch to PlacementPolicy::Attached
3093 1 : tenant_shard.policy = PlacementPolicy::Attached(1);
3094 1 : tenant_shard
3095 1 : .schedule(&mut scheduler, &mut context)
3096 1 : .expect("we have enough nodes, scheduling should work");
3097 1 : assert_eq!(tenant_shard.intent.secondary.len(), 1);
3098 1 : assert!(tenant_shard.intent.attached.is_some());
3099 : // Secondary should now be in non-preferred AZ
3100 1 : assert_ne!(
3101 1 : scheduler
3102 1 : .get_node_az(&tenant_shard.intent.secondary[0])
3103 1 : .as_ref(),
3104 1 : tenant_shard.preferred_az()
3105 1 : );
3106 : // Attached should be in preferred AZ
3107 1 : assert_eq!(
3108 1 : scheduler
3109 1 : .get_node_az(&tenant_shard.intent.attached.unwrap())
3110 1 : .as_ref(),
3111 1 : tenant_shard.preferred_az()
3112 1 : );
3113 :
3114 : // Optimizer should agree
3115 1 : assert_eq!(
3116 1 : tenant_shard.optimize_attachment(&mut scheduler, &context),
3117 1 : None
3118 1 : );
3119 1 : assert_eq!(
3120 1 : tenant_shard.optimize_secondary(&mut scheduler, &context),
3121 1 : None
3122 1 : );
3123 :
3124 : // Switch back to PlacementPolicy::Secondary
3125 1 : tenant_shard.policy = PlacementPolicy::Secondary;
3126 1 : tenant_shard
3127 1 : .schedule(&mut scheduler, &mut context)
3128 1 : .expect("we have enough nodes, scheduling should work");
3129 1 : assert_eq!(tenant_shard.intent.secondary.len(), 1);
3130 1 : assert!(tenant_shard.intent.attached.is_none());
3131 : // When we picked a location to keep, we should have kept the one in the preferred AZ
3132 1 : assert_eq!(
3133 1 : scheduler
3134 1 : .get_node_az(&tenant_shard.intent.secondary[0])
3135 1 : .as_ref(),
3136 1 : tenant_shard.preferred_az()
3137 1 : );
3138 :
3139 : // Optimizer should agree
3140 1 : assert_eq!(
3141 1 : tenant_shard.optimize_attachment(&mut scheduler, &context),
3142 1 : None
3143 1 : );
3144 1 : assert_eq!(
3145 1 : tenant_shard.optimize_secondary(&mut scheduler, &context),
3146 1 : None
3147 1 : );
3148 :
3149 1 : tenant_shard.intent.clear(&mut scheduler);
3150 1 :
3151 1 : Ok(())
3152 1 : }
3153 : }
|