Line data Source code
1 : use std::collections::{HashMap, HashSet};
2 : use std::sync::Arc;
3 : use std::time::Duration;
4 :
5 : use futures::future::{self, Either};
6 : use itertools::Itertools;
7 : use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy, ShardSchedulingPolicy};
8 : use pageserver_api::models::{LocationConfig, LocationConfigMode, TenantConfig};
9 : use pageserver_api::shard::{ShardIdentity, TenantShardId};
10 : use serde::{Deserialize, Serialize};
11 : use tokio::task::JoinHandle;
12 : use tokio_util::sync::CancellationToken;
13 : use tracing::{Instrument, instrument};
14 : use utils::generation::Generation;
15 : use utils::id::NodeId;
16 : use utils::seqwait::{SeqWait, SeqWaitError};
17 : use utils::shard::ShardCount;
18 : use utils::sync::gate::GateGuard;
19 :
20 : use crate::compute_hook::ComputeHook;
21 : use crate::metrics::{
22 : self, ReconcileCompleteLabelGroup, ReconcileLongRunningLabelGroup, ReconcileOutcome,
23 : };
24 : use crate::node::Node;
25 : use crate::persistence::split_state::SplitState;
26 : use crate::persistence::{Persistence, TenantShardPersistence};
27 : use crate::reconciler::{
28 : ReconcileError, ReconcileUnits, Reconciler, ReconcilerConfig, TargetState,
29 : attached_location_conf, secondary_location_conf,
30 : };
31 : use crate::scheduler::{
32 : AffinityScore, AttachedShardTag, NodeSchedulingScore, NodeSecondarySchedulingScore,
33 : RefCountUpdate, ScheduleContext, ScheduleError, Scheduler, SecondaryShardTag, ShardTag,
34 : };
35 : use crate::service::ReconcileResultRequest;
36 : use crate::timeline_import::TimelineImportState;
37 : use crate::{Sequence, service};
38 :
39 : /// Serialization helper
40 0 : fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
41 0 : where
42 0 : S: serde::ser::Serializer,
43 0 : T: std::fmt::Display,
44 0 : {
45 0 : serializer.collect_str(
46 0 : &v.lock()
47 0 : .unwrap()
48 0 : .as_ref()
49 0 : .map(|e| format!("{e}"))
50 0 : .unwrap_or("".to_string()),
51 0 : )
52 0 : }
53 :
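(Illustrative sketch, not part of this file: how a `serialize_with` helper like `read_last_error` above plugs into a struct field. `DebugView` and `mutex_option_as_str` are hypothetical names; the example assumes the `serde` (with `derive`) and `serde_json` crates.)

```rust
use std::sync::Mutex;

use serde::{Serialize, Serializer};

// Same shape as read_last_error: render the guarded Option as a string, or "" if empty.
fn mutex_option_as_str<S: Serializer>(
    v: &Mutex<Option<String>>,
    serializer: S,
) -> Result<S::Ok, S::Error> {
    serializer.collect_str(&v.lock().unwrap().clone().unwrap_or_default())
}

#[derive(Serialize)]
struct DebugView {
    #[serde(serialize_with = "mutex_option_as_str")]
    last_error: Mutex<Option<String>>,
}

fn main() {
    let v = DebugView {
        last_error: Mutex::new(Some("reconcile failed".to_string())),
    };
    assert_eq!(
        serde_json::to_string(&v).unwrap(),
        r#"{"last_error":"reconcile failed"}"#
    );
}
```
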
54 : /// In-memory state for a particular tenant shard.
55 : ///
56 : /// This struct implements Serialize for debugging purposes, but is _not_ persisted
57 : /// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
58 0 : #[derive(Serialize)]
59 : pub(crate) struct TenantShard {
60 : pub(crate) tenant_shard_id: TenantShardId,
61 :
62 : pub(crate) shard: ShardIdentity,
63 :
64 : // Runtime only: sequence used to coordinate updates to this object while
65 : // background reconcilers may be running. A reconciler runs to a particular
66 : // sequence.
67 : pub(crate) sequence: Sequence,
68 :
69 : // Latest generation number: next time we attach, increment this
70 : // and use the incremented number when attaching.
71 : //
72 : // None represents an incompletely onboarded tenant via the [`Service::location_config`]
73 : // API, where this tenant may only run in PlacementPolicy::Secondary.
74 : pub(crate) generation: Option<Generation>,
75 :
76 : // High level description of how the tenant should be set up. Provided
77 : // externally.
78 : pub(crate) policy: PlacementPolicy,
79 :
80 : // Low level description of exactly which pageservers should fulfil
81 : // which role. Generated by `Self::schedule`.
82 : pub(crate) intent: IntentState,
83 :
84 : // Low level description of how the tenant is configured on pageservers:
85 : // if this does not match `Self::intent` then the tenant needs reconciliation
86 : // with `Self::reconcile`.
87 : pub(crate) observed: ObservedState,
88 :
89 : // Tenant configuration, passed through opaquely to the pageserver. Identical
90 : // for all shards in a tenant.
91 : pub(crate) config: TenantConfig,
92 :
93 : /// If a reconcile task is currently in flight, it may be joined here (it is
94 : /// only safe to join if either the result has been received or the reconciler's
95 : /// cancellation token has been fired)
96 : #[serde(skip)]
97 : pub(crate) reconciler: Option<ReconcilerHandle>,
98 :
99 : /// If a tenant is being split, then all shards with that TenantId will have a
100 : /// SplitState set; this acts as a guard against other operations such as background
101 : /// reconciliation and timeline creation.
102 : pub(crate) splitting: SplitState,
103 :
104 : /// Flag indicating whether the tenant has an in-progress timeline import.
105 : /// Used to disallow shard splits while an import is in progress.
106 : pub(crate) importing: TimelineImportState,
107 :
108 : /// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag
109 : /// is set. This flag is cleared when the tenant is popped off the delay queue.
110 : pub(crate) delayed_reconcile: bool,
111 :
112 : /// Optionally wait for reconciliation to complete up to a particular
113 : /// sequence number.
114 : #[serde(skip)]
115 : pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
116 :
117 : /// Indicates sequence number for which we have encountered an error reconciling. If
118 : /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
119 : /// and callers should stop waiting for `waiter` and propagate the error.
120 : #[serde(skip)]
121 : pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
122 :
123 : /// The most recent error from a reconcile on this tenant. This is a nested Arc
124 : /// because:
125 : /// - ReconcileWaiters need to Arc-clone the overall object to read it later
126 : /// - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
127 : /// many waiters for one shard, and the underlying error types are not Clone.
128 : ///
129 : /// TODO: generalize to an array of recent events
130 : /// TODO: use an ArcSwap instead of a mutex for faster reads?
131 : #[serde(serialize_with = "read_last_error")]
132 : pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
133 :
134 : /// If we have a pending compute notification that for some reason we weren't able to send,
135 : /// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
136 : /// and trigger a Reconciler run. This is the mechanism by which compute notifications are included in the scope
137 : /// of state that we publish externally in an eventually consistent way.
138 : pub(crate) pending_compute_notification: bool,
139 :
140 : /// To do a graceful migration, set this field to the destination pageserver, and optimization
141 : /// functions will consider this node the best location and react appropriately.
142 : preferred_node: Option<NodeId>,
143 :
144 : // Support/debug tool: if something is going wrong or flapping with scheduling, this may
145 : // be set to a non-active state to avoid making changes while the issue is fixed.
146 : scheduling_policy: ShardSchedulingPolicy,
147 : }
148 :
149 : #[derive(Clone, Debug, Serialize)]
150 : pub(crate) struct IntentState {
151 : attached: Option<NodeId>,
152 : secondary: Vec<NodeId>,
153 :
154 : // We should attempt to schedule this shard in the provided AZ to
155 : // decrease chances of cross-AZ compute.
156 : preferred_az_id: Option<AvailabilityZone>,
157 : }
158 :
159 : impl IntentState {
160 12860 : pub(crate) fn new(preferred_az_id: Option<AvailabilityZone>) -> Self {
161 12860 : Self {
162 12860 : attached: None,
163 12860 : secondary: vec![],
164 12860 : preferred_az_id,
165 12860 : }
166 12860 : }
167 0 : pub(crate) fn single(
168 0 : scheduler: &mut Scheduler,
169 0 : node_id: Option<NodeId>,
170 0 : preferred_az_id: Option<AvailabilityZone>,
171 0 : ) -> Self {
172 0 : if let Some(node_id) = node_id {
173 0 : scheduler.update_node_ref_counts(
174 0 : node_id,
175 0 : preferred_az_id.as_ref(),
176 0 : RefCountUpdate::Attach,
177 0 : );
178 0 : }
179 0 : Self {
180 0 : attached: node_id,
181 0 : secondary: vec![],
182 0 : preferred_az_id,
183 0 : }
184 0 : }
185 :
186 12859 : pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
187 12859 : if self.attached != new_attached {
188 12859 : if let Some(old_attached) = self.attached.take() {
189 0 : scheduler.update_node_ref_counts(
190 0 : old_attached,
191 0 : self.preferred_az_id.as_ref(),
192 0 : RefCountUpdate::Detach,
193 0 : );
194 12859 : }
195 12859 : if let Some(new_attached) = &new_attached {
196 12859 : scheduler.update_node_ref_counts(
197 12859 : *new_attached,
198 12859 : self.preferred_az_id.as_ref(),
199 12859 : RefCountUpdate::Attach,
200 12859 : );
201 12859 : }
202 12859 : self.attached = new_attached;
203 0 : }
204 :
205 12859 : if let Some(new_attached) = &new_attached {
206 12859 : assert!(!self.secondary.contains(new_attached));
207 0 : }
208 12859 : }
209 :
210 : /// Like set_attached, but the node is from [`Self::secondary`]. This swaps the node from
211 : /// secondary to attached while maintaining the scheduler's reference counts.
212 7 : pub(crate) fn promote_attached(
213 7 : &mut self,
214 7 : scheduler: &mut Scheduler,
215 7 : promote_secondary: NodeId,
216 7 : ) {
217 7 : // If we call this with a node that isn't in secondary, it would cause incorrect
218 7 : // scheduler reference counting, since we assume the node is already referenced as a secondary.
219 7 : debug_assert!(self.secondary.contains(&promote_secondary));
220 :
221 16 : self.secondary.retain(|n| n != &promote_secondary);
222 7 :
223 7 : let demoted = self.attached;
224 7 : self.attached = Some(promote_secondary);
225 7 :
226 7 : scheduler.update_node_ref_counts(
227 7 : promote_secondary,
228 7 : self.preferred_az_id.as_ref(),
229 7 : RefCountUpdate::PromoteSecondary,
230 7 : );
231 7 : if let Some(demoted) = demoted {
232 0 : scheduler.update_node_ref_counts(
233 0 : demoted,
234 0 : self.preferred_az_id.as_ref(),
235 0 : RefCountUpdate::DemoteAttached,
236 0 : );
237 7 : }
238 7 : }
239 :
240 12848 : pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
241 12848 : assert!(!self.secondary.contains(&new_secondary));
242 12848 : assert!(self.attached != Some(new_secondary));
243 12848 : scheduler.update_node_ref_counts(
244 12848 : new_secondary,
245 12848 : self.preferred_az_id.as_ref(),
246 12848 : RefCountUpdate::AddSecondary,
247 12848 : );
248 12848 : self.secondary.push(new_secondary);
249 12848 : }
250 :
251 : /// It is legal to call this with a node that is not currently a secondary: that is a no-op
252 9 : pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
253 13 : let index = self.secondary.iter().position(|n| *n == node_id);
254 9 : if let Some(index) = index {
255 9 : scheduler.update_node_ref_counts(
256 9 : node_id,
257 9 : self.preferred_az_id.as_ref(),
258 9 : RefCountUpdate::RemoveSecondary,
259 9 : );
260 9 : self.secondary.remove(index);
261 9 : }
262 9 : }
263 :
264 12859 : pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
265 12859 : for secondary in self.secondary.drain(..) {
266 12840 : scheduler.update_node_ref_counts(
267 12840 : secondary,
268 12840 : self.preferred_az_id.as_ref(),
269 12840 : RefCountUpdate::RemoveSecondary,
270 12840 : );
271 12840 : }
272 12859 : }
273 :
274 : /// Remove the last secondary node from the list of secondaries
275 0 : pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
276 0 : if let Some(node_id) = self.secondary.pop() {
277 0 : scheduler.update_node_ref_counts(
278 0 : node_id,
279 0 : self.preferred_az_id.as_ref(),
280 0 : RefCountUpdate::RemoveSecondary,
281 0 : );
282 0 : }
283 0 : }
284 :
285 12859 : pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
286 12859 : if let Some(old_attached) = self.attached.take() {
287 12857 : scheduler.update_node_ref_counts(
288 12857 : old_attached,
289 12857 : self.preferred_az_id.as_ref(),
290 12857 : RefCountUpdate::Detach,
291 12857 : );
292 12857 : }
293 :
294 12859 : self.clear_secondary(scheduler);
295 12859 : }
296 :
297 12901 : pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
298 12901 : let mut result = Vec::new();
299 12901 : if let Some(p) = self.attached {
300 12896 : result.push(p)
301 5 : }
302 :
303 12901 : result.extend(self.secondary.iter().copied());
304 12901 :
305 12901 : result
306 12901 : }
307 :
308 12624 : pub(crate) fn get_attached(&self) -> &Option<NodeId> {
309 12624 : &self.attached
310 12624 : }
311 :
312 12706 : pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
313 12706 : &self.secondary
314 12706 : }
315 :
316 : /// If the node is in use as the attached location, demote it into
317 : /// the list of secondary locations. This is used when a node goes offline,
318 : /// and we want to use a different node for attachment, but not permanently
319 : /// forget the location on the offline node.
320 : ///
321 : /// Returns true if a change was made
322 8 : pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
323 8 : if self.attached == Some(node_id) {
324 8 : self.attached = None;
325 8 : self.secondary.push(node_id);
326 8 : scheduler.update_node_ref_counts(
327 8 : node_id,
328 8 : self.preferred_az_id.as_ref(),
329 8 : RefCountUpdate::DemoteAttached,
330 8 : );
331 8 : true
332 : } else {
333 0 : false
334 : }
335 8 : }
336 :
337 2 : pub(crate) fn set_preferred_az(
338 2 : &mut self,
339 2 : scheduler: &mut Scheduler,
340 2 : preferred_az: Option<AvailabilityZone>,
341 2 : ) {
342 2 : let new_az = preferred_az.as_ref();
343 2 : let old_az = self.preferred_az_id.as_ref();
344 2 :
345 2 : if old_az != new_az {
346 2 : if let Some(node_id) = self.attached {
347 2 : scheduler.update_node_ref_counts(
348 2 : node_id,
349 2 : new_az,
350 2 : RefCountUpdate::ChangePreferredAzFrom(old_az),
351 2 : );
352 2 : }
353 4 : for node_id in &self.secondary {
354 2 : scheduler.update_node_ref_counts(
355 2 : *node_id,
356 2 : new_az,
357 2 : RefCountUpdate::ChangePreferredAzFrom(old_az),
358 2 : );
359 2 : }
360 2 : self.preferred_az_id = preferred_az;
361 0 : }
362 2 : }
363 :
364 12508 : pub(crate) fn get_preferred_az(&self) -> Option<&AvailabilityZone> {
365 12508 : self.preferred_az_id.as_ref()
366 12508 : }
367 : }
368 :
369 : impl Drop for IntentState {
370 12860 : fn drop(&mut self) {
371 12860 : // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
372 12860 : // We do not check this while panicking, to avoid polluting unit test failures or
373 12860 : // other assertions with this assertion's output. It's still wrong to leak these,
374 12860 : // but if we already have a panic then we don't need to independently flag this case.
375 12860 : if !(std::thread::panicking()) {
376 12860 : debug_assert!(self.attached.is_none() && self.secondary.is_empty());
377 0 : }
378 12859 : }
379 : }
380 :
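(Illustrative sketch, not part of this file: the refcount discipline `IntentState` follows. Every change to `attached`/`secondary` is paired with a scheduler refcount update, and intents must be explicitly cleared before drop, which the `Drop` impl above asserts. `ToyScheduler`/`ToyIntent` are hypothetical stand-ins using only `std`.)

```rust
use std::collections::HashMap;

type NodeId = u64;

#[derive(Default)]
struct ToyScheduler {
    // Per-node count of shards that intend to use the node.
    shard_counts: HashMap<NodeId, usize>,
}

impl ToyScheduler {
    fn add_ref(&mut self, node: NodeId) {
        *self.shard_counts.entry(node).or_default() += 1;
    }
    fn drop_ref(&mut self, node: NodeId) {
        let c = self.shard_counts.get_mut(&node).expect("refcount underflow");
        *c -= 1;
    }
}

#[derive(Default)]
struct ToyIntent {
    attached: Option<NodeId>,
    secondary: Vec<NodeId>,
}

impl ToyIntent {
    fn set_attached(&mut self, sched: &mut ToyScheduler, node: NodeId) {
        if let Some(old) = self.attached.take() {
            sched.drop_ref(old);
        }
        sched.add_ref(node);
        self.attached = Some(node);
    }
    fn push_secondary(&mut self, sched: &mut ToyScheduler, node: NodeId) {
        sched.add_ref(node);
        self.secondary.push(node);
    }
    // Mirror of IntentState::clear: release every reference before the value is dropped.
    fn clear(&mut self, sched: &mut ToyScheduler) {
        if let Some(old) = self.attached.take() {
            sched.drop_ref(old);
        }
        for node in self.secondary.drain(..) {
            sched.drop_ref(node);
        }
    }
}

impl Drop for ToyIntent {
    fn drop(&mut self) {
        // Same invariant as IntentState::drop: leaking references would corrupt scheduler state.
        debug_assert!(self.attached.is_none() && self.secondary.is_empty());
    }
}

fn main() {
    let mut sched = ToyScheduler::default();
    let mut intent = ToyIntent::default();
    intent.set_attached(&mut sched, 1);
    intent.push_secondary(&mut sched, 2);
    assert_eq!(sched.shard_counts[&1], 1);
    intent.clear(&mut sched); // required before drop
}
```
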
381 0 : #[derive(Default, Clone, Serialize, Deserialize, Debug)]
382 : pub(crate) struct ObservedState {
383 : pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
384 : }
385 :
386 : /// Our latest knowledge of how this tenant is configured in the outside world.
387 : ///
388 : /// Meaning:
389 : /// * No instance of this type exists for a node: we are certain that we have nothing configured on that
390 : /// node for this shard.
391 : /// * Instance exists with conf==None: we *might* have some state on that node, but we don't know
392 : /// what it is (e.g. we failed partway through configuring it)
393 : /// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
394 : /// and that configuration will still be present unless something external interfered.
395 0 : #[derive(Clone, Serialize, Deserialize, Debug)]
396 : pub(crate) struct ObservedStateLocation {
397 : /// If None, it means we do not know the status of this shard's location on this node, but
398 : /// we know that we might have some state on this node.
399 : pub(crate) conf: Option<LocationConfig>,
400 : }
401 :
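(Illustrative sketch, not part of this file: the three observed-state cases described in the comment above, with a plain `u64` node id and a `String` standing in for `LocationConfig`. Uses only `std`.)

```rust
use std::collections::HashMap;

type NodeId = u64;
type LocationConfig = String;

struct ObservedStateLocation {
    conf: Option<LocationConfig>,
}

fn describe(observed: &HashMap<NodeId, ObservedStateLocation>, node: NodeId) -> &'static str {
    match observed.get(&node) {
        // No entry at all: we are certain nothing is configured on this node.
        None => "nothing configured",
        // Entry with conf == None: state is unknown (e.g. a config call failed partway).
        Some(ObservedStateLocation { conf: None }) => "unknown, must reconcile",
        // Entry with conf == Some: this is what we last successfully configured.
        Some(ObservedStateLocation { conf: Some(_) }) => "known configuration",
    }
}

fn main() {
    let mut observed = HashMap::new();
    observed.insert(1, ObservedStateLocation { conf: None });
    observed.insert(2, ObservedStateLocation { conf: Some("AttachedSingle".to_string()) });
    assert_eq!(describe(&observed, 3), "nothing configured");
    assert_eq!(describe(&observed, 1), "unknown, must reconcile");
    assert_eq!(describe(&observed, 2), "known configuration");
}
```
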
402 : pub(crate) struct ReconcilerWaiter {
403 : // For observability purposes, remember the ID of the shard we're
404 : // waiting for.
405 : pub(crate) tenant_shard_id: TenantShardId,
406 :
407 : seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
408 : error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
409 : error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
410 : seq: Sequence,
411 : }
412 :
413 : pub(crate) enum ReconcilerStatus {
414 : Done,
415 : Failed,
416 : InProgress,
417 : }
418 :
419 : #[derive(thiserror::Error, Debug)]
420 : pub(crate) enum ReconcileWaitError {
421 : #[error("Timeout waiting for shard {0}")]
422 : Timeout(TenantShardId),
423 : #[error("shutting down")]
424 : Shutdown,
425 : #[error("Reconcile error on shard {0}: {1}")]
426 : Failed(TenantShardId, Arc<ReconcileError>),
427 : }
428 :
429 : #[derive(Eq, PartialEq, Debug, Clone)]
430 : pub(crate) struct ReplaceSecondary {
431 : old_node_id: NodeId,
432 : new_node_id: NodeId,
433 : }
434 :
435 : #[derive(Eq, PartialEq, Debug, Clone)]
436 : pub(crate) struct MigrateAttachment {
437 : pub(crate) old_attached_node_id: NodeId,
438 : pub(crate) new_attached_node_id: NodeId,
439 : }
440 :
441 : #[derive(Eq, PartialEq, Debug, Clone)]
442 : pub(crate) enum ScheduleOptimizationAction {
443 : // Replace one of our secondary locations with a different node
444 : ReplaceSecondary(ReplaceSecondary),
445 : // Migrate attachment to an existing secondary location
446 : MigrateAttachment(MigrateAttachment),
447 : // Create a secondary location, with the intent of later migrating to it
448 : CreateSecondary(NodeId),
449 : // Remove a secondary location that we previously created to facilitate a migration
450 : RemoveSecondary(NodeId),
451 : }
452 :
453 : #[derive(Eq, PartialEq, Debug, Clone)]
454 : pub(crate) struct ScheduleOptimization {
455 : // What was the reconcile sequence when we generated this optimization? The optimization
456 : // should only be applied if the shard's sequence is still at this value, in case other changes
457 : // happened between planning the optimization and applying it.
458 : sequence: Sequence,
459 :
460 : pub(crate) action: ScheduleOptimizationAction,
461 : }
462 :
463 : impl ReconcilerWaiter {
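(Illustrative sketch, not part of this file: the stale-plan guard that `ScheduleOptimization::sequence` enables. `apply_optimization` below refuses to apply a plan whose sequence no longer matches the shard's; the `Plan` and `apply` names here are hypothetical.)

```rust
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct Sequence(u64);

struct Plan {
    // Sequence at planning time: if the shard moved on, the plan may be invalid.
    sequence: Sequence,
    action: &'static str,
}

fn apply(current_sequence: Sequence, plan: Plan) -> bool {
    if plan.sequence != current_sequence {
        // Stale: something changed between planning and applying, so discard the plan.
        return false;
    }
    println!("applying: {}", plan.action);
    true
}

fn main() {
    let planned_at = Sequence(7);
    assert!(apply(Sequence(7), Plan { sequence: planned_at, action: "migrate attachment" }));
    assert!(!apply(Sequence(8), Plan { sequence: planned_at, action: "migrate attachment" }));
}
```
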
464 0 : pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
465 0 : tokio::select! {
466 0 : result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
467 0 : result.map_err(|e| match e {
468 0 : SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
469 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
470 0 : })?;
471 : },
472 0 : result = self.error_seq_wait.wait_for(self.seq) => {
473 0 : result.map_err(|e| match e {
474 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
475 0 : SeqWaitError::Timeout => unreachable!()
476 0 : })?;
477 :
478 0 : return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
479 0 : self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
480 : }
481 : }
482 :
483 0 : Ok(())
484 0 : }
485 :
486 0 : pub(crate) fn get_status(&self) -> ReconcilerStatus {
487 0 : if self.seq_wait.would_wait_for(self.seq).is_ok() {
488 0 : ReconcilerStatus::Done
489 0 : } else if self.error_seq_wait.would_wait_for(self.seq).is_ok() {
490 0 : ReconcilerStatus::Failed
491 : } else {
492 0 : ReconcilerStatus::InProgress
493 : }
494 0 : }
495 : }
496 :
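(Illustrative sketch, not part of this file: `ReconcilerWaiter::wait_timeout` above races a completion sequence against an error sequence. This simplified model uses `tokio::sync::watch` channels in place of the crate's `SeqWait` type, whose API is not shown here; it assumes a `tokio` dependency with the `sync`, `macros` and `rt-multi-thread` features.)

```rust
use tokio::sync::watch;

// Resolve once either the completed sequence or the errored sequence reaches `target`.
async fn wait_for_seq(
    mut ok_rx: watch::Receiver<u64>,
    mut err_rx: watch::Receiver<u64>,
    target: u64,
) -> Result<(), String> {
    loop {
        if *err_rx.borrow() >= target {
            return Err(format!("reconcile failed at or before sequence {target}"));
        }
        if *ok_rx.borrow() >= target {
            return Ok(());
        }
        // Wake whenever either channel advances.
        tokio::select! {
            r = ok_rx.changed() => r.map_err(|_| "shutdown".to_string())?,
            r = err_rx.changed() => r.map_err(|_| "shutdown".to_string())?,
        }
    }
}

#[tokio::main]
async fn main() {
    let (ok_tx, ok_rx) = watch::channel(0u64);
    let (_err_tx, err_rx) = watch::channel(0u64);
    let waiter = tokio::spawn(wait_for_seq(ok_rx, err_rx, 3));
    ok_tx.send(3).unwrap();
    assert!(waiter.await.unwrap().is_ok());
}
```
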
497 : /// Having spawned a reconciler task, the tenant shard's state will carry enough
498 : /// information to optionally cancel & await it later.
499 : pub(crate) struct ReconcilerHandle {
500 : sequence: Sequence,
501 : handle: JoinHandle<()>,
502 : cancel: CancellationToken,
503 : }
504 :
505 : pub(crate) enum ReconcileNeeded {
506 : /// shard either doesn't need reconciliation, or is forbidden from spawning a reconciler
507 : /// in its current state (e.g. shard split in progress, or ShardSchedulingPolicy forbids it)
508 : No,
509 : /// shard has a reconciler running, and its intent hasn't changed since that one was
510 : /// spawned: wait for the existing reconciler rather than spawning a new one.
511 : WaitExisting(ReconcilerWaiter),
512 : /// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
513 : Yes(ReconcileReason),
514 : }
515 :
516 : #[derive(Debug)]
517 : pub(crate) enum ReconcileReason {
518 : ActiveNodesDirty,
519 : UnknownLocation,
520 : PendingComputeNotification,
521 : }
522 :
523 : /// Pending modification to the observed state of a tenant shard.
524 : /// Produced by [`Reconciler::observed_deltas`] and applied in [`crate::service::Service::process_result`].
525 : pub(crate) enum ObservedStateDelta {
526 : Upsert(Box<(NodeId, ObservedStateLocation)>),
527 : Delete(NodeId),
528 : }
529 :
530 : impl ObservedStateDelta {
531 0 : pub(crate) fn node_id(&self) -> &NodeId {
532 0 : match self {
533 0 : Self::Upsert(up) => &up.0,
534 0 : Self::Delete(nid) => nid,
535 : }
536 0 : }
537 : }
538 :
539 : /// When a reconcile task completes, it sends this result object
540 : /// to be applied to the primary TenantShard.
541 : pub(crate) struct ReconcileResult {
542 : pub(crate) sequence: Sequence,
543 : /// On errors, `observed` should be treated as an incomplete description
544 : /// of state (i.e. any nodes present in the result should override nodes
545 : /// present in the parent tenant state, but any unmentioned nodes should
546 : /// not be removed from parent tenant state)
547 : pub(crate) result: Result<(), ReconcileError>,
548 :
549 : pub(crate) tenant_shard_id: TenantShardId,
550 : pub(crate) generation: Option<Generation>,
551 : pub(crate) observed_deltas: Vec<ObservedStateDelta>,
552 :
553 : /// Set [`TenantShard::pending_compute_notification`] from this flag
554 : pub(crate) pending_compute_notification: bool,
555 : }
556 :
557 : impl ObservedState {
558 0 : pub(crate) fn new() -> Self {
559 0 : Self {
560 0 : locations: HashMap::new(),
561 0 : }
562 0 : }
563 :
564 0 : pub(crate) fn is_empty(&self) -> bool {
565 0 : self.locations.is_empty()
566 0 : }
567 : }
568 :
569 : impl TenantShard {
570 12843 : pub(crate) fn new(
571 12843 : tenant_shard_id: TenantShardId,
572 12843 : shard: ShardIdentity,
573 12843 : policy: PlacementPolicy,
574 12843 : preferred_az_id: Option<AvailabilityZone>,
575 12843 : ) -> Self {
576 12843 : metrics::METRICS_REGISTRY
577 12843 : .metrics_group
578 12843 : .storage_controller_tenant_shards
579 12843 : .inc();
580 12843 :
581 12843 : Self {
582 12843 : tenant_shard_id,
583 12843 : policy,
584 12843 : intent: IntentState::new(preferred_az_id),
585 12843 : generation: Some(Generation::new(0)),
586 12843 : shard,
587 12843 : observed: ObservedState::default(),
588 12843 : config: TenantConfig::default(),
589 12843 : reconciler: None,
590 12843 : splitting: SplitState::Idle,
591 12843 : importing: TimelineImportState::Idle,
592 12843 : sequence: Sequence(1),
593 12843 : delayed_reconcile: false,
594 12843 : waiter: Arc::new(SeqWait::new(Sequence(0))),
595 12843 : error_waiter: Arc::new(SeqWait::new(Sequence(0))),
596 12843 : last_error: Arc::default(),
597 12843 : pending_compute_notification: false,
598 12843 : scheduling_policy: ShardSchedulingPolicy::default(),
599 12843 : preferred_node: None,
600 12843 : }
601 12843 : }
602 :
603 : /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
604 : /// [`ObservedState`], even if it violates my [`PlacementPolicy`]. Call [`Self::schedule`] next,
605 : /// to get an intent state that complies with placement policy. The overall goal is to do scheduling
606 : /// in a way that makes use of any configured locations that already exist in the outside world.
607 1 : pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
608 1 : // Choose an attached location by filtering observed locations, and then sorting to get the highest
609 1 : // generation
610 1 : let mut attached_locs = self
611 1 : .observed
612 1 : .locations
613 1 : .iter()
614 2 : .filter_map(|(node_id, l)| {
615 2 : if let Some(conf) = &l.conf {
616 2 : if conf.mode == LocationConfigMode::AttachedMulti
617 1 : || conf.mode == LocationConfigMode::AttachedSingle
618 1 : || conf.mode == LocationConfigMode::AttachedStale
619 : {
620 2 : Some((node_id, conf.generation))
621 : } else {
622 0 : None
623 : }
624 : } else {
625 0 : None
626 : }
627 2 : })
628 1 : .collect::<Vec<_>>();
629 1 :
630 2 : attached_locs.sort_by_key(|i| i.1);
631 1 : if let Some((node_id, _gen)) = attached_locs.into_iter().next_back() {
632 1 : self.intent.set_attached(scheduler, Some(*node_id));
633 1 : }
634 :
635 : // All remaining observed locations generate secondary intents. This includes None
636 : // observations, as these may well have some local content on disk that is usable (this
637 : // is an edge case that might occur if we restarted during a migration or other change)
638 : //
639 : // We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
640 : // will take care of promoting one of these secondaries to be attached.
641 2 : self.observed.locations.keys().for_each(|node_id| {
642 2 : if Some(*node_id) != self.intent.attached {
643 1 : self.intent.push_secondary(scheduler, *node_id);
644 1 : }
645 2 : });
646 1 : }
647 :
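(Illustrative sketch, not part of this file: the generation comparison used by `intent_from_observed` above, i.e. pick the observed attached location with the highest generation. Plain `u64`/`u32` values stand in for `NodeId` and `Generation`.)

```rust
// Choose the observed attached location with the highest generation, mirroring the
// sort + next_back() above; a missing generation (None) sorts lowest.
fn choose_attached(observed: &[(u64 /* node */, Option<u32> /* generation */)]) -> Option<u64> {
    observed
        .iter()
        .max_by_key(|(_, generation)| *generation)
        .map(|(node, _)| *node)
}

fn main() {
    // Node 5 holds the latest generation, so it wins.
    let observed = vec![(3, Some(11)), (5, Some(12)), (9, None)];
    assert_eq!(choose_attached(&observed), Some(5));
}
```
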
648 : /// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
649 : /// attached pageserver for a shard.
650 : ///
651 : /// Returns whether we modified it, and the NodeId selected.
652 12832 : fn schedule_attached(
653 12832 : &mut self,
654 12832 : scheduler: &mut Scheduler,
655 12832 : context: &ScheduleContext,
656 12832 : ) -> Result<(bool, NodeId), ScheduleError> {
657 : // No work to do if we already have an attached tenant
658 12832 : if let Some(node_id) = self.intent.attached {
659 0 : return Ok((false, node_id));
660 12832 : }
661 :
662 12832 : if let Some(promote_secondary) = self.preferred_secondary(scheduler) {
663 : // Promote a secondary
664 2 : tracing::debug!("Promoted secondary {} to attached", promote_secondary);
665 2 : self.intent.promote_attached(scheduler, promote_secondary);
666 2 : Ok((true, promote_secondary))
667 : } else {
668 : // Pick a fresh node: either we had no secondaries or none were schedulable
669 12830 : let node_id = scheduler.schedule_shard::<AttachedShardTag>(
670 12830 : &self.intent.secondary,
671 12830 : &self.intent.preferred_az_id,
672 12830 : context,
673 12830 : )?;
674 12830 : tracing::debug!("Selected {} as attached", node_id);
675 12830 : self.intent.set_attached(scheduler, Some(node_id));
676 12830 : Ok((true, node_id))
677 : }
678 12832 : }
679 :
680 : #[instrument(skip_all, fields(
681 : tenant_id=%self.tenant_shard_id.tenant_id,
682 : shard_id=%self.tenant_shard_id.shard_slug(),
683 : sequence=%self.sequence
684 : ))]
685 : pub(crate) fn schedule(
686 : &mut self,
687 : scheduler: &mut Scheduler,
688 : context: &mut ScheduleContext,
689 : ) -> Result<(), ScheduleError> {
690 : let r = self.do_schedule(scheduler, context);
691 :
692 : context.avoid(&self.intent.all_pageservers());
693 :
694 : r
695 : }
696 :
697 12836 : pub(crate) fn do_schedule(
698 12836 : &mut self,
699 12836 : scheduler: &mut Scheduler,
700 12836 : context: &ScheduleContext,
701 12836 : ) -> Result<(), ScheduleError> {
702 12836 : // TODO: before scheduling new nodes, check if any existing content in
703 12836 : // self.intent refers to pageservers that are offline, and pick other
704 12836 : // pageservers if so.
705 12836 :
706 12836 : // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
707 12836 : // change their attach location.
708 12836 :
709 12836 : match self.scheduling_policy {
710 12835 : ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
711 : ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
712 : // Warn to make it obvious why other things aren't happening/working, if we skip scheduling
713 1 : tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
714 0 : "Scheduling is disabled by policy {:?}", self.scheduling_policy);
715 1 : return Ok(());
716 : }
717 : }
718 :
719 : // Build the set of pageservers already in use by this tenant, to avoid scheduling
720 : // more work on the same pageservers we're already using.
721 12835 : let mut modified = false;
722 :
723 : // Add/remove nodes to fulfil policy
724 : use PlacementPolicy::*;
725 12835 : match self.policy {
726 12832 : Attached(secondary_count) => {
727 : // Should have exactly one attached, and at least N secondaries
728 12832 : let (modified_attached, attached_node_id) =
729 12832 : self.schedule_attached(scheduler, context)?;
730 12832 : modified |= modified_attached;
731 12832 :
732 12832 : let mut used_pageservers = vec![attached_node_id];
733 25663 : while self.intent.secondary.len() < secondary_count {
734 12831 : let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
735 12831 : &used_pageservers,
736 12831 : &self.intent.preferred_az_id,
737 12831 : context,
738 12831 : )?;
739 12831 : self.intent.push_secondary(scheduler, node_id);
740 12831 : used_pageservers.push(node_id);
741 12831 : modified = true;
742 : }
743 : }
744 : Secondary => {
745 3 : if let Some(node_id) = self.intent.get_attached() {
746 2 : // Populate secondary by demoting the attached node
747 2 : self.intent.demote_attached(scheduler, *node_id);
748 2 :
749 2 : modified = true;
750 2 : } else if self.intent.secondary.is_empty() {
751 : // Populate secondary by scheduling a fresh node
752 : //
753 : // We use [`AttachedShardTag`] because when a secondary location is the only one
754 : // a shard has, we expect that its next use will be as an attached location: we want
755 : // the tenant to be ready to warm up and run fast in their preferred AZ.
756 1 : let node_id = scheduler.schedule_shard::<AttachedShardTag>(
757 1 : &[],
758 1 : &self.intent.preferred_az_id,
759 1 : context,
760 1 : )?;
761 1 : self.intent.push_secondary(scheduler, node_id);
762 1 : modified = true;
763 0 : }
764 4 : while self.intent.secondary.len() > 1 {
765 1 : // If we have multiple secondaries (e.g. when transitioning from Attached to Secondary and
766 1 : // having just demoted our attached location), then we should prefer to keep the location
767 1 : // in our preferred AZ. Tenants in Secondary mode want to be in the preferred AZ so that
768 1 : // they have a warm location to become attached when transitioning back into Attached.
769 1 :
770 1 : let mut candidates = self.intent.get_secondary().clone();
771 1 : // Sort to get secondaries outside preferred AZ last
772 1 : candidates
773 2 : .sort_by_key(|n| scheduler.get_node_az(n).as_ref() != self.preferred_az());
774 1 : let secondary_to_remove = candidates.pop().unwrap();
775 1 : self.intent.remove_secondary(scheduler, secondary_to_remove);
776 1 : modified = true;
777 1 : }
778 : }
779 : Detached => {
780 : // Never add locations in this mode
781 0 : if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
782 0 : self.intent.clear(scheduler);
783 0 : modified = true;
784 0 : }
785 : }
786 : }
787 :
788 12835 : if modified {
789 12835 : self.sequence.0 += 1;
790 12835 : }
791 :
792 12835 : Ok(())
793 12836 : }
794 :
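(Illustrative sketch, not part of this file: what each placement policy implies for location counts in `do_schedule` above. `Attached(n)` wants exactly one attached location plus at least `n` secondaries, `Secondary` keeps a single secondary, and `Detached` keeps nothing. The `Policy` enum here is a hypothetical stand-in, not the `pageserver_api` type.)

```rust
enum Policy {
    Attached(usize),
    Secondary,
    Detached,
}

// Minimum location counts implied by each policy: (attached, secondaries).
fn wanted_locations(policy: &Policy) -> (usize, usize) {
    match policy {
        Policy::Attached(n) => (1, *n),
        Policy::Secondary => (0, 1),
        Policy::Detached => (0, 0),
    }
}

fn main() {
    assert_eq!(wanted_locations(&Policy::Attached(1)), (1, 1));
    assert_eq!(wanted_locations(&Policy::Secondary), (0, 1));
    assert_eq!(wanted_locations(&Policy::Detached), (0, 0));
}
```
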
795 : /// Reschedule this tenant shard to one of its secondary locations. Returns a scheduling error
796 : /// if the swap is not possible and leaves the intent state in its original state.
797 : ///
798 : /// Arguments:
799 : /// `attached_to`: the currently attached location matching the intent state (may be None if the
800 : /// shard is not attached)
801 : /// `promote_to`: an optional secondary location of this tenant shard. If set to None, we ask
802 : /// the scheduler to recommend a node
803 0 : pub(crate) fn reschedule_to_secondary(
804 0 : &mut self,
805 0 : promote_to: Option<NodeId>,
806 0 : scheduler: &mut Scheduler,
807 0 : ) -> Result<(), ScheduleError> {
808 0 : let promote_to = match promote_to {
809 0 : Some(node) => node,
810 0 : None => match self.preferred_secondary(scheduler) {
811 0 : Some(node) => node,
812 : None => {
813 0 : return Err(ScheduleError::ImpossibleConstraint);
814 : }
815 : },
816 : };
817 :
818 0 : assert!(self.intent.get_secondary().contains(&promote_to));
819 :
820 0 : if let Some(node) = self.intent.get_attached() {
821 0 : let demoted = self.intent.demote_attached(scheduler, *node);
822 0 : if !demoted {
823 0 : return Err(ScheduleError::ImpossibleConstraint);
824 0 : }
825 0 : }
826 :
827 0 : self.intent.promote_attached(scheduler, promote_to);
828 0 :
829 0 : // Increment the sequence number for the edge case where a
830 0 : // reconciler is already running to avoid waiting on the
831 0 : // current reconcile instead of spawning a new one.
832 0 : self.sequence = self.sequence.next();
833 0 :
834 0 : Ok(())
835 0 : }
836 :
837 : /// Returns None if the current location's score is unavailable, i.e. we cannot draw a conclusion
838 68 : fn is_better_location<T: ShardTag>(
839 68 : &self,
840 68 : scheduler: &mut Scheduler,
841 68 : schedule_context: &ScheduleContext,
842 68 : current: NodeId,
843 68 : candidate: NodeId,
844 68 : ) -> Option<bool> {
845 68 : let Some(candidate_score) = scheduler.compute_node_score::<T::Score>(
846 68 : candidate,
847 68 : &self.intent.preferred_az_id,
848 68 : schedule_context,
849 68 : ) else {
850 : // The candidate node is unavailable for scheduling or otherwise couldn't get a score
851 1 : return None;
852 : };
853 :
854 : // If the candidate is our preferred node, then it is better than the current location, as long
855 : // as it is online -- the online check is part of the score calculation we did above, so it's
856 : // important that this check comes after that one.
857 67 : if let Some(preferred) = self.preferred_node.as_ref() {
858 3 : if preferred == &candidate {
859 1 : return Some(true);
860 2 : }
861 64 : }
862 :
863 66 : match scheduler.compute_node_score::<T::Score>(
864 66 : current,
865 66 : &self.intent.preferred_az_id,
866 66 : schedule_context,
867 66 : ) {
868 66 : Some(current_score) => {
869 66 : // Ignore utilization components when comparing scores: we don't want to migrate
870 66 : // because of transient load variations, it risks making the system thrash, and
871 66 : // migrating for utilization requires a separate high level view of the system to
872 66 : // e.g. prioritize moving larger or smaller tenants, rather than arbitrarily
873 66 : // moving things around in the order that we hit this function.
874 66 : let candidate_score = candidate_score.for_optimization();
875 66 : let current_score = current_score.for_optimization();
876 66 :
877 66 : if candidate_score < current_score {
878 8 : tracing::info!(
879 0 : "Found a lower scoring location! {candidate} is better than {current} ({candidate_score:?} is better than {current_score:?})"
880 : );
881 8 : Some(true)
882 : } else {
883 : // The candidate node is no better than our current location, so don't migrate
884 58 : tracing::debug!(
885 0 : "Candidate node {candidate} is no better than our current location {current} (candidate {candidate_score:?} vs current {current_score:?})",
886 : );
887 58 : Some(false)
888 : }
889 : }
890 : None => {
891 : // The current node is unavailable for scheduling, so we can't make any sensible
892 : // decisions about optimisation. This should be a transient state -- if the node
893 : // is offline then it will get evacuated; if it is blocked by a scheduling mode
894 : // then we will respect that mode by doing nothing.
895 0 : tracing::debug!("Current node {current} is unavailable for scheduling");
896 0 : None
897 : }
898 : }
899 68 : }
900 :
901 48 : pub(crate) fn find_better_location<T: ShardTag>(
902 48 : &self,
903 48 : scheduler: &mut Scheduler,
904 48 : schedule_context: &ScheduleContext,
905 48 : current: NodeId,
906 48 : hard_exclude: &[NodeId],
907 48 : ) -> Option<NodeId> {
908 : // If we have a migration hint, then that is our better location
909 48 : if let Some(hint) = self.preferred_node.as_ref() {
910 1 : if hint == &current {
911 0 : return None;
912 1 : }
913 1 :
914 1 : return Some(*hint);
915 47 : }
916 :
917 : // Look for a lower-scoring location to attach to
918 47 : let Ok(candidate_node) = scheduler.schedule_shard::<T>(
919 47 : hard_exclude,
920 47 : &self.intent.preferred_az_id,
921 47 : schedule_context,
922 47 : ) else {
923 : // A scheduling error means we have no possible candidate replacements
924 0 : tracing::debug!("No candidate node found");
925 0 : return None;
926 : };
927 :
928 47 : if candidate_node == current {
929 : // We're already at the best possible location, so don't migrate
930 24 : tracing::debug!("Candidate node {candidate_node} is already in use");
931 24 : return None;
932 23 : }
933 23 :
934 23 : self.is_better_location::<T>(scheduler, schedule_context, current, candidate_node)
935 23 : .and_then(|better| if better { Some(candidate_node) } else { None })
936 48 : }
937 :
938 : /// This function is an optimization, used to avoid doing large numbers of scheduling operations
939 : /// when looking for optimizations. This function uses knowledge of how scores work to do some
940 : /// fast checks for whether it may to be possible to improve a score.
941 : ///
942 : /// If we return true, it only means that optimization _might_ be possible, not that it necessarily is. If we
943 : /// return no, it definitely means that calling [`Self::optimize_attachment`] or [`Self::optimize_secondary`] would do no
944 : /// work.
945 3 : pub(crate) fn maybe_optimizable(
946 3 : &self,
947 3 : scheduler: &mut Scheduler,
948 3 : schedule_context: &ScheduleContext,
949 3 : ) -> bool {
950 : // Tenant with preferred node: check if it is not already at the preferred node
951 3 : if let Some(preferred) = self.preferred_node.as_ref() {
952 0 : if Some(preferred) != self.intent.get_attached().as_ref() {
953 0 : return true;
954 0 : }
955 3 : }
956 :
957 : // Sharded tenant: check if any locations have a nonzero affinity score
958 3 : if self.shard.count >= ShardCount(1) {
959 3 : let schedule_context = schedule_context.project_detach(self);
960 5 : for node in self.intent.all_pageservers() {
961 5 : if let Some(af) = schedule_context.nodes.get(&node) {
962 5 : if *af > AffinityScore(0) {
963 3 : return true;
964 2 : }
965 0 : }
966 : }
967 0 : }
968 :
969 : // Attached tenant: check if the attachment is outside the preferred AZ
970 0 : if let PlacementPolicy::Attached(_) = self.policy {
971 0 : if let Some(attached) = self.intent.get_attached() {
972 0 : if scheduler.get_node_az(attached) != self.intent.preferred_az_id {
973 0 : return true;
974 0 : }
975 0 : }
976 0 : }
977 :
978 : // Tenant with secondary locations: check if any are within the preferred AZ
979 0 : for secondary in self.intent.get_secondary() {
980 0 : if scheduler.get_node_az(secondary) == self.intent.preferred_az_id {
981 0 : return true;
982 0 : }
983 : }
984 :
985 : // Does the tenant have excess secondaries?
986 0 : if self.intent.get_secondary().len() > self.policy.want_secondaries() {
987 0 : return true;
988 0 : }
989 0 :
990 0 : // Fall through: no optimizations possible
991 0 : false
992 3 : }
993 :
994 : /// Optimize attachments: if a shard has a secondary location that is preferable to
995 : /// its primary location based on soft constraints, switch that secondary location
996 : /// to be attached.
997 : ///
998 : /// `schedule_context` should have been populated with all shards in the tenant, including
999 : /// the one we're trying to optimize (this function will subtract its own contribution before making scoring decisions)
1000 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1001 : pub(crate) fn optimize_attachment(
1002 : &self,
1003 : scheduler: &mut Scheduler,
1004 : schedule_context: &ScheduleContext,
1005 : ) -> Option<ScheduleOptimization> {
1006 : let attached = (*self.intent.get_attached())?;
1007 :
1008 : let schedule_context = schedule_context.project_detach(self);
1009 :
1010 : // If we already have a secondary that is higher-scoring than our current location,
1011 : // then simply migrate to it.
1012 : for secondary in self.intent.get_secondary() {
1013 : if let Some(true) = self.is_better_location::<AttachedShardTag>(
1014 : scheduler,
1015 : &schedule_context,
1016 : attached,
1017 : *secondary,
1018 : ) {
1019 : return Some(ScheduleOptimization {
1020 : sequence: self.sequence,
1021 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
1022 : old_attached_node_id: attached,
1023 : new_attached_node_id: *secondary,
1024 : }),
1025 : });
1026 : }
1027 : }
1028 :
1029 : // Given that none of our current secondaries is a better location than our current
1030 : // attached location (checked above), we may trim any secondaries that are not needed
1031 : // for the placement policy.
1032 : if self.intent.get_secondary().len() > self.policy.want_secondaries() {
1033 : // This code path cleans up extra secondaries after migrating, and/or
1034 : // trims extra secondaries after a PlacementPolicy::Attached(N) was
1035 : // modified to decrease N.
1036 :
1037 : let secondary_scores = self
1038 : .intent
1039 : .get_secondary()
1040 : .iter()
1041 12 : .map(|node_id| {
1042 12 : (
1043 12 : *node_id,
1044 12 : scheduler.compute_node_score::<NodeSecondarySchedulingScore>(
1045 12 : *node_id,
1046 12 : &self.intent.preferred_az_id,
1047 12 : &schedule_context,
1048 12 : ),
1049 12 : )
1050 12 : })
1051 : .collect::<HashMap<_, _>>();
1052 :
1053 11 : if secondary_scores.iter().any(|score| score.1.is_none()) {
1054 : // Trivial case: if we only have one secondary, drop that one
1055 : if self.intent.get_secondary().len() == 1 {
1056 : return Some(ScheduleOptimization {
1057 : sequence: self.sequence,
1058 : action: ScheduleOptimizationAction::RemoveSecondary(
1059 : *self.intent.get_secondary().first().unwrap(),
1060 : ),
1061 : });
1062 : }
1063 :
1064 : // Try to find a "good" secondary to keep, without relying on scores (one or more nodes is in a state
1065 : // where its score can't be calculated), and drop the others. This enables us to make progress in
1066 : // most cases, even if some nodes are offline or have scheduling=pause set.
1067 :
1068 : debug_assert!(self.intent.attached.is_some()); // We should not make it here unless attached -- this
1069 : // logic presumes we are in a mode where we want secondaries to be in non-home AZ
1070 1 : if let Some(retain_secondary) = self.intent.get_secondary().iter().find(|n| {
1071 1 : let in_home_az = scheduler.get_node_az(n) == self.intent.preferred_az_id;
1072 1 : let is_available = secondary_scores
1073 1 : .get(n)
1074 1 : .expect("Built from same list of nodes")
1075 1 : .is_some();
1076 1 : is_available && !in_home_az
1077 1 : }) {
1078 : // Great, we found one to retain. Pick some other to drop.
1079 : if let Some(victim) = self
1080 : .intent
1081 : .get_secondary()
1082 : .iter()
1083 2 : .find(|n| n != &retain_secondary)
1084 : {
1085 : return Some(ScheduleOptimization {
1086 : sequence: self.sequence,
1087 : action: ScheduleOptimizationAction::RemoveSecondary(*victim),
1088 : });
1089 : }
1090 : }
1091 :
1092 : // Fall through: we didn't identify one to remove. This ought to be rare.
1093 : tracing::warn!(
1094 : "Keeping extra secondaries: can't determine which of {:?} to remove (some nodes offline?)",
1095 : self.intent.get_secondary()
1096 : );
1097 : } else {
1098 : let victim = secondary_scores
1099 : .iter()
1100 10 : .max_by_key(|score| score.1.unwrap())
1101 : .unwrap()
1102 : .0;
1103 : return Some(ScheduleOptimization {
1104 : sequence: self.sequence,
1105 : action: ScheduleOptimizationAction::RemoveSecondary(*victim),
1106 : });
1107 : }
1108 : }
1109 :
1110 : let replacement = self.find_better_location::<AttachedShardTag>(
1111 : scheduler,
1112 : &schedule_context,
1113 : attached,
1114 : &[], // Don't exclude secondaries: our preferred attachment location may be a secondary
1115 : );
1116 :
1117 : // We have found a candidate and confirmed that its score is preferable
1118 : // to our current location. See if we have a secondary location in the preferred location already: if not,
1119 : // then create one.
1120 : if let Some(replacement) = replacement {
1121 : // If we are currently in non-preferred AZ, then the scheduler might suggest a location that is better, but still
1122 : // not in our preferred AZ. Migration has a cost in resources and an impact on the workload, so we want to avoid doing
1123 : // multiple hops where we might go to some other AZ before eventually finding a suitable location in our preferred
1124 : // AZ: skip this optimization if it is not in our final, preferred AZ.
1125 : //
1126 : // This should be a transient state, there should always be capacity eventually in our preferred AZ (even if nodes
1127 : // there are too overloaded for scheduler to suggest them, more should be provisioned eventually).
1128 : if self.preferred_node.is_none()
1129 : && self.intent.preferred_az_id.is_some()
1130 : && scheduler.get_node_az(&replacement) != self.intent.preferred_az_id
1131 : {
1132 : tracing::debug!(
1133 : "Candidate node {replacement} is not in preferred AZ {:?}",
1134 : self.intent.preferred_az_id
1135 : );
1136 :
1137 : // This should only happen if our current location is not in the preferred AZ, otherwise
1138 : // [`Self::find_better_location`]` should have rejected any other location outside the preferred Az, because
1139 : // AZ is the highest priority part of NodeAttachmentSchedulingScore.
1140 : debug_assert!(scheduler.get_node_az(&attached) != self.intent.preferred_az_id);
1141 :
1142 : return None;
1143 : }
1144 :
1145 : if !self.intent.get_secondary().contains(&replacement) {
1146 : Some(ScheduleOptimization {
1147 : sequence: self.sequence,
1148 : action: ScheduleOptimizationAction::CreateSecondary(replacement),
1149 : })
1150 : } else {
1151 : // We already have a secondary in the preferred location, let's try migrating to it. Our caller
1152 : // will check the warmth of the destination before deciding whether to really execute this.
1153 : Some(ScheduleOptimization {
1154 : sequence: self.sequence,
1155 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
1156 : old_attached_node_id: attached,
1157 : new_attached_node_id: replacement,
1158 : }),
1159 : })
1160 : }
1161 : } else {
1162 : // We didn't find somewhere we'd rather be, and we don't have any excess secondaries
1163 : // to clean up: no action required.
1164 : None
1165 : }
1166 : }
1167 :
1168 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1169 : pub(crate) fn optimize_secondary(
1170 : &self,
1171 : scheduler: &mut Scheduler,
1172 : schedule_context: &ScheduleContext,
1173 : ) -> Option<ScheduleOptimization> {
1174 : if self.intent.get_secondary().len() > self.policy.want_secondaries() {
1175 : // We have extra secondaries, perhaps to facilitate a migration of the attached location:
1176 : // do nothing, it is up to [`Self::optimize_attachment`] to clean them up. When that's done,
1177 : // and we are called again, we will proceed.
1178 : tracing::debug!("Too many secondaries: skipping");
1179 : return None;
1180 : }
1181 :
1182 : let schedule_context = schedule_context.project_detach(self);
1183 :
1184 : for secondary in self.intent.get_secondary() {
1185 : // Make sure we don't try to migrate a secondary to our attached location: this case happens
1186 : // easily in environments without multiple AZs.
1187 : let exclude = match self.intent.attached {
1188 : Some(attached) => vec![attached],
1189 : None => vec![],
1190 : };
1191 :
1192 : let replacement = match &self.policy {
1193 : PlacementPolicy::Attached(_) => {
1194 : // Secondaries for an attached shard should be scheduled using `SecondaryShardTag`
1195 : // to avoid placing them in the preferred AZ.
1196 : self.find_better_location::<SecondaryShardTag>(
1197 : scheduler,
1198 : &schedule_context,
1199 : *secondary,
1200 : &exclude,
1201 : )
1202 : }
1203 : PlacementPolicy::Secondary => {
1204 : // In secondary-only mode, we want our secondary locations in the preferred AZ,
1205 : // so that they're ready to take over as an attached location when we transition
1206 : // into PlacementPolicy::Attached.
1207 : self.find_better_location::<AttachedShardTag>(
1208 : scheduler,
1209 : &schedule_context,
1210 : *secondary,
1211 : &exclude,
1212 : )
1213 : }
1214 : PlacementPolicy::Detached => None,
1215 : };
1216 :
1217 : assert!(replacement != Some(*secondary));
1218 : if let Some(replacement) = replacement {
1219 : // We have found a candidate and confirmed that its score is preferable
1220 : // to our current location. See if we have a secondary location in the preferred location already: if not,
1221 : // then create one.
1222 : return Some(ScheduleOptimization {
1223 : sequence: self.sequence,
1224 : action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
1225 : old_node_id: *secondary,
1226 : new_node_id: replacement,
1227 : }),
1228 : });
1229 : }
1230 : }
1231 :
1232 : None
1233 : }
1234 :
1235 : /// Start or abort a graceful migration of this shard to another pageserver. This works on top of the
1236 : /// other optimisation functions, to bias them to move to the destination node.
1237 0 : pub(crate) fn set_preferred_node(&mut self, node: Option<NodeId>) {
1238 0 : if let Some(hint) = self.preferred_node.as_ref() {
1239 0 : if Some(hint) != node.as_ref() {
1240 : // This is legal but a bit surprising: we expect that administrators wouldn't usually
1241 : // change their mind about where to migrate something.
1242 0 : tracing::warn!(
1243 0 : "Changing migration destination from {hint} to {node:?} (current intent {:?})",
1244 : self.intent
1245 : );
1246 0 : }
1247 0 : }
1248 :
1249 0 : self.preferred_node = node;
1250 0 : }
1251 :
1252 0 : pub(crate) fn get_preferred_node(&self) -> Option<NodeId> {
1253 0 : self.preferred_node
1254 0 : }
1255 :
1256 : /// Return true if the optimization was really applied: it will not be applied if the optimization's
1257 : /// sequence is behind this tenant shard's current sequence
1258 17 : pub(crate) fn apply_optimization(
1259 17 : &mut self,
1260 17 : scheduler: &mut Scheduler,
1261 17 : optimization: ScheduleOptimization,
1262 17 : ) -> bool {
1263 17 : if optimization.sequence != self.sequence {
1264 0 : return false;
1265 17 : }
1266 17 :
1267 17 : metrics::METRICS_REGISTRY
1268 17 : .metrics_group
1269 17 : .storage_controller_schedule_optimization
1270 17 : .inc();
1271 17 :
1272 17 : match optimization.action {
1273 : ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
1274 5 : old_attached_node_id,
1275 5 : new_attached_node_id,
1276 5 : }) => {
1277 5 : self.intent.demote_attached(scheduler, old_attached_node_id);
1278 5 : self.intent
1279 5 : .promote_attached(scheduler, new_attached_node_id);
1280 :
1281 5 : if let Some(hint) = self.preferred_node.as_ref() {
1282 1 : if hint == &new_attached_node_id {
1283 : // The migration target is not a long term pin: once we are done with the migration, clear it.
1284 1 : tracing::info!("Graceful migration to {hint} complete");
1285 1 : self.preferred_node = None;
1286 0 : }
1287 4 : }
1288 : }
1289 : ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
1290 1 : old_node_id,
1291 1 : new_node_id,
1292 1 : }) => {
1293 1 : self.intent.remove_secondary(scheduler, old_node_id);
1294 1 : self.intent.push_secondary(scheduler, new_node_id);
1295 1 : }
1296 4 : ScheduleOptimizationAction::CreateSecondary(new_node_id) => {
1297 4 : self.intent.push_secondary(scheduler, new_node_id);
1298 4 : }
1299 7 : ScheduleOptimizationAction::RemoveSecondary(old_secondary) => {
1300 7 : self.intent.remove_secondary(scheduler, old_secondary);
1301 7 : }
1302 : }
1303 :
1304 17 : true
1305 17 : }
1306 :
1307 : /// When a shard has several secondary locations, we need to pick one in situations where
1308 : /// we promote one of them to an attached location:
1309 : /// - When draining a node for restart
1310 : /// - When responding to a node failure
1311 : ///
1312 : /// In this context, 'preferred' does not mean the node with the best scheduling score: instead
1313 : /// we want to pick the node which is best for use _temporarily_ while the previous attached location
1314 : /// is unavailable (e.g. because it's down or deploying). That means we prefer to use secondary
1315 : /// locations in a non-preferred AZ, as they're more likely to have a warm cache than temporary
1316 : /// secondaries in the preferred AZ (which are usually only created for migrations, and if they exist
1317 : /// they're probably not warmed up yet).
1318 : ///
1319 : /// If the input is empty, or all the nodes are not eligible for scheduling, return None: the
1320 : /// caller needs to pick a node some other way.
1321 12832 : pub(crate) fn preferred_secondary(&self, scheduler: &Scheduler) -> Option<NodeId> {
1322 12832 : let candidates = scheduler.filter_usable_nodes(&self.intent.secondary);
1323 12832 :
1324 12832 : // We will sort candidates to prefer nodes which are _not_ in our preferred AZ, i.e. we prefer
1325 12832 : // to migrate to a long-lived secondary location (which would have been scheduled in a non-preferred AZ),
1326 12832 : // rather than a short-lived secondary location being used for optimization/migration (which would have
1327 12832 : // been scheduled in our preferred AZ).
1328 12832 : let mut candidates = candidates
1329 12832 : .iter()
1330 12832 : .map(|(node_id, node_az)| {
1331 2 : if node_az == &self.intent.preferred_az_id {
1332 1 : (1, *node_id)
1333 : } else {
1334 1 : (0, *node_id)
1335 : }
1336 12832 : })
1337 12832 : .collect::<Vec<_>>();
1338 12832 :
1339 12832 : candidates.sort();
1340 12832 :
1341 12832 : candidates.first().map(|i| i.1)
1342 12832 : }
1343 :
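(Illustrative sketch, not part of this file: the tuple-sort trick in `preferred_secondary` above. Candidates in the preferred AZ get sort key 1 and everything else key 0, so a long-lived secondary outside the preferred AZ is picked first. Names and types here are simplified stand-ins.)

```rust
// Prefer secondaries outside the preferred AZ: they are the long-lived, warm locations.
fn preferred_secondary(candidates: &[(u64, &str)], preferred_az: &str) -> Option<u64> {
    let mut ranked: Vec<(u8, u64)> = candidates
        .iter()
        .map(|(node_id, az)| if *az == preferred_az { (1, *node_id) } else { (0, *node_id) })
        .collect();
    ranked.sort();
    ranked.first().map(|(_, node_id)| *node_id)
}

fn main() {
    // Node 7 is outside the preferred AZ, so it is chosen over node 3.
    let candidates = [(3, "az-a"), (7, "az-b")];
    assert_eq!(preferred_secondary(&candidates, "az-a"), Some(7));
}
```
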
1344 : /// Query whether the tenant's observed state for the attached node matches its intent state, and if so,
1345 : /// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that
1346 : /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
1347 : ///
1348 : /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
1349 : /// function should not be used to decide whether to reconcile.
1350 0 : pub(crate) fn stably_attached(&self) -> Option<NodeId> {
1351 0 : if let Some(attach_intent) = self.intent.attached {
1352 0 : match self.observed.locations.get(&attach_intent) {
1353 0 : Some(loc) => match &loc.conf {
1354 0 : Some(conf) => match conf.mode {
1355 : LocationConfigMode::AttachedMulti
1356 : | LocationConfigMode::AttachedSingle
1357 : | LocationConfigMode::AttachedStale => {
1358 : // Our intent and observed state agree that this node is in an attached state.
1359 0 : Some(attach_intent)
1360 : }
1361 : // Our observed config is not an attached state
1362 0 : _ => None,
1363 : },
1364 : // Our observed state is None, i.e. in flux
1365 0 : None => None,
1366 : },
1367 : // We have no observed state for this node
1368 0 : None => None,
1369 : }
1370 : } else {
1371 : // Our intent is not to attach
1372 0 : None
1373 : }
1374 0 : }
1375 :
1376 0 : fn dirty(&self, nodes: &Arc<HashMap<NodeId, Node>>) -> bool {
1377 0 : let mut dirty_nodes = HashSet::new();
1378 :
1379 0 : if let Some(node_id) = self.intent.attached {
1380 : // Maybe panic: it is a severe bug if we try to attach while generation is null.
1381 0 : let generation = self
1382 0 : .generation
1383 0 : .expect("Attempted to enter attached state without a generation");
1384 0 :
1385 0 : let wanted_conf =
1386 0 : attached_location_conf(generation, &self.shard, &self.config, &self.policy);
1387 0 : match self.observed.locations.get(&node_id) {
1388 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
1389 0 : Some(_) | None => {
1390 0 : dirty_nodes.insert(node_id);
1391 0 : }
1392 : }
1393 0 : }
1394 :
1395 0 : for node_id in &self.intent.secondary {
1396 0 : let wanted_conf = secondary_location_conf(&self.shard, &self.config);
1397 0 : match self.observed.locations.get(node_id) {
1398 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
1399 0 : Some(_) | None => {
1400 0 : dirty_nodes.insert(*node_id);
1401 0 : }
1402 : }
1403 : }
1404 :
1405 0 : for node_id in self.observed.locations.keys() {
1406 0 : if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
1407 0 : // We have observed state that isn't part of our intent: need to clean it up.
1408 0 : dirty_nodes.insert(*node_id);
1409 0 : }
1410 : }
1411 :
1412 0 : dirty_nodes.retain(|node_id| {
1413 0 : nodes
1414 0 : .get(node_id)
1415 0 : .map(|n| n.is_available())
1416 0 : .unwrap_or(false)
1417 0 : });
1418 0 :
1419 0 : !dirty_nodes.is_empty()
1420 0 : }
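    // Editor's note (illustrative): the final `retain` above means an out-of-sync node only makes
    // the shard dirty while that node is available. For example, if the intent is to attach on
    // node 1 but node 1 is currently offline and missing from observed state, `dirty` returns
    // false; once node 1 becomes available again the mismatch counts and reconciliation is needed.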
1421 :
1422 : #[allow(clippy::too_many_arguments)]
1423 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1424 : pub(crate) fn get_reconcile_needed(
1425 : &mut self,
1426 : pageservers: &Arc<HashMap<NodeId, Node>>,
1427 : ) -> ReconcileNeeded {
1428 : // If there are any ambiguous observed states, and the nodes they refer to are available,
1429 : // we should reconcile to clean them up.
1430 : let mut dirty_observed = false;
1431 : for (node_id, observed_loc) in &self.observed.locations {
1432 : let node = pageservers
1433 : .get(node_id)
1434 : .expect("Nodes may not be removed while referenced");
1435 : if observed_loc.conf.is_none() && node.is_available() {
1436 : dirty_observed = true;
1437 : break;
1438 : }
1439 : }
1440 :
1441 : let active_nodes_dirty = self.dirty(pageservers);
1442 :
1443 : let reconcile_needed = match (
1444 : active_nodes_dirty,
1445 : dirty_observed,
1446 : self.pending_compute_notification,
1447 : ) {
1448 : (true, _, _) => ReconcileNeeded::Yes(ReconcileReason::ActiveNodesDirty),
1449 : (_, true, _) => ReconcileNeeded::Yes(ReconcileReason::UnknownLocation),
1450 : (_, _, true) => ReconcileNeeded::Yes(ReconcileReason::PendingComputeNotification),
1451 : _ => ReconcileNeeded::No,
1452 : };
1453 :
1454 : if matches!(reconcile_needed, ReconcileNeeded::No) {
1455 : tracing::debug!("Not dirty, no reconciliation needed.");
1456 : return ReconcileNeeded::No;
1457 : }
1458 :
1459 : // If we are currently splitting, then never start a reconciler task: the splitting logic
1460 : // requires that shards are not interfered with while it runs. Do this check here rather than
1461 : // up top, so that we only log this message if we would otherwise have done a reconciliation.
1462 : if !matches!(self.splitting, SplitState::Idle) {
1463 : tracing::info!("Refusing to reconcile, splitting in progress");
1464 : return ReconcileNeeded::No;
1465 : }
1466 :
1467 : // Reconcile already in flight for the current sequence?
1468 : if let Some(handle) = &self.reconciler {
1469 : if handle.sequence == self.sequence {
1470 : tracing::info!(
1471 : "Reconciliation already in progress for sequence {:?}",
1472 : self.sequence,
1473 : );
1474 : return ReconcileNeeded::WaitExisting(ReconcilerWaiter {
1475 : tenant_shard_id: self.tenant_shard_id,
1476 : seq_wait: self.waiter.clone(),
1477 : error_seq_wait: self.error_waiter.clone(),
1478 : error: self.last_error.clone(),
1479 : seq: self.sequence,
1480 : });
1481 : }
1482 : }
1483 :
1484 : // Pre-checks done: finally check whether we may actually do the work
1485 : match self.scheduling_policy {
1486 : ShardSchedulingPolicy::Active
1487 : | ShardSchedulingPolicy::Essential
1488 : | ShardSchedulingPolicy::Pause => {}
1489 : ShardSchedulingPolicy::Stop => {
1490 : // We only reach this point if there is work to do and we're going to skip
1491 : // doing it: warn so it's obvious why this tenant isn't doing what it ought to.
1492 : tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
1493 : return ReconcileNeeded::No;
1494 : }
1495 : }
1496 :
1497 : reconcile_needed
1498 : }
1499 :
1500 : /// Ensure the sequence number is set to a value where waiting for this value will make us wait
1501 : /// for the next reconcile: i.e. it is ahead of all completed or running reconcilers.
1502 : ///
1503 : /// Constructing a ReconcilerWaiter with the resulting sequence number gives the property
1504 : /// that the waiter will not complete until some future Reconciler is constructed and run.
1505 0 : fn ensure_sequence_ahead(&mut self) {
1506 0 : // Find the highest sequence for which a Reconciler has previously run or is currently
1507 0 : // running
1508 0 : let max_seen = std::cmp::max(
1509 0 : self.reconciler
1510 0 : .as_ref()
1511 0 : .map(|r| r.sequence)
1512 0 : .unwrap_or(Sequence(0)),
1513 0 : std::cmp::max(self.waiter.load(), self.error_waiter.load()),
1514 0 : );
1515 0 :
1516 0 : if self.sequence <= max_seen {
1517 0 : self.sequence = max_seen.next();
1518 0 : }
1519 0 : }
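    // Editor's note (illustrative, assuming `Sequence` wraps an integer and `next()` increments it):
    // if `self.sequence` is 5, the running reconciler was spawned at sequence 7, and the waiters
    // have advanced to 6, then `max_seen` is 7 and the shard's sequence is bumped to 8. A waiter
    // built against 8 can therefore only be satisfied by a reconciler spawned in the future.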
1520 :
1521 : /// Create a waiter that will wait for some future Reconciler that hasn't been spawned yet.
1522 : ///
1523 : /// This is appropriate when you can't spawn a reconciler (e.g. due to resource limits), but
1524 : /// you would like to wait on the next reconciler that gets spawned in the background.
1525 0 : pub(crate) fn future_reconcile_waiter(&mut self) -> ReconcilerWaiter {
1526 0 : self.ensure_sequence_ahead();
1527 0 :
1528 0 : ReconcilerWaiter {
1529 0 : tenant_shard_id: self.tenant_shard_id,
1530 0 : seq_wait: self.waiter.clone(),
1531 0 : error_seq_wait: self.error_waiter.clone(),
1532 0 : error: self.last_error.clone(),
1533 0 : seq: self.sequence,
1534 0 : }
1535 0 : }
1536 :
1537 0 : async fn reconcile(
1538 0 : sequence: Sequence,
1539 0 : mut reconciler: Reconciler,
1540 0 : must_notify: bool,
1541 0 : ) -> ReconcileResult {
1542 : // Attempt to make observed state match intent state
1543 0 : let result = reconciler.reconcile().await;
1544 :
1545 : // If we know we had a pending compute notification from some previous action, send a notification irrespective
1546 : // of whether the above reconcile() did any work. It has to be Ok() though, because otherwise we might be
1547 : // sending a notification of a location that isn't really attached.
1548 0 : if result.is_ok() && must_notify {
1549 : // If this fails, the need to retry is carried in [`ReconcileResult::pending_compute_notification`]
1550 0 : reconciler.compute_notify().await.ok();
1551 0 : } else if must_notify {
1552 0 : // Carry this flag so that the reconciler's result will indicate that it still needs to retry
1553 0 : // the compute hook notification eventually.
1554 0 : reconciler.compute_notify_failure = true;
1555 0 : }
1556 :
1557 : // Update result counter
1558 0 : let outcome_label = match &result {
1559 0 : Ok(_) => ReconcileOutcome::Success,
1560 0 : Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
1561 0 : Err(_) => ReconcileOutcome::Error,
1562 : };
1563 :
1564 0 : metrics::METRICS_REGISTRY
1565 0 : .metrics_group
1566 0 : .storage_controller_reconcile_complete
1567 0 : .inc(ReconcileCompleteLabelGroup {
1568 0 : status: outcome_label,
1569 0 : });
1570 0 :
1571 0 : // Constructing result implicitly drops Reconciler, freeing any ReconcileUnits before the Service might
1572 0 : // try and schedule more work in response to our result.
1573 0 : ReconcileResult {
1574 0 : sequence,
1575 0 : result,
1576 0 : tenant_shard_id: reconciler.tenant_shard_id,
1577 0 : generation: reconciler.generation,
1578 0 : observed_deltas: reconciler.observed_deltas(),
1579 0 : pending_compute_notification: reconciler.compute_notify_failure,
1580 0 : }
1581 0 : }
1582 :
1583 : #[allow(clippy::too_many_arguments)]
1584 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1585 : pub(crate) fn spawn_reconciler(
1586 : &mut self,
1587 : reason: ReconcileReason,
1588 : result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
1589 : pageservers: &Arc<HashMap<NodeId, Node>>,
1590 : compute_hook: &Arc<ComputeHook>,
1591 : reconciler_config: ReconcilerConfig,
1592 : service_config: &service::Config,
1593 : persistence: &Arc<Persistence>,
1594 : units: ReconcileUnits,
1595 : gate_guard: GateGuard,
1596 : cancel: &CancellationToken,
1597 : http_client: reqwest::Client,
1598 : ) -> Option<ReconcilerWaiter> {
1599 : // Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
1600 : // doing our sequence's work.
1601 : let old_handle = self.reconciler.take();
1602 :
1603 : // Build list of nodes from which the reconciler should detach
1604 : let mut detach = Vec::new();
1605 : for node_id in self.observed.locations.keys() {
1606 : if self.intent.get_attached() != &Some(*node_id)
1607 : && !self.intent.secondary.contains(node_id)
1608 : {
1609 : detach.push(
1610 : pageservers
1611 : .get(node_id)
1612 : .expect("Intent references non-existent pageserver")
1613 : .clone(),
1614 : )
1615 : }
1616 : }
1617 :
1618 : // Advance the sequence before spawning a reconciler, so that sequence waiters
1619 : // can distinguish between before and after the reconcile completes.
1620 : self.ensure_sequence_ahead();
1621 :
1622 : let reconciler_cancel = cancel.child_token();
1623 : let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
1624 : let reconciler = Reconciler {
1625 : tenant_shard_id: self.tenant_shard_id,
1626 : shard: self.shard,
1627 : placement_policy: self.policy.clone(),
1628 : generation: self.generation,
1629 : intent: reconciler_intent,
1630 : detach,
1631 : reconciler_config,
1632 : config: self.config.clone(),
1633 : preferred_az: self.intent.preferred_az_id.clone(),
1634 : observed: self.observed.clone(),
1635 : original_observed: self.observed.clone(),
1636 : compute_hook: compute_hook.clone(),
1637 : service_config: service_config.clone(),
1638 : _gate_guard: gate_guard,
1639 : _resource_units: units,
1640 : cancel: reconciler_cancel.clone(),
1641 : persistence: persistence.clone(),
1642 : compute_notify_failure: false,
1643 : http_client,
1644 : };
1645 :
1646 : let reconcile_seq = self.sequence;
1647 : let long_reconcile_threshold = service_config.long_reconcile_threshold;
1648 :
1649 : tracing::info!(seq=%reconcile_seq, "Spawning Reconciler ({reason:?})");
1650 : let must_notify = self.pending_compute_notification;
1651 : let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
1652 : tenant_id=%reconciler.tenant_shard_id.tenant_id,
1653 : shard_id=%reconciler.tenant_shard_id.shard_slug());
1654 : metrics::METRICS_REGISTRY
1655 : .metrics_group
1656 : .storage_controller_reconcile_spawn
1657 : .inc();
1658 : let result_tx = result_tx.clone();
1659 : let join_handle = tokio::task::spawn(
1660 0 : async move {
1661 : // Wait for any previous reconcile task to complete before we start
1662 0 : if let Some(old_handle) = old_handle {
1663 0 : old_handle.cancel.cancel();
1664 0 : if let Err(e) = old_handle.handle.await {
1665 : // We can't do much with this other than log it: the task is done, so
1666 : // we may proceed with our work.
1667 0 : tracing::error!("Unexpected join error waiting for reconcile task: {e}");
1668 0 : }
1669 0 : }
1670 :
1671 : // Early check for cancellation before doing any work
1672 : // TODO: wrap all remote API operations in cancellation check
1673 : // as well.
1674 0 : if reconciler.cancel.is_cancelled() {
1675 0 : metrics::METRICS_REGISTRY
1676 0 : .metrics_group
1677 0 : .storage_controller_reconcile_complete
1678 0 : .inc(ReconcileCompleteLabelGroup {
1679 0 : status: ReconcileOutcome::Cancel,
1680 0 : });
1681 0 : return;
1682 0 : }
1683 0 :
1684 0 : let (tenant_id_label, shard_number_label, sequence_label) = {
1685 0 : (
1686 0 : reconciler.tenant_shard_id.tenant_id.to_string(),
1687 0 : reconciler.tenant_shard_id.shard_number.0.to_string(),
1688 0 : reconcile_seq.to_string(),
1689 0 : )
1690 0 : };
1691 0 :
1692 0 : let label_group = ReconcileLongRunningLabelGroup {
1693 0 : tenant_id: &tenant_id_label,
1694 0 : shard_number: &shard_number_label,
1695 0 : sequence: &sequence_label,
1696 0 : };
1697 0 :
1698 0 : let reconcile_fut = Self::reconcile(reconcile_seq, reconciler, must_notify);
1699 0 : let long_reconcile_fut = {
1700 0 : let label_group = label_group.clone();
1701 0 : async move {
1702 0 : tokio::time::sleep(long_reconcile_threshold).await;
1703 :
1704 0 : tracing::warn!("Reconcile passed the long running threshold of {long_reconcile_threshold:?}");
1705 :
1706 0 : metrics::METRICS_REGISTRY
1707 0 : .metrics_group
1708 0 : .storage_controller_reconcile_long_running
1709 0 : .inc(label_group);
1710 0 : }
1711 : };
1712 :
1713 0 : let reconcile_fut = std::pin::pin!(reconcile_fut);
1714 0 : let long_reconcile_fut = std::pin::pin!(long_reconcile_fut);
1715 :
1716 0 : let (was_long, result) =
1717 0 : match future::select(reconcile_fut, long_reconcile_fut).await {
1718 0 : Either::Left((reconcile_result, _)) => (false, reconcile_result),
1719 0 : Either::Right((_, reconcile_fut)) => (true, reconcile_fut.await),
1720 : };
1721 :
1722 0 : if was_long {
1723 0 : let id = metrics::METRICS_REGISTRY
1724 0 : .metrics_group
1725 0 : .storage_controller_reconcile_long_running
1726 0 : .with_labels(label_group);
1727 0 : metrics::METRICS_REGISTRY
1728 0 : .metrics_group
1729 0 : .storage_controller_reconcile_long_running
1730 0 : .remove_metric(id);
1731 0 : }
1732 :
1733 0 : result_tx
1734 0 : .send(ReconcileResultRequest::ReconcileResult(result))
1735 0 : .ok();
1736 0 : }
1737 : .instrument(reconciler_span),
1738 : );
1739 :
1740 : self.reconciler = Some(ReconcilerHandle {
1741 : sequence: self.sequence,
1742 : handle: join_handle,
1743 : cancel: reconciler_cancel,
1744 : });
1745 :
1746 : Some(ReconcilerWaiter {
1747 : tenant_shard_id: self.tenant_shard_id,
1748 : seq_wait: self.waiter.clone(),
1749 : error_seq_wait: self.error_waiter.clone(),
1750 : error: self.last_error.clone(),
1751 : seq: self.sequence,
1752 : })
1753 : }
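    // Editor's sketch (a minimal standalone illustration, not part of the source): the long-running
    // detection above races the reconcile future against a sleep using `futures::future::select`.
    // The helper name `run_with_long_warning` and its log message are hypothetical.
    //
    //     use futures::future::{self, Either};
    //     use std::time::Duration;
    //
    //     async fn run_with_long_warning<F: std::future::Future>(fut: F, threshold: Duration) -> F::Output {
    //         let warn = async {
    //             tokio::time::sleep(threshold).await;
    //             tracing::warn!("work exceeded the long running threshold of {threshold:?}");
    //         };
    //         match future::select(std::pin::pin!(fut), std::pin::pin!(warn)).await {
    //             // The work finished first: return its output and drop the timer.
    //             Either::Left((output, _)) => output,
    //             // The timer fired first: the warning has been logged; keep waiting for the work.
    //             Either::Right((_, fut)) => fut.await,
    //         }
    //     }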
1754 :
1755 0 : pub(crate) fn cancel_reconciler(&self) {
1756 0 : if let Some(handle) = self.reconciler.as_ref() {
1757 0 : handle.cancel.cancel()
1758 0 : }
1759 0 : }
1760 :
1761 : /// Get a waiter for any reconciliation in flight, but do not start reconciliation
1762 : /// if it is not already running
1763 0 : pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
1764 0 : if self.reconciler.is_some() {
1765 0 : Some(ReconcilerWaiter {
1766 0 : tenant_shard_id: self.tenant_shard_id,
1767 0 : seq_wait: self.waiter.clone(),
1768 0 : error_seq_wait: self.error_waiter.clone(),
1769 0 : error: self.last_error.clone(),
1770 0 : seq: self.sequence,
1771 0 : })
1772 : } else {
1773 0 : None
1774 : }
1775 0 : }
1776 :
1777 : /// Called when a ReconcileResult has been emitted and the service is updating
1778 : /// our state: if the result is from a sequence >= my ReconcileHandle, then drop
1779 : /// the handle to indicate there is no longer a reconciliation in progress.
1780 0 : pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
1781 0 : if let Some(reconcile_handle) = &self.reconciler {
1782 0 : if reconcile_handle.sequence <= sequence {
1783 0 : self.reconciler = None;
1784 0 : }
1785 0 : }
1786 0 : }
1787 :
1788 : /// If we had any state at all referring to this node ID, drop it. Does not
1789 : /// attempt to reschedule.
1790 : ///
1791 : /// Returns true if we modified the node's intent state.
1792 0 : pub(crate) fn deref_node(&mut self, node_id: NodeId) -> bool {
1793 0 : let mut intent_modified = false;
1794 0 :
1795 0 : // Drop if this node was our attached intent
1796 0 : if self.intent.attached == Some(node_id) {
1797 0 : self.intent.attached = None;
1798 0 : intent_modified = true;
1799 0 : }
1800 :
1801 : // Drop from the list of secondaries, and check if we modified it
1802 0 : let had_secondaries = self.intent.secondary.len();
1803 0 : self.intent.secondary.retain(|n| n != &node_id);
1804 0 : intent_modified |= self.intent.secondary.len() != had_secondaries;
1805 0 :
1806 0 : debug_assert!(!self.intent.all_pageservers().contains(&node_id));
1807 :
1808 0 : if self.preferred_node == Some(node_id) {
1809 0 : self.preferred_node = None;
1810 0 : }
1811 :
1812 0 : intent_modified
1813 0 : }
1814 :
1815 0 : pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
1816 0 : self.scheduling_policy = p;
1817 0 : }
1818 :
1819 0 : pub(crate) fn get_scheduling_policy(&self) -> ShardSchedulingPolicy {
1820 0 : self.scheduling_policy
1821 0 : }
1822 :
1823 0 : pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
1824 0 : // Ordering: always set last_error before advancing sequence, so that sequence
1825 0 : // waiters are guaranteed to see a Some value when they see an error.
1826 0 : *(self.last_error.lock().unwrap()) = Some(Arc::new(error));
1827 0 : self.error_waiter.advance(sequence);
1828 0 : }
1829 :
1830 0 : pub(crate) fn from_persistent(
1831 0 : tsp: TenantShardPersistence,
1832 0 : intent: IntentState,
1833 0 : ) -> anyhow::Result<Self> {
1834 0 : let tenant_shard_id = tsp.get_tenant_shard_id()?;
1835 0 : let shard_identity = tsp.get_shard_identity()?;
1836 :
1837 0 : metrics::METRICS_REGISTRY
1838 0 : .metrics_group
1839 0 : .storage_controller_tenant_shards
1840 0 : .inc();
1841 0 :
1842 0 : Ok(Self {
1843 0 : tenant_shard_id,
1844 0 : shard: shard_identity,
1845 0 : sequence: Sequence::initial(),
1846 0 : generation: tsp.generation.map(|g| Generation::new(g as u32)),
1847 0 : policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
1848 0 : intent,
1849 0 : observed: ObservedState::new(),
1850 0 : config: serde_json::from_str(&tsp.config).unwrap(),
1851 0 : reconciler: None,
1852 0 : splitting: tsp.splitting,
1853 0 : // Filled in during [`Service::startup_reconcile`]
1854 0 : importing: TimelineImportState::Idle,
1855 0 : waiter: Arc::new(SeqWait::new(Sequence::initial())),
1856 0 : error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
1857 0 : last_error: Arc::default(),
1858 0 : pending_compute_notification: false,
1859 0 : delayed_reconcile: false,
1860 0 : scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
1861 0 : preferred_node: None,
1862 0 : })
1863 0 : }
1864 :
1865 0 : pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
1866 0 : TenantShardPersistence {
1867 0 : tenant_id: self.tenant_shard_id.tenant_id.to_string(),
1868 0 : shard_number: self.tenant_shard_id.shard_number.0 as i32,
1869 0 : shard_count: self.tenant_shard_id.shard_count.literal() as i32,
1870 0 : shard_stripe_size: self.shard.stripe_size.0 as i32,
1871 0 : generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
1872 0 : generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
1873 0 : placement_policy: serde_json::to_string(&self.policy).unwrap(),
1874 0 : config: serde_json::to_string(&self.config).unwrap(),
1875 0 : splitting: SplitState::default(),
1876 0 : scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
1877 0 : preferred_az_id: self.intent.preferred_az_id.as_ref().map(|az| az.0.clone()),
1878 0 : }
1879 0 : }
1880 :
1881 12508 : pub(crate) fn preferred_az(&self) -> Option<&AvailabilityZone> {
1882 12508 : self.intent.get_preferred_az()
1883 12508 : }
1884 :
1885 2 : pub(crate) fn set_preferred_az(
1886 2 : &mut self,
1887 2 : scheduler: &mut Scheduler,
1888 2 : preferred_az_id: Option<AvailabilityZone>,
1889 2 : ) {
1890 2 : self.intent.set_preferred_az(scheduler, preferred_az_id);
1891 2 : }
1892 :
1893 : /// Returns all the nodes to which this tenant shard is attached according to the
1894 : /// observed state and the generations. Return vector is sorted from latest generation
1895 : /// to earliest.
1896 0 : pub(crate) fn attached_locations(&self) -> Vec<(NodeId, Generation)> {
1897 0 : self.observed
1898 0 : .locations
1899 0 : .iter()
1900 0 : .filter_map(|(node_id, observed)| {
1901 : use LocationConfigMode::{AttachedMulti, AttachedSingle, AttachedStale};
1902 :
1903 0 : let conf = observed.conf.as_ref()?;
1904 :
1905 0 : match (conf.generation, conf.mode) {
1906 0 : (Some(gen_), AttachedMulti | AttachedSingle | AttachedStale) => {
1907 0 : Some((*node_id, gen_))
1908 : }
1909 0 : _ => None,
1910 : }
1911 0 : })
1912 0 : .sorted_by(|(_lhs_node_id, lhs_gen), (_rhs_node_id, rhs_gen)| {
1913 0 : lhs_gen.cmp(rhs_gen).reverse()
1914 0 : })
1915 0 : .map(|(node_id, gen_)| (node_id, Generation::new(gen_)))
1916 0 : .collect()
1917 0 : }
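    // Editor's note (illustrative): given observed locations { node 1: AttachedStale at generation 3,
    // node 2: AttachedSingle at generation 5, node 3: a secondary }, `attached_locations` returns
    // [(node 2, generation 5), (node 1, generation 3)]: non-attached entries are filtered out and
    // the remaining entries are sorted from latest generation to earliest.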
1918 :
1919 : /// Update the observed state of the tenant by applying incremental deltas
1920 : ///
1921 : /// Deltas are generated by reconcilers via [`Reconciler::observed_deltas`].
1922 : /// They are then filtered in [`crate::service::Service::process_result`].
1923 0 : pub(crate) fn apply_observed_deltas(
1924 0 : &mut self,
1925 0 : deltas: impl Iterator<Item = ObservedStateDelta>,
1926 0 : ) {
1927 0 : for delta in deltas {
1928 0 : match delta {
1929 0 : ObservedStateDelta::Upsert(ups) => {
1930 0 : let (node_id, loc) = *ups;
1931 0 :
1932 0 : // If the generation of the observed location in the delta is lagging
1933 0 : // behind the current one, then we have a race condition and cannot
1934 0 : // be certain about the true observed state. Set the observed state
1935 0 : // to None in order to reflect this.
1936 0 : let crnt_gen = self
1937 0 : .observed
1938 0 : .locations
1939 0 : .get(&node_id)
1940 0 : .and_then(|loc| loc.conf.as_ref())
1941 0 : .and_then(|conf| conf.generation);
1942 0 : let new_gen = loc.conf.as_ref().and_then(|conf| conf.generation);
1943 0 : match (crnt_gen, new_gen) {
1944 0 : (Some(crnt), Some(new)) if crnt_gen > new_gen => {
1945 0 : tracing::warn!(
1946 0 : "Skipping observed state update {}: {:?} and using None due to stale generation ({} > {})",
1947 : node_id,
1948 : loc,
1949 : crnt,
1950 : new
1951 : );
1952 :
1953 0 : self.observed
1954 0 : .locations
1955 0 : .insert(node_id, ObservedStateLocation { conf: None });
1956 0 :
1957 0 : continue;
1958 : }
1959 0 : _ => {}
1960 : }
1961 :
1962 0 : if let Some(conf) = &loc.conf {
1963 0 : tracing::info!("Updating observed location {}: {:?}", node_id, conf);
1964 : } else {
1965 0 : tracing::info!("Setting observed location {} to None", node_id,)
1966 : }
1967 :
1968 0 : self.observed.locations.insert(node_id, loc);
1969 : }
1970 0 : ObservedStateDelta::Delete(node_id) => {
1971 0 : tracing::info!("Deleting observed location {}", node_id);
1972 0 : self.observed.locations.remove(&node_id);
1973 : }
1974 : }
1975 : }
1976 0 : }
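    // Editor's note (illustrative): if the currently observed location for node 1 carries
    // generation 5 and an Upsert delta arrives carrying generation 4 (e.g. from a reconciler that
    // raced with a newer attachment), the branch above records `conf: None` for node 1 instead of
    // the stale config, so a later reconcile treats that location's state as unknown.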
1977 :
1978 : /// Returns true if the tenant shard is attached to a node that is outside the preferred AZ.
1979 : ///
1980 : /// If the shard does not have a preferred AZ, returns false.
1981 0 : pub(crate) fn is_attached_outside_preferred_az(&self, nodes: &HashMap<NodeId, Node>) -> bool {
1982 0 : self.intent
1983 0 : .get_attached()
1984 0 : .map(|node_id| {
1985 0 : Some(
1986 0 : nodes
1987 0 : .get(&node_id)
1988 0 : .expect("referenced node exists")
1989 0 : .get_availability_zone_id(),
1990 0 : ) != self.intent.preferred_az_id.as_ref()
1991 0 : })
1992 0 : .unwrap_or(false)
1993 0 : }
1994 : }
1995 :
1996 : impl Drop for TenantShard {
1997 12843 : fn drop(&mut self) {
1998 12843 : metrics::METRICS_REGISTRY
1999 12843 : .metrics_group
2000 12843 : .storage_controller_tenant_shards
2001 12843 : .dec();
2002 12843 : }
2003 : }
2004 :
2005 : #[cfg(test)]
2006 : pub(crate) mod tests {
2007 : use std::cell::RefCell;
2008 : use std::rc::Rc;
2009 :
2010 : use pageserver_api::controller_api::NodeAvailability;
2011 : use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardNumber};
2012 : use rand::SeedableRng;
2013 : use rand::rngs::StdRng;
2014 : use utils::id::TenantId;
2015 :
2016 : use super::*;
2017 : use crate::scheduler::test_utils::make_test_nodes;
2018 :
2019 12 : fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard {
2020 12 : let tenant_id = TenantId::generate();
2021 12 : let shard_number = ShardNumber(0);
2022 12 : let shard_count = ShardCount::new(1);
2023 12 : let stripe_size = DEFAULT_STRIPE_SIZE;
2024 12 :
2025 12 : let tenant_shard_id = TenantShardId {
2026 12 : tenant_id,
2027 12 : shard_number,
2028 12 : shard_count,
2029 12 : };
2030 12 : TenantShard::new(
2031 12 : tenant_shard_id,
2032 12 : ShardIdentity::new(shard_number, shard_count, stripe_size).unwrap(),
2033 12 : policy,
2034 12 : None,
2035 12 : )
2036 12 : }
2037 :
2038 5004 : pub(crate) fn make_test_tenant(
2039 5004 : policy: PlacementPolicy,
2040 5004 : shard_count: ShardCount,
2041 5004 : preferred_az: Option<AvailabilityZone>,
2042 5004 : ) -> Vec<TenantShard> {
2043 5004 : make_test_tenant_with_id(TenantId::generate(), policy, shard_count, preferred_az)
2044 5004 : }
2045 :
2046 5007 : pub(crate) fn make_test_tenant_with_id(
2047 5007 : tenant_id: TenantId,
2048 5007 : policy: PlacementPolicy,
2049 5007 : shard_count: ShardCount,
2050 5007 : preferred_az: Option<AvailabilityZone>,
2051 5007 : ) -> Vec<TenantShard> {
2052 5007 : let stripe_size = DEFAULT_STRIPE_SIZE;
2053 5007 : (0..shard_count.count())
2054 12522 : .map(|i| {
2055 12522 : let shard_number = ShardNumber(i);
2056 12522 :
2057 12522 : let tenant_shard_id = TenantShardId {
2058 12522 : tenant_id,
2059 12522 : shard_number,
2060 12522 : shard_count,
2061 12522 : };
2062 12522 : TenantShard::new(
2063 12522 : tenant_shard_id,
2064 12522 : ShardIdentity::new(shard_number, shard_count, stripe_size).unwrap(),
2065 12522 : policy.clone(),
2066 12522 : preferred_az.clone(),
2067 12522 : )
2068 12522 : })
2069 5007 : .collect()
2070 5007 : }
2071 :
2072 : /// Test the scheduling behaviors used when a tenant configured for HA is subject
2073 : /// to nodes being marked offline.
2074 : #[test]
2075 1 : fn tenant_ha_scheduling() -> anyhow::Result<()> {
2076 1 : // Start with three nodes. Our tenant will only use two. The third one is
2077 1 : // expected to remain unused.
2078 1 : let mut nodes = make_test_nodes(3, &[]);
2079 1 :
2080 1 : let mut scheduler = Scheduler::new(nodes.values());
2081 1 : let mut context = ScheduleContext::default();
2082 1 :
2083 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
2084 1 : tenant_shard
2085 1 : .schedule(&mut scheduler, &mut context)
2086 1 : .expect("we have enough nodes, scheduling should work");
2087 1 :
2088 1 : // Expect the attached and secondary locations to initially be scheduled onto different nodes
2089 1 : assert_eq!(tenant_shard.intent.secondary.len(), 1);
2090 1 : assert!(tenant_shard.intent.attached.is_some());
2091 :
2092 1 : let attached_node_id = tenant_shard.intent.attached.unwrap();
2093 1 : let secondary_node_id = *tenant_shard.intent.secondary.iter().last().unwrap();
2094 1 : assert_ne!(attached_node_id, secondary_node_id);
2095 :
2096 : // Notifying that the attached node is offline should demote it to a secondary
2097 1 : let changed = tenant_shard
2098 1 : .intent
2099 1 : .demote_attached(&mut scheduler, attached_node_id);
2100 1 : assert!(changed);
2101 1 : assert!(tenant_shard.intent.attached.is_none());
2102 1 : assert_eq!(tenant_shard.intent.secondary.len(), 2);
2103 :
2104 : // Update the scheduler state to indicate the node is offline
2105 1 : nodes
2106 1 : .get_mut(&attached_node_id)
2107 1 : .unwrap()
2108 1 : .set_availability(NodeAvailability::Offline);
2109 1 : scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
2110 1 :
2111 1 : // Re-scheduling the shard should promote the still-available secondary node to attached
2112 1 : tenant_shard
2113 1 : .schedule(&mut scheduler, &mut context)
2114 1 : .expect("active nodes are available");
2115 1 : assert_eq!(tenant_shard.intent.attached.unwrap(), secondary_node_id);
2116 :
2117 : // The original attached node should have been retained as a secondary
2118 1 : assert_eq!(
2119 1 : *tenant_shard.intent.secondary.iter().last().unwrap(),
2120 1 : attached_node_id
2121 1 : );
2122 :
2123 1 : tenant_shard.intent.clear(&mut scheduler);
2124 1 :
2125 1 : Ok(())
2126 1 : }
2127 :
2128 : #[test]
2129 1 : fn intent_from_observed() -> anyhow::Result<()> {
2130 1 : let nodes = make_test_nodes(3, &[]);
2131 1 : let mut scheduler = Scheduler::new(nodes.values());
2132 1 :
2133 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
2134 1 :
2135 1 : tenant_shard.observed.locations.insert(
2136 1 : NodeId(3),
2137 1 : ObservedStateLocation {
2138 1 : conf: Some(LocationConfig {
2139 1 : mode: LocationConfigMode::AttachedMulti,
2140 1 : generation: Some(2),
2141 1 : secondary_conf: None,
2142 1 : shard_number: tenant_shard.shard.number.0,
2143 1 : shard_count: tenant_shard.shard.count.literal(),
2144 1 : shard_stripe_size: tenant_shard.shard.stripe_size.0,
2145 1 : tenant_conf: TenantConfig::default(),
2146 1 : }),
2147 1 : },
2148 1 : );
2149 1 :
2150 1 : tenant_shard.observed.locations.insert(
2151 1 : NodeId(2),
2152 1 : ObservedStateLocation {
2153 1 : conf: Some(LocationConfig {
2154 1 : mode: LocationConfigMode::AttachedStale,
2155 1 : generation: Some(1),
2156 1 : secondary_conf: None,
2157 1 : shard_number: tenant_shard.shard.number.0,
2158 1 : shard_count: tenant_shard.shard.count.literal(),
2159 1 : shard_stripe_size: tenant_shard.shard.stripe_size.0,
2160 1 : tenant_conf: TenantConfig::default(),
2161 1 : }),
2162 1 : },
2163 1 : );
2164 1 :
2165 1 : tenant_shard.intent_from_observed(&mut scheduler);
2166 1 :
2167 1 : // The attached location with the highest generation gets used as attached
2168 1 : assert_eq!(tenant_shard.intent.attached, Some(NodeId(3)));
2169 : // Other locations get used as secondary
2170 1 : assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]);
2171 :
2172 1 : scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?;
2173 :
2174 1 : tenant_shard.intent.clear(&mut scheduler);
2175 1 : Ok(())
2176 1 : }
2177 :
2178 : #[test]
2179 1 : fn scheduling_mode() -> anyhow::Result<()> {
2180 1 : let nodes = make_test_nodes(3, &[]);
2181 1 : let mut scheduler = Scheduler::new(nodes.values());
2182 1 :
2183 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
2184 1 :
2185 1 : // In pause mode, schedule() shouldn't do anything
2186 1 : tenant_shard.scheduling_policy = ShardSchedulingPolicy::Pause;
2187 1 : assert!(
2188 1 : tenant_shard
2189 1 : .schedule(&mut scheduler, &mut ScheduleContext::default())
2190 1 : .is_ok()
2191 1 : );
2192 1 : assert!(tenant_shard.intent.all_pageservers().is_empty());
2193 :
2194 : // In active mode, schedule() works
2195 1 : tenant_shard.scheduling_policy = ShardSchedulingPolicy::Active;
2196 1 : assert!(
2197 1 : tenant_shard
2198 1 : .schedule(&mut scheduler, &mut ScheduleContext::default())
2199 1 : .is_ok()
2200 1 : );
2201 1 : assert!(!tenant_shard.intent.all_pageservers().is_empty());
2202 :
2203 1 : tenant_shard.intent.clear(&mut scheduler);
2204 1 : Ok(())
2205 1 : }
2206 :
2207 : #[test]
2208 : /// Simple case: moving attachment to somewhere better where we already have a secondary
2209 1 : fn optimize_attachment_simple() -> anyhow::Result<()> {
2210 1 : let nodes = make_test_nodes(
2211 1 : 3,
2212 1 : &[
2213 1 : AvailabilityZone("az-a".to_string()),
2214 1 : AvailabilityZone("az-b".to_string()),
2215 1 : AvailabilityZone("az-c".to_string()),
2216 1 : ],
2217 1 : );
2218 1 : let mut scheduler = Scheduler::new(nodes.values());
2219 1 :
2220 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
2221 1 : shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
2222 1 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
2223 1 : shard_b.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
2224 1 :
2225 1 : // Initially: shard A is attached on node 2 and shard B on node 1, and each has a
2226 1 : // secondary on the other's attached node.
2227 1 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
2228 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(1));
2229 1 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
2230 1 : shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
2231 :
2232 1 : fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
2233 1 : let mut schedule_context = ScheduleContext::default();
2234 1 : schedule_context.avoid(&shard_a.intent.all_pageservers());
2235 1 : schedule_context.avoid(&shard_b.intent.all_pageservers());
2236 1 : schedule_context
2237 1 : }
2238 :
2239 1 : let schedule_context = make_schedule_context(&shard_a, &shard_b);
2240 1 : let optimization_a = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2241 1 : assert_eq!(
2242 1 : optimization_a,
2243 1 : Some(ScheduleOptimization {
2244 1 : sequence: shard_a.sequence,
2245 1 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
2246 1 : old_attached_node_id: NodeId(2),
2247 1 : new_attached_node_id: NodeId(1)
2248 1 : })
2249 1 : })
2250 1 : );
2251 1 : shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
2252 1 :
2253 1 : // // Either shard should recognize that it has the option to switch to a secondary location where there
2254 1 : // // would be no other shards from the same tenant, and request to do so.
2255 1 : // assert_eq!(
2256 1 : // optimization_a_prepare,
2257 1 : // Some(ScheduleOptimization {
2258 1 : // sequence: shard_a.sequence,
2259 1 : // action: ScheduleOptimizationAction::CreateSecondary(NodeId(2))
2260 1 : // })
2261 1 : // );
2262 1 : // shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
2263 1 :
2264 1 : // let schedule_context = make_schedule_context(&shard_a, &shard_b);
2265 1 : // let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2266 1 : // assert_eq!(
2267 1 : // optimization_a_migrate,
2268 1 : // Some(ScheduleOptimization {
2269 1 : // sequence: shard_a.sequence,
2270 1 : // action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
2271 1 : // old_attached_node_id: NodeId(1),
2272 1 : // new_attached_node_id: NodeId(2)
2273 1 : // })
2274 1 : // })
2275 1 : // );
2276 1 : // shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
2277 1 :
2278 1 : // let schedule_context = make_schedule_context(&shard_a, &shard_b);
2279 1 : // let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2280 1 : // assert_eq!(
2281 1 : // optimization_a_cleanup,
2282 1 : // Some(ScheduleOptimization {
2283 1 : // sequence: shard_a.sequence,
2284 1 : // action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1))
2285 1 : // })
2286 1 : // );
2287 1 : // shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
2288 1 :
2289 1 : // // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
2290 1 : // let schedule_context = make_schedule_context(&shard_a, &shard_b);
2291 1 : // assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
2292 1 :
2293 1 : shard_a.intent.clear(&mut scheduler);
2294 1 : shard_b.intent.clear(&mut scheduler);
2295 1 :
2296 1 : Ok(())
2297 1 : }
2298 :
2299 : #[test]
2300 : /// Complicated case: moving attachment to somewhere better where we do not have a secondary
2301 : /// already, creating one as needed.
2302 1 : fn optimize_attachment_multistep() -> anyhow::Result<()> {
2303 1 : let nodes = make_test_nodes(
2304 1 : 3,
2305 1 : &[
2306 1 : AvailabilityZone("az-a".to_string()),
2307 1 : AvailabilityZone("az-b".to_string()),
2308 1 : AvailabilityZone("az-c".to_string()),
2309 1 : ],
2310 1 : );
2311 1 : let mut scheduler = Scheduler::new(nodes.values());
2312 1 :
2313 1 : // Two shards of a tenant that wants to be in AZ A
2314 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
2315 1 : shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
2316 1 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
2317 1 : shard_b.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
2318 1 :
2319 1 : // Both shards are initially attached in non-home AZ _and_ have secondaries in non-home AZs
2320 1 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
2321 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
2322 1 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(3)));
2323 1 : shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
2324 :
2325 3 : fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
2326 3 : let mut schedule_context = ScheduleContext::default();
2327 3 : schedule_context.avoid(&shard_a.intent.all_pageservers());
2328 3 : schedule_context.avoid(&shard_b.intent.all_pageservers());
2329 3 : schedule_context
2330 3 : }
2331 :
2332 1 : let schedule_context = make_schedule_context(&shard_a, &shard_b);
2333 1 : let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2334 1 : assert_eq!(
2335 1 : optimization_a_prepare,
2336 1 : Some(ScheduleOptimization {
2337 1 : sequence: shard_a.sequence,
2338 1 : action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
2339 1 : })
2340 1 : );
2341 1 : shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
2342 1 :
2343 1 : let schedule_context = make_schedule_context(&shard_a, &shard_b);
2344 1 : let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2345 1 : assert_eq!(
2346 1 : optimization_a_migrate,
2347 1 : Some(ScheduleOptimization {
2348 1 : sequence: shard_a.sequence,
2349 1 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
2350 1 : old_attached_node_id: NodeId(2),
2351 1 : new_attached_node_id: NodeId(1)
2352 1 : })
2353 1 : })
2354 1 : );
2355 1 : shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
2356 1 :
2357 1 : let schedule_context = make_schedule_context(&shard_a, &shard_b);
2358 1 : let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2359 1 : assert_eq!(
2360 1 : optimization_a_cleanup,
2361 1 : Some(ScheduleOptimization {
2362 1 : sequence: shard_a.sequence,
2363 1 : action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
2364 1 : })
2365 1 : );
2366 1 : shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
2367 1 :
2368 1 : // // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
2369 1 : // let schedule_context = make_schedule_context(&shard_a, &shard_b);
2370 1 : // assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
2371 1 :
2372 1 : shard_a.intent.clear(&mut scheduler);
2373 1 : shard_b.intent.clear(&mut scheduler);
2374 1 :
2375 1 : Ok(())
2376 1 : }
2377 :
2378 : #[test]
2379 : /// How the optimisation code handles a shard with a preferred node set; this is an example
2380 : /// of the multi-step migration, but driven by a different input.
2381 1 : fn optimize_attachment_multi_preferred_node() -> anyhow::Result<()> {
2382 1 : let nodes = make_test_nodes(
2383 1 : 4,
2384 1 : &[
2385 1 : AvailabilityZone("az-a".to_string()),
2386 1 : AvailabilityZone("az-a".to_string()),
2387 1 : AvailabilityZone("az-b".to_string()),
2388 1 : AvailabilityZone("az-b".to_string()),
2389 1 : ],
2390 1 : );
2391 1 : let mut scheduler = Scheduler::new(nodes.values());
2392 1 :
2393 1 : // Two shards of a tenant that wants to be in AZ A
2394 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
2395 1 : shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
2396 1 :
2397 1 : // Initially attached in a stable location
2398 1 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
2399 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
2400 1 :
2401 1 : // Set the preferred node to node 2, which scores equally as well as its current location
2402 1 : shard_a.preferred_node = Some(NodeId(2));
2403 :
2404 3 : fn make_schedule_context(shard_a: &TenantShard) -> ScheduleContext {
2405 3 : let mut schedule_context = ScheduleContext::default();
2406 3 : schedule_context.avoid(&shard_a.intent.all_pageservers());
2407 3 : schedule_context
2408 3 : }
2409 :
2410 1 : let schedule_context = make_schedule_context(&shard_a);
2411 1 : let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2412 1 : assert_eq!(
2413 1 : optimization_a_prepare,
2414 1 : Some(ScheduleOptimization {
2415 1 : sequence: shard_a.sequence,
2416 1 : action: ScheduleOptimizationAction::CreateSecondary(NodeId(2))
2417 1 : })
2418 1 : );
2419 1 : shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
2420 1 :
2421 1 : // The first step of the optimisation should not have cleared the preferred node
2422 1 : assert_eq!(shard_a.preferred_node, Some(NodeId(2)));
2423 :
2424 1 : let schedule_context = make_schedule_context(&shard_a);
2425 1 : let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2426 1 : assert_eq!(
2427 1 : optimization_a_migrate,
2428 1 : Some(ScheduleOptimization {
2429 1 : sequence: shard_a.sequence,
2430 1 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
2431 1 : old_attached_node_id: NodeId(1),
2432 1 : new_attached_node_id: NodeId(2)
2433 1 : })
2434 1 : })
2435 1 : );
2436 1 : shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
2437 1 :
2438 1 : // The cutover step of the optimisation should have cleared the preferred node
2439 1 : assert_eq!(shard_a.preferred_node, None);
2440 :
2441 1 : let schedule_context = make_schedule_context(&shard_a);
2442 1 : let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2443 1 : assert_eq!(
2444 1 : optimization_a_cleanup,
2445 1 : Some(ScheduleOptimization {
2446 1 : sequence: shard_a.sequence,
2447 1 : action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1))
2448 1 : })
2449 1 : );
2450 1 : shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
2451 1 :
2452 1 : shard_a.intent.clear(&mut scheduler);
2453 1 :
2454 1 : Ok(())
2455 1 : }
2456 :
2457 : #[test]
2458 : /// Check that multi-step migration works when moving to somewhere that is only better by
2459 : /// 1 AffinityScore -- this ensures that we don't have a bug like the intermediate secondary
2460 : /// counting toward the affinity score such that it prevents the rest of the migration from happening.
2461 1 : fn optimize_attachment_marginal() -> anyhow::Result<()> {
2462 1 : let nodes = make_test_nodes(2, &[]);
2463 1 : let mut scheduler = Scheduler::new(nodes.values());
2464 1 :
2465 1 : // Multi-sharded tenant, we will craft a situation where affinity
2466 1 : // scores differ only slightly
2467 1 : let mut shards = make_test_tenant(PlacementPolicy::Attached(0), ShardCount::new(4), None);
2468 1 :
2469 1 : // 1 attached on node 1
2470 1 : shards[0]
2471 1 : .intent
2472 1 : .set_attached(&mut scheduler, Some(NodeId(1)));
2473 1 : // 3 attached on node 2
2474 1 : shards[1]
2475 1 : .intent
2476 1 : .set_attached(&mut scheduler, Some(NodeId(2)));
2477 1 : shards[2]
2478 1 : .intent
2479 1 : .set_attached(&mut scheduler, Some(NodeId(2)));
2480 1 : shards[3]
2481 1 : .intent
2482 1 : .set_attached(&mut scheduler, Some(NodeId(2)));
2483 :
2484 : // The optimizer should figure out that, for one of the shards attached on node 2 (shards[1] below), we need to:
2485 : // - Create a secondary on node 1
2486 : // - Migrate the attachment to node 1
2487 : // - Remove the old location on node 2
2488 :
2489 4 : fn make_schedule_context(shards: &Vec<TenantShard>) -> ScheduleContext {
2490 4 : let mut schedule_context = ScheduleContext::default();
2491 20 : for shard in shards {
2492 16 : schedule_context.avoid(&shard.intent.all_pageservers());
2493 16 : }
2494 4 : schedule_context
2495 4 : }
2496 :
2497 1 : let schedule_context = make_schedule_context(&shards);
2498 1 : let optimization_a_prepare =
2499 1 : shards[1].optimize_attachment(&mut scheduler, &schedule_context);
2500 1 : assert_eq!(
2501 1 : optimization_a_prepare,
2502 1 : Some(ScheduleOptimization {
2503 1 : sequence: shards[1].sequence,
2504 1 : action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
2505 1 : })
2506 1 : );
2507 1 : shards[1].apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
2508 1 :
2509 1 : let schedule_context = make_schedule_context(&shards);
2510 1 : let optimization_a_migrate =
2511 1 : shards[1].optimize_attachment(&mut scheduler, &schedule_context);
2512 1 : assert_eq!(
2513 1 : optimization_a_migrate,
2514 1 : Some(ScheduleOptimization {
2515 1 : sequence: shards[1].sequence,
2516 1 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
2517 1 : old_attached_node_id: NodeId(2),
2518 1 : new_attached_node_id: NodeId(1)
2519 1 : })
2520 1 : })
2521 1 : );
2522 1 : shards[1].apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
2523 1 :
2524 1 : let schedule_context = make_schedule_context(&shards);
2525 1 : let optimization_a_cleanup =
2526 1 : shards[1].optimize_attachment(&mut scheduler, &schedule_context);
2527 1 : assert_eq!(
2528 1 : optimization_a_cleanup,
2529 1 : Some(ScheduleOptimization {
2530 1 : sequence: shards[1].sequence,
2531 1 : action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2))
2532 1 : })
2533 1 : );
2534 1 : shards[1].apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
2535 1 :
2536 1 : // Everything should be stable now
2537 1 : let schedule_context = make_schedule_context(&shards);
2538 1 : assert_eq!(
2539 1 : shards[0].optimize_attachment(&mut scheduler, &schedule_context),
2540 1 : None
2541 1 : );
2542 1 : assert_eq!(
2543 1 : shards[1].optimize_attachment(&mut scheduler, &schedule_context),
2544 1 : None
2545 1 : );
2546 1 : assert_eq!(
2547 1 : shards[2].optimize_attachment(&mut scheduler, &schedule_context),
2548 1 : None
2549 1 : );
2550 1 : assert_eq!(
2551 1 : shards[3].optimize_attachment(&mut scheduler, &schedule_context),
2552 1 : None
2553 1 : );
2554 :
2555 5 : for mut shard in shards {
2556 4 : shard.intent.clear(&mut scheduler);
2557 4 : }
2558 :
2559 1 : Ok(())
2560 1 : }
2561 :
2562 : #[test]
2563 1 : fn optimize_secondary() -> anyhow::Result<()> {
2564 1 : let nodes = make_test_nodes(4, &[]);
2565 1 : let mut scheduler = Scheduler::new(nodes.values());
2566 1 :
2567 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
2568 1 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
2569 1 :
2570 1 : // Initially: the two shards are attached on different nodes, but both have their
2571 1 : // secondary location on the same node (node 3).
2572 1 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
2573 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
2574 1 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
2575 1 : shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
2576 1 :
2577 1 : let mut schedule_context = ScheduleContext::default();
2578 1 : schedule_context.avoid(&shard_a.intent.all_pageservers());
2579 1 : schedule_context.avoid(&shard_b.intent.all_pageservers());
2580 1 :
2581 1 : let optimization_a = shard_a.optimize_secondary(&mut scheduler, &schedule_context);
2582 1 :
2583 1 : // Since there is a node with no locations available, the node with two locations for the
2584 1 : // same tenant should generate an optimization to move one away
2585 1 : assert_eq!(
2586 1 : optimization_a,
2587 1 : Some(ScheduleOptimization {
2588 1 : sequence: shard_a.sequence,
2589 1 : action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
2590 1 : old_node_id: NodeId(3),
2591 1 : new_node_id: NodeId(4)
2592 1 : })
2593 1 : })
2594 1 : );
2595 :
2596 1 : shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
2597 1 : assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
2598 1 : assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);
2599 :
2600 1 : shard_a.intent.clear(&mut scheduler);
2601 1 : shard_b.intent.clear(&mut scheduler);
2602 1 :
2603 1 : Ok(())
2604 1 : }
2605 :
2606 : /// Test how the optimisation code behaves with an extra secondary
2607 : #[test]
2608 1 : fn optimize_removes_secondary() -> anyhow::Result<()> {
2609 1 : let az_a_tag = AvailabilityZone("az-a".to_string());
2610 1 : let az_b_tag = AvailabilityZone("az-b".to_string());
2611 1 : let mut nodes = make_test_nodes(
2612 1 : 4,
2613 1 : &[
2614 1 : az_a_tag.clone(),
2615 1 : az_b_tag.clone(),
2616 1 : az_a_tag.clone(),
2617 1 : az_b_tag.clone(),
2618 1 : ],
2619 1 : );
2620 1 : let mut scheduler = Scheduler::new(nodes.values());
2621 1 :
2622 1 : let mut schedule_context = ScheduleContext::default();
2623 1 :
2624 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
2625 1 : shard_a.intent.preferred_az_id = Some(az_a_tag.clone());
2626 1 : shard_a
2627 1 : .schedule(&mut scheduler, &mut schedule_context)
2628 1 : .unwrap();
2629 1 :
2630 1 : // Attached on node 1, secondary on node 2
2631 1 : assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
2632 1 : assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(2)]);
2633 :
2634 : // Initially optimiser is idle
2635 1 : assert_eq!(
2636 1 : shard_a.optimize_attachment(&mut scheduler, &schedule_context),
2637 1 : None
2638 1 : );
2639 1 : assert_eq!(
2640 1 : shard_a.optimize_secondary(&mut scheduler, &schedule_context),
2641 1 : None
2642 1 : );
2643 :
2644 : // A spare secondary in the home AZ: it should be removed -- this is the situation when we're midway through a graceful migration, after cutting over
2645 : // to our new location
2646 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
2647 1 : let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2648 1 : assert_eq!(
2649 1 : optimization,
2650 1 : Some(ScheduleOptimization {
2651 1 : sequence: shard_a.sequence,
2652 1 : action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
2653 1 : })
2654 1 : );
2655 1 : shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
2656 1 :
2657 1 : // A spare secondary in the non-home AZ, whose node is offline: it should also be removed
2658 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(4));
2659 1 : nodes
2660 1 : .get_mut(&NodeId(4))
2661 1 : .unwrap()
2662 1 : .set_availability(NodeAvailability::Offline);
2663 1 : scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
2664 1 : let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2665 1 : assert_eq!(
2666 1 : optimization,
2667 1 : Some(ScheduleOptimization {
2668 1 : sequence: shard_a.sequence,
2669 1 : action: ScheduleOptimizationAction::RemoveSecondary(NodeId(4))
2670 1 : })
2671 1 : );
2672 1 : shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
2673 1 :
2674 1 : // A spare secondary when we should have none
2675 1 : shard_a.policy = PlacementPolicy::Attached(0);
2676 1 : let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2677 1 : assert_eq!(
2678 1 : optimization,
2679 1 : Some(ScheduleOptimization {
2680 1 : sequence: shard_a.sequence,
2681 1 : action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2))
2682 1 : })
2683 1 : );
2684 1 : shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
2685 1 : assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
2686 1 : assert_eq!(shard_a.intent.get_secondary(), &vec![]);
2687 :
2688 : // Check that in secondary mode, we preserve the secondary in the preferred AZ
2689 1 : let mut schedule_context = ScheduleContext::default(); // Fresh context, we're about to call schedule()
2690 1 : shard_a.policy = PlacementPolicy::Secondary;
2691 1 : shard_a
2692 1 : .schedule(&mut scheduler, &mut schedule_context)
2693 1 : .unwrap();
2694 1 : assert_eq!(shard_a.intent.get_attached(), &None);
2695 1 : assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
2696 1 : assert_eq!(
2697 1 : shard_a.optimize_attachment(&mut scheduler, &schedule_context),
2698 1 : None
2699 1 : );
2700 1 : assert_eq!(
2701 1 : shard_a.optimize_secondary(&mut scheduler, &schedule_context),
2702 1 : None
2703 1 : );
2704 :
2705 1 : shard_a.intent.clear(&mut scheduler);
2706 1 :
2707 1 : Ok(())
2708 1 : }
2709 :
2710 : // Optimize til quiescent: this emulates what Service::optimize_all does, when
2711 : // called repeatedly in the background.
2712 : // Returns the applied optimizations
2713 3 : fn optimize_til_idle(
2714 3 : scheduler: &mut Scheduler,
2715 3 : shards: &mut [TenantShard],
2716 3 : ) -> Vec<ScheduleOptimization> {
2717 3 : let mut loop_n = 0;
2718 3 : let mut optimizations = Vec::default();
2719 : loop {
2720 6 : let mut schedule_context = ScheduleContext::default();
2721 6 : let mut any_changed = false;
2722 :
2723 24 : for shard in shards.iter() {
2724 24 : schedule_context.avoid(&shard.intent.all_pageservers());
2725 24 : }
2726 :
2727 15 : for shard in shards.iter_mut() {
2728 15 : let optimization = shard.optimize_attachment(scheduler, &schedule_context);
2729 15 : tracing::info!(
2730 0 : "optimize_attachment({})={:?}",
2731 : shard.tenant_shard_id,
2732 : optimization
2733 : );
2734 15 : if let Some(optimization) = optimization {
2735 : // Check that maybe_optimizable wouldn't have wrongly claimed this optimization didn't exist
2736 3 : assert!(shard.maybe_optimizable(scheduler, &schedule_context));
2737 3 : optimizations.push(optimization.clone());
2738 3 : shard.apply_optimization(scheduler, optimization);
2739 3 : any_changed = true;
2740 3 : break;
2741 12 : }
2742 12 :
2743 12 : let optimization = shard.optimize_secondary(scheduler, &schedule_context);
2744 12 : tracing::info!(
2745 0 : "optimize_secondary({})={:?}",
2746 : shard.tenant_shard_id,
2747 : optimization
2748 : );
2749 12 : if let Some(optimization) = optimization {
2750 : // Check that maybe_optimizable wouldn't have wrongly claimed this optimization didn't exist
2751 0 : assert!(shard.maybe_optimizable(scheduler, &schedule_context));
2752 :
2753 0 : optimizations.push(optimization.clone());
2754 0 : shard.apply_optimization(scheduler, optimization);
2755 0 : any_changed = true;
2756 0 : break;
2757 12 : }
2758 : }
2759 :
2760 6 : if !any_changed {
2761 3 : break;
2762 3 : }
2763 3 :
2764 3 : // Assert no infinite loop
2765 3 : loop_n += 1;
2766 3 : assert!(loop_n < 1000);
2767 : }
2768 :
2769 3 : optimizations
2770 3 : }
2771 :
2772 : /// Test the balancing behavior of shard scheduling: that it achieves a balance, and
2773 : /// that it converges.
2774 : #[test]
2775 1 : fn optimize_add_nodes() -> anyhow::Result<()> {
2776 1 : let nodes = make_test_nodes(
2777 1 : 9,
2778 1 : &[
2779 1 : // Initial 6 nodes
2780 1 : AvailabilityZone("az-a".to_string()),
2781 1 : AvailabilityZone("az-a".to_string()),
2782 1 : AvailabilityZone("az-b".to_string()),
2783 1 : AvailabilityZone("az-b".to_string()),
2784 1 : AvailabilityZone("az-c".to_string()),
2785 1 : AvailabilityZone("az-c".to_string()),
2786 1 : // Three we will add later
2787 1 : AvailabilityZone("az-a".to_string()),
2788 1 : AvailabilityZone("az-b".to_string()),
2789 1 : AvailabilityZone("az-c".to_string()),
2790 1 : ],
2791 1 : );
2792 1 :
2793 1 : // Only show the scheduler two nodes in each AZ to start with
2794 1 : let mut scheduler = Scheduler::new([].iter());
2795 7 : for i in 1..=6 {
2796 6 : scheduler.node_upsert(nodes.get(&NodeId(i)).unwrap());
2797 6 : }
2798 :
2799 1 : let mut shards = make_test_tenant(
2800 1 : PlacementPolicy::Attached(1),
2801 1 : ShardCount::new(4),
2802 1 : Some(AvailabilityZone("az-a".to_string())),
2803 1 : );
2804 1 : let mut schedule_context = ScheduleContext::default();
2805 5 : for shard in &mut shards {
2806 4 : assert!(
2807 4 : shard
2808 4 : .schedule(&mut scheduler, &mut schedule_context)
2809 4 : .is_ok()
2810 4 : );
2811 : }
2812 :
2813 : // Initial: attached locations land in the tenant's home AZ.
2814 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
2815 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2);
2816 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
2817 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
2818 :
2819 : // Initial: secondary locations in a remote AZ
2820 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
2821 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
2822 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
2823 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
2824 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
2825 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
2826 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
2827 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
2828 :
2829 : // Add another three nodes: we should see the shards spread out when their optimize
2830 : // methods are called
2831 1 : scheduler.node_upsert(nodes.get(&NodeId(7)).unwrap());
2832 1 : scheduler.node_upsert(nodes.get(&NodeId(8)).unwrap());
2833 1 : scheduler.node_upsert(nodes.get(&NodeId(9)).unwrap());
2834 1 : optimize_til_idle(&mut scheduler, &mut shards);
2835 1 :
2836 1 : // We expect one attached location was moved to the new node in the tenant's home AZ
2837 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(7)), 1);
2838 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(7)), 1);
2839 : // The original node has one less attached shard
2840 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
2841 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
2842 :
2843 : // One of the original nodes still has two attachments, since four shards cannot be spread evenly across the three home-AZ nodes
2844 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
2845 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
2846 :
2847 : // None of our secondaries moved, since we already had enough nodes for those to be
2848 : // scheduled perfectly
2849 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
2850 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
2851 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
2852 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
2853 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
2854 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
2855 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
2856 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
2857 :
2858 4 : for shard in shards.iter_mut() {
2859 4 : shard.intent.clear(&mut scheduler);
2860 4 : }
2861 :
2862 1 : Ok(())
2863 1 : }
2864 :
2865 : /// Test that initial shard scheduling is optimal. By optimal we mean
2866 : /// that the optimizer cannot find a way to improve it.
2867 : ///
2868 : /// This test is an example of the scheduling issue described in
2869 : /// https://github.com/neondatabase/neon/issues/8969
2870 : #[test]
2871 1 : fn initial_scheduling_is_optimal() -> anyhow::Result<()> {
2872 : use itertools::Itertools;
2873 :
2874 1 : let nodes = make_test_nodes(2, &[]);
2875 1 :
2876 1 : let mut scheduler = Scheduler::new([].iter());
2877 1 : scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
2878 1 : scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
2879 1 :
2880 1 : let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
2881 1 : let a_context = Rc::new(RefCell::new(ScheduleContext::default()));
2882 1 :
2883 1 : let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
2884 1 : let b_context = Rc::new(RefCell::new(ScheduleContext::default()));
2885 1 :
2886 4 : let a_shards_with_context = a.iter_mut().map(|shard| (shard, a_context.clone()));
2887 4 : let b_shards_with_context = b.iter_mut().map(|shard| (shard, b_context.clone()));
2888 1 :
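 : // Interleave the two tenants' shards so they are scheduled in alternation, roughly as
 : // if the tenants were being created concurrently; this is the pattern behind the issue
 : // linked in the doc comment above.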
2889 1 : let schedule_order = a_shards_with_context.interleave(b_shards_with_context);
2890 :
2891 9 : for (shard, context) in schedule_order {
2892 8 : let context = &mut *context.borrow_mut();
2893 8 : shard.schedule(&mut scheduler, context).unwrap();
2894 8 : }
2895 :
2896 1 : let applied_to_a = optimize_til_idle(&mut scheduler, &mut a);
2897 1 : assert_eq!(applied_to_a, vec![]);
2898 :
2899 1 : let applied_to_b = optimize_til_idle(&mut scheduler, &mut b);
2900 1 : assert_eq!(applied_to_b, vec![]);
2901 :
2902 8 : for shard in a.iter_mut().chain(b.iter_mut()) {
2903 8 : shard.intent.clear(&mut scheduler);
2904 8 : }
2905 :
2906 1 : Ok(())
2907 1 : }
2908 :
2909 : #[test]
2910 1 : fn random_az_shard_scheduling() -> anyhow::Result<()> {
2911 : use rand::seq::SliceRandom;
2912 :
2913 51 : for seed in 0..50 {
2914 50 : eprintln!("Running test with seed {seed}");
2915 50 : let mut rng = StdRng::seed_from_u64(seed);
2916 50 :
2917 50 : let az_a_tag = AvailabilityZone("az-a".to_string());
2918 50 : let az_b_tag = AvailabilityZone("az-b".to_string());
2919 50 : let azs = [az_a_tag, az_b_tag];
2920 50 : let nodes = make_test_nodes(4, &azs);
2921 50 : let mut shards_per_az: HashMap<AvailabilityZone, u32> = HashMap::new();
2922 50 :
2923 50 : let mut scheduler = Scheduler::new([].iter());
2924 200 : for node in nodes.values() {
2925 200 : scheduler.node_upsert(node);
2926 200 : }
2927 :
2928 50 : let mut shards = Vec::default();
2929 50 : let mut contexts = Vec::default();
2930 50 : let mut az_picker = azs.iter().cycle().cloned();
2931 5050 : for i in 0..100 {
2932 5000 : let az = az_picker.next().unwrap();
2933 5000 : let shard_count = i % 4 + 1;
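 : // Track how many shards prefer each AZ; the per-node balance check below uses these totals.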
2934 5000 : *shards_per_az.entry(az.clone()).or_default() += shard_count;
2935 5000 :
2936 5000 : let tenant_shards = make_test_tenant(
2937 5000 : PlacementPolicy::Attached(1),
2938 5000 : ShardCount::new(shard_count.try_into().unwrap()),
2939 5000 : Some(az),
2940 5000 : );
2941 5000 : let context = Rc::new(RefCell::new(ScheduleContext::default()));
2942 5000 :
2943 5000 : contexts.push(context.clone());
2944 5000 : let with_ctx = tenant_shards
2945 5000 : .into_iter()
2946 12500 : .map(|shard| (shard, context.clone()));
2947 17500 : for shard_with_ctx in with_ctx {
2948 12500 : shards.push(shard_with_ctx);
2949 12500 : }
2950 : }
2951 :
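 : // Shuffle the scheduling order so the balance we assert below does not depend on the
 : // order in which tenants happened to be created.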
2952 50 : shards.shuffle(&mut rng);
2953 :
2954 : #[derive(Default, Debug)]
2955 : struct NodeStats {
2956 : attachments: u32,
2957 : secondaries: u32,
2958 : }
2959 :
2960 50 : let mut node_stats: HashMap<NodeId, NodeStats> = HashMap::default();
2961 50 : let mut attachments_in_wrong_az = 0;
2962 50 : let mut secondaries_in_wrong_az = 0;
2963 :
2964 12550 : for (shard, context) in &mut shards {
2965 12500 : let context = &mut *context.borrow_mut();
2966 12500 : shard.schedule(&mut scheduler, context).unwrap();
2967 12500 :
2968 12500 : let attached_node = shard.intent.get_attached().unwrap();
2969 12500 : let stats = node_stats.entry(attached_node).or_default();
2970 12500 : stats.attachments += 1;
2971 12500 :
2972 12500 : let secondary_node = *shard.intent.get_secondary().first().unwrap();
2973 12500 : let stats = node_stats.entry(secondary_node).or_default();
2974 12500 : stats.secondaries += 1;
2975 12500 :
2976 12500 : let attached_node_az = nodes
2977 12500 : .get(&attached_node)
2978 12500 : .unwrap()
2979 12500 : .get_availability_zone_id();
2980 12500 : let secondary_node_az = nodes
2981 12500 : .get(&secondary_node)
2982 12500 : .unwrap()
2983 12500 : .get_availability_zone_id();
2984 12500 : let preferred_az = shard.preferred_az().unwrap();
2985 12500 :
2986 12500 : if attached_node_az != preferred_az {
2987 0 : eprintln!(
2988 0 : "{} attachment was scheduled in AZ {} but preferred AZ {}",
2989 0 : shard.tenant_shard_id, attached_node_az, preferred_az
2990 0 : );
2991 0 : attachments_in_wrong_az += 1;
2992 12500 : }
2993 :
2994 12500 : if secondary_node_az == preferred_az {
2995 0 : eprintln!(
2996 0 : "{} secondary was scheduled in AZ {} which matches preference",
2997 0 : shard.tenant_shard_id, attached_node_az
2998 0 : );
2999 0 : secondaries_in_wrong_az += 1;
3000 12500 : }
3001 : }
3002 :
3003 50 : let mut violations = Vec::default();
3004 50 :
3005 50 : if attachments_in_wrong_az > 0 {
3006 0 : violations.push(format!(
3007 0 : "{} attachments scheduled to the incorrect AZ",
3008 0 : attachments_in_wrong_az
3009 0 : ));
3010 50 : }
3011 :
3012 50 : if secondaries_in_wrong_az > 0 {
3013 0 : violations.push(format!(
3014 0 : "{} secondaries scheduled to the incorrect AZ",
3015 0 : secondaries_in_wrong_az
3016 0 : ));
3017 50 : }
3018 :
3019 50 : eprintln!(
3020 50 : "attachments_in_wrong_az={} secondaries_in_wrong_az={}",
3021 50 : attachments_in_wrong_az, secondaries_in_wrong_az
3022 50 : );
3023 :
3024 250 : for (node_id, stats) in &node_stats {
3025 200 : let node_az = nodes.get(node_id).unwrap().get_availability_zone_id();
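 : // Each AZ has two nodes, so ideally that AZ's attachments split evenly between them.
 : // The range below allows +/-1 of slack, since a perfectly even split is not always possible.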
3026 200 : let ideal_attachment_load = shards_per_az.get(node_az).unwrap() / 2;
3027 200 : let allowed_attachment_load =
3028 200 : (ideal_attachment_load - 1)..(ideal_attachment_load + 2);
3029 200 :
3030 200 : if !allowed_attachment_load.contains(&stats.attachments) {
3031 0 : violations.push(format!(
3032 0 : "Found {} attachments on node {}, but expected {}",
3033 0 : stats.attachments, node_id, ideal_attachment_load
3034 0 : ));
3035 200 : }
3036 :
3037 200 : eprintln!(
3038 200 : "{}: attachments={} secondaries={} ideal_attachment_load={}",
3039 200 : node_id, stats.attachments, stats.secondaries, ideal_attachment_load
3040 200 : );
3041 : }
3042 :
3043 50 : assert!(violations.is_empty(), "{violations:?}");
3044 :
3045 12550 : for (mut shard, _ctx) in shards {
3046 12500 : shard.intent.clear(&mut scheduler);
3047 12500 : }
3048 : }
3049 1 : Ok(())
3050 1 : }
3051 :
3052 : /// Check how the shard's scheduling behaves when in PlacementPolicy::Secondary mode.
3053 : #[test]
3054 1 : fn tenant_secondary_scheduling() -> anyhow::Result<()> {
3055 1 : let az_a = AvailabilityZone("az-a".to_string());
3056 1 : let nodes = make_test_nodes(
3057 1 : 3,
3058 1 : &[
3059 1 : az_a.clone(),
3060 1 : AvailabilityZone("az-b".to_string()),
3061 1 : AvailabilityZone("az-c".to_string()),
3062 1 : ],
3063 1 : );
3064 1 :
3065 1 : let mut scheduler = Scheduler::new(nodes.values());
3066 1 : let mut context = ScheduleContext::default();
3067 1 :
3068 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Secondary);
3069 1 : tenant_shard.intent.preferred_az_id = Some(az_a.clone());
3070 1 : tenant_shard
3071 1 : .schedule(&mut scheduler, &mut context)
3072 1 : .expect("we have enough nodes, scheduling should work");
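 : // In Secondary mode we expect exactly one warm secondary location and no attachment.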
3073 1 : assert_eq!(tenant_shard.intent.secondary.len(), 1);
3074 1 : assert!(tenant_shard.intent.attached.is_none());
3075 :
3076 : // Should have scheduled into the preferred AZ
3077 1 : assert_eq!(
3078 1 : scheduler
3079 1 : .get_node_az(&tenant_shard.intent.secondary[0])
3080 1 : .as_ref(),
3081 1 : tenant_shard.preferred_az()
3082 1 : );
3083 :
3084 : // Optimizer should agree
3085 1 : assert_eq!(
3086 1 : tenant_shard.optimize_attachment(&mut scheduler, &context),
3087 1 : None
3088 1 : );
3089 1 : assert_eq!(
3090 1 : tenant_shard.optimize_secondary(&mut scheduler, &context),
3091 1 : None
3092 1 : );
3093 :
3094 : // Switch to PlacementPolicy::Attached
3095 1 : tenant_shard.policy = PlacementPolicy::Attached(1);
3096 1 : tenant_shard
3097 1 : .schedule(&mut scheduler, &mut context)
3098 1 : .expect("we have enough nodes, scheduling should work");
3099 1 : assert_eq!(tenant_shard.intent.secondary.len(), 1);
3100 1 : assert!(tenant_shard.intent.attached.is_some());
3101 : // Secondary should now be in a non-preferred AZ
3102 1 : assert_ne!(
3103 1 : scheduler
3104 1 : .get_node_az(&tenant_shard.intent.secondary[0])
3105 1 : .as_ref(),
3106 1 : tenant_shard.preferred_az()
3107 1 : );
3108 : // Attached should be in preferred AZ
3109 1 : assert_eq!(
3110 1 : scheduler
3111 1 : .get_node_az(&tenant_shard.intent.attached.unwrap())
3112 1 : .as_ref(),
3113 1 : tenant_shard.preferred_az()
3114 1 : );
3115 :
3116 : // Optimizer should agree
3117 1 : assert_eq!(
3118 1 : tenant_shard.optimize_attachment(&mut scheduler, &context),
3119 1 : None
3120 1 : );
3121 1 : assert_eq!(
3122 1 : tenant_shard.optimize_secondary(&mut scheduler, &context),
3123 1 : None
3124 1 : );
3125 :
3126 : // Switch back to PlacementPolicy::Secondary
3127 1 : tenant_shard.policy = PlacementPolicy::Secondary;
3128 1 : tenant_shard
3129 1 : .schedule(&mut scheduler, &mut context)
3130 1 : .expect("we have enough nodes, scheduling should work");
3131 1 : assert_eq!(tenant_shard.intent.secondary.len(), 1);
3132 1 : assert!(tenant_shard.intent.attached.is_none());
3133 : // When we picked a location to keep, we should have kept the one in the preferred AZ
3134 1 : assert_eq!(
3135 1 : scheduler
3136 1 : .get_node_az(&tenant_shard.intent.secondary[0])
3137 1 : .as_ref(),
3138 1 : tenant_shard.preferred_az()
3139 1 : );
3140 :
3141 : // Optimizer should agree
3142 1 : assert_eq!(
3143 1 : tenant_shard.optimize_attachment(&mut scheduler, &context),
3144 1 : None
3145 1 : );
3146 1 : assert_eq!(
3147 1 : tenant_shard.optimize_secondary(&mut scheduler, &context),
3148 1 : None
3149 1 : );
3150 :
3151 1 : tenant_shard.intent.clear(&mut scheduler);
3152 1 :
3153 1 : Ok(())
3154 1 : }
3155 : }