Line data Source code
1 : use std::{
2 : collections::{HashMap, HashSet},
3 : sync::Arc,
4 : time::Duration,
5 : };
6 :
7 : use crate::{
8 : metrics::{
9 : self, ReconcileCompleteLabelGroup, ReconcileLongRunningLabelGroup, ReconcileOutcome,
10 : },
11 : persistence::TenantShardPersistence,
12 : reconciler::{ReconcileUnits, ReconcilerConfig},
13 : scheduler::{
14 : AffinityScore, AttachedShardTag, NodeSchedulingScore, NodeSecondarySchedulingScore,
15 : RefCountUpdate, ScheduleContext, SecondaryShardTag, ShardTag,
16 : },
17 : service::ReconcileResultRequest,
18 : };
19 : use futures::future::{self, Either};
20 : use itertools::Itertools;
21 : use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy, ShardSchedulingPolicy};
22 : use pageserver_api::{
23 : models::{LocationConfig, LocationConfigMode, TenantConfig},
24 : shard::{ShardIdentity, TenantShardId},
25 : };
26 : use serde::{Deserialize, Serialize};
27 : use tokio::task::JoinHandle;
28 : use tokio_util::sync::CancellationToken;
29 : use tracing::{instrument, Instrument};
30 : use utils::{
31 : generation::Generation,
32 : id::NodeId,
33 : seqwait::{SeqWait, SeqWaitError},
34 : shard::ShardCount,
35 : sync::gate::GateGuard,
36 : };
37 :
38 : use crate::{
39 : compute_hook::ComputeHook,
40 : node::Node,
41 : persistence::{split_state::SplitState, Persistence},
42 : reconciler::{
43 : attached_location_conf, secondary_location_conf, ReconcileError, Reconciler, TargetState,
44 : },
45 : scheduler::{ScheduleError, Scheduler},
46 : service, Sequence,
47 : };
48 :
49 : /// Serialization helper
50 0 : fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
51 0 : where
52 0 : S: serde::ser::Serializer,
53 0 : T: std::fmt::Display,
54 0 : {
55 0 : serializer.collect_str(
56 0 : &v.lock()
57 0 : .unwrap()
58 0 : .as_ref()
59 0 : .map(|e| format!("{e}"))
60 0 : .unwrap_or("".to_string()),
61 0 : )
62 0 : }
63 :
64 : /// In-memory state for a particular tenant shard.
65 : ///
66 : /// This struct implements Serialize for debugging purposes, but is _not_ persisted
67 : /// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
68 0 : #[derive(Serialize)]
69 : pub(crate) struct TenantShard {
70 : pub(crate) tenant_shard_id: TenantShardId,
71 :
72 : pub(crate) shard: ShardIdentity,
73 :
74 : // Runtime only: sequence used to coordinate updates to this object while background
75 : // reconcilers may be running. A reconciler runs to a particular
76 : // sequence.
77 : pub(crate) sequence: Sequence,
78 :
79 : // Latest generation number: next time we attach, increment this
80 : // and use the incremented number when attaching.
81 : //
82 : // None represents an incompletely onboarded tenant via the [`Service::location_config`]
83 : // API, where this tenant may only run in PlacementPolicy::Secondary.
84 : pub(crate) generation: Option<Generation>,
85 :
86 : // High level description of how the tenant should be set up. Provided
87 : // externally.
88 : pub(crate) policy: PlacementPolicy,
89 :
90 : // Low level description of exactly which pageservers should fulfil
91 : // which role. Generated by `Self::schedule`.
92 : pub(crate) intent: IntentState,
93 :
94 : // Low level description of how the tenant is configured on pageservers:
95 : // if this does not match `Self::intent` then the tenant needs reconciliation
96 : // with `Self::reconcile`.
97 : pub(crate) observed: ObservedState,
98 :
99 : // Tenant configuration, passed through opaquely to the pageserver. Identical
100 : // for all shards in a tenant.
101 : pub(crate) config: TenantConfig,
102 :
103 : /// If a reconcile task is currently in flight, it may be joined here (it is
104 : /// only safe to join if either the result has been received or the reconciler's
105 : /// cancellation token has been fired)
106 : #[serde(skip)]
107 : pub(crate) reconciler: Option<ReconcilerHandle>,
108 :
109 : /// If a tenant is being split, then all shards with that TenantId will have a
110 : /// SplitState set; this acts as a guard against other operations such as background
111 : /// reconciliation and timeline creation.
112 : pub(crate) splitting: SplitState,
113 :
114 : /// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag
115 : /// is set. This flag is cleared when the tenant is popped off the delay queue.
116 : pub(crate) delayed_reconcile: bool,
117 :
118 : /// Optionally wait for reconciliation to complete up to a particular
119 : /// sequence number.
120 : #[serde(skip)]
121 : pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
122 :
123 : /// Indicates sequence number for which we have encountered an error reconciling. If
124 : /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
125 : /// and callers should stop waiting for `waiter` and propagate the error.
126 : #[serde(skip)]
127 : pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
128 :
129 : /// The most recent error from a reconcile on this tenant. This is a nested Arc
130 : /// because:
131 : /// - ReconcileWaiters need to Arc-clone the overall object to read it later
132 : /// - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
133 : /// many waiters for one shard, and the underlying error types are not Clone.
134 : ///
135 : /// TODO: generalize to an array of recent events
136 : /// TODO: use an ArcSwap instead of a mutex for faster reads?
137 : #[serde(serialize_with = "read_last_error")]
138 : pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
139 :
140 : /// If we have a pending compute notification that for some reason we weren't able to send,
141 : /// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
142 : /// and trigger a Reconciler run. This is the mechanism by which compute notifications are included in the scope
143 : /// of state that we publish externally in an eventually consistent way.
144 : pub(crate) pending_compute_notification: bool,
145 :
146 : // Support/debug tool: if something is going wrong or flapping with scheduling, this may
147 : // be set to a non-active state to avoid making changes while the issue is fixed.
148 : scheduling_policy: ShardSchedulingPolicy,
149 : }
150 :
151 : #[derive(Clone, Debug, Serialize)]
152 : pub(crate) struct IntentState {
153 : attached: Option<NodeId>,
154 : secondary: Vec<NodeId>,
155 :
156 : // We should attempt to schedule this shard in the provided AZ to
157 : // decrease chances of cross-AZ compute.
158 : preferred_az_id: Option<AvailabilityZone>,
159 : }
160 :
161 : impl IntentState {
162 12856 : pub(crate) fn new(preferred_az_id: Option<AvailabilityZone>) -> Self {
163 12856 : Self {
164 12856 : attached: None,
165 12856 : secondary: vec![],
166 12856 : preferred_az_id,
167 12856 : }
168 12856 : }
169 0 : pub(crate) fn single(
170 0 : scheduler: &mut Scheduler,
171 0 : node_id: Option<NodeId>,
172 0 : preferred_az_id: Option<AvailabilityZone>,
173 0 : ) -> Self {
174 0 : if let Some(node_id) = node_id {
175 0 : scheduler.update_node_ref_counts(
176 0 : node_id,
177 0 : preferred_az_id.as_ref(),
178 0 : RefCountUpdate::Attach,
179 0 : );
180 0 : }
181 0 : Self {
182 0 : attached: node_id,
183 0 : secondary: vec![],
184 0 : preferred_az_id,
185 0 : }
186 0 : }
187 :
188 12856 : pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
189 12856 : if self.attached != new_attached {
190 12856 : if let Some(old_attached) = self.attached.take() {
191 0 : scheduler.update_node_ref_counts(
192 0 : old_attached,
193 0 : self.preferred_az_id.as_ref(),
194 0 : RefCountUpdate::Detach,
195 0 : );
196 12856 : }
197 12856 : if let Some(new_attached) = &new_attached {
198 12856 : scheduler.update_node_ref_counts(
199 12856 : *new_attached,
200 12856 : self.preferred_az_id.as_ref(),
201 12856 : RefCountUpdate::Attach,
202 12856 : );
203 12856 : }
204 12856 : self.attached = new_attached;
205 0 : }
206 :
207 12856 : if let Some(new_attached) = &new_attached {
208 12856 : assert!(!self.secondary.contains(new_attached));
209 0 : }
210 12856 : }
211 :
212 : /// Like set_attached, but the node is from [`Self::secondary`]. This swaps the node from
213 : /// secondary to attached while maintaining the scheduler's reference counts.
214 5 : pub(crate) fn promote_attached(
215 5 : &mut self,
216 5 : scheduler: &mut Scheduler,
217 5 : promote_secondary: NodeId,
218 5 : ) {
219 5 : // If we call this with a node that isn't in secondary, it would cause incorrect
220 5 : // scheduler reference counting, since we assume the node is already referenced as a secondary.
221 5 : debug_assert!(self.secondary.contains(&promote_secondary));
222 :
223 12 : self.secondary.retain(|n| n != &promote_secondary);
224 5 :
225 5 : let demoted = self.attached;
226 5 : self.attached = Some(promote_secondary);
227 5 :
228 5 : scheduler.update_node_ref_counts(
229 5 : promote_secondary,
230 5 : self.preferred_az_id.as_ref(),
231 5 : RefCountUpdate::PromoteSecondary,
232 5 : );
233 5 : if let Some(demoted) = demoted {
234 0 : scheduler.update_node_ref_counts(
235 0 : demoted,
236 0 : self.preferred_az_id.as_ref(),
237 0 : RefCountUpdate::DemoteAttached,
238 0 : );
239 5 : }
240 5 : }
241 :
242 12840 : pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
243 12840 : assert!(!self.secondary.contains(&new_secondary));
244 12840 : assert!(self.attached != Some(new_secondary));
245 12840 : scheduler.update_node_ref_counts(
246 12840 : new_secondary,
247 12840 : self.preferred_az_id.as_ref(),
248 12840 : RefCountUpdate::AddSecondary,
249 12840 : );
250 12840 : self.secondary.push(new_secondary);
251 12840 : }
252 :
253 : /// It is legal to call this with a node that is not currently a secondary: that is a no-op
254 4 : pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
255 5 : let index = self.secondary.iter().position(|n| *n == node_id);
256 4 : if let Some(index) = index {
257 4 : scheduler.update_node_ref_counts(
258 4 : node_id,
259 4 : self.preferred_az_id.as_ref(),
260 4 : RefCountUpdate::RemoveSecondary,
261 4 : );
262 4 : self.secondary.remove(index);
263 4 : }
264 4 : }
265 :
266 12855 : pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
267 12855 : for secondary in self.secondary.drain(..) {
268 12836 : scheduler.update_node_ref_counts(
269 12836 : secondary,
270 12836 : self.preferred_az_id.as_ref(),
271 12836 : RefCountUpdate::RemoveSecondary,
272 12836 : );
273 12836 : }
274 12855 : }
275 :
276 : /// Remove the last secondary node from the list of secondaries
277 0 : pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
278 0 : if let Some(node_id) = self.secondary.pop() {
279 0 : scheduler.update_node_ref_counts(
280 0 : node_id,
281 0 : self.preferred_az_id.as_ref(),
282 0 : RefCountUpdate::RemoveSecondary,
283 0 : );
284 0 : }
285 0 : }
286 :
287 12855 : pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
288 12855 : if let Some(old_attached) = self.attached.take() {
289 12855 : scheduler.update_node_ref_counts(
290 12855 : old_attached,
291 12855 : self.preferred_az_id.as_ref(),
292 12855 : RefCountUpdate::Detach,
293 12855 : );
294 12855 : }
295 :
296 12855 : self.clear_secondary(scheduler);
297 12855 : }
298 :
299 12892 : pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
300 12892 : let mut result = Vec::new();
301 12892 : if let Some(p) = self.attached {
302 12890 : result.push(p)
303 2 : }
304 :
305 12892 : result.extend(self.secondary.iter().copied());
306 12892 :
307 12892 : result
308 12892 : }
309 :
310 12594 : pub(crate) fn get_attached(&self) -> &Option<NodeId> {
311 12594 : &self.attached
312 12594 : }
313 :
314 12656 : pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
315 12656 : &self.secondary
316 12656 : }
317 :
318 : /// If the node is in use as the attached location, demote it into
319 : /// the list of secondary locations. This is used when a node goes offline,
320 : /// and we want to use a different node for attachment, but not permanently
321 : /// forget the location on the offline node.
322 : ///
323 : /// Returns true if a change was made
324 5 : pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
325 5 : if self.attached == Some(node_id) {
326 5 : self.attached = None;
327 5 : self.secondary.push(node_id);
328 5 : scheduler.update_node_ref_counts(
329 5 : node_id,
330 5 : self.preferred_az_id.as_ref(),
331 5 : RefCountUpdate::DemoteAttached,
332 5 : );
333 5 : true
334 : } else {
335 0 : false
336 : }
337 5 : }
338 : }
339 :
340 : impl Drop for IntentState {
341 12856 : fn drop(&mut self) {
342 12856 : // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
343 12856 : // We do not check this while panicking, to avoid polluting unit test failures or
344 12856 : // other assertions with this assertion's output. It's still wrong to leak these,
345 12856 : // but if we already have a panic then we don't need to independently flag this case.
346 12856 : if !(std::thread::panicking()) {
347 12856 : debug_assert!(self.attached.is_none() && self.secondary.is_empty());
348 0 : }
349 12855 : }
350 : }
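// Illustrative sketch (editor's addition, not part of the measured source): the intended
// IntentState lifecycle, assuming `scheduler: &mut Scheduler` and the node IDs are in scope.
// Intent must only be mutated through these methods so the scheduler's reference counts stay
// consistent, and `clear` must run before the value is dropped (see the Drop assertion above):
//
//     let mut intent = IntentState::new(preferred_az.clone());
//     intent.set_attached(scheduler, Some(attached_node)); // refcount update: Attach
//     intent.push_secondary(scheduler, secondary_node);    // refcount update: AddSecondary
//     // ... schedule / reconcile using the intent ...
//     intent.clear(scheduler);                             // releases all refcounts
//     drop(intent);                                        // Drop's debug_assert now holds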
351 :
352 0 : #[derive(Default, Clone, Serialize, Deserialize, Debug)]
353 : pub(crate) struct ObservedState {
354 : pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
355 : }
356 :
357 : /// Our latest knowledge of how this tenant is configured in the outside world.
358 : ///
359 : /// Meaning:
360 : /// * No instance of this type exists for a node: we are certain that we have nothing configured on that
361 : /// node for this shard.
362 : /// * Instance exists with conf==None: we *might* have some state on that node, but we don't know
363 : /// what it is (e.g. we failed partway through configuring it)
364 : /// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
365 : /// and that configuration will still be present unless something external interfered.
366 0 : #[derive(Clone, Serialize, Deserialize, Debug)]
367 : pub(crate) struct ObservedStateLocation {
368 : /// If None, it means we do not know the status of this shard's location on this node, but
369 : /// we know that we might have some state on this node.
370 : pub(crate) conf: Option<LocationConfig>,
371 : }
372 :
373 : pub(crate) struct ReconcilerWaiter {
374 : // For observability purposes, remember the ID of the shard we're
375 : // waiting for.
376 : pub(crate) tenant_shard_id: TenantShardId,
377 :
378 : seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
379 : error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
380 : error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
381 : seq: Sequence,
382 : }
383 :
384 : pub(crate) enum ReconcilerStatus {
385 : Done,
386 : Failed,
387 : InProgress,
388 : }
389 :
390 : #[derive(thiserror::Error, Debug)]
391 : pub(crate) enum ReconcileWaitError {
392 : #[error("Timeout waiting for shard {0}")]
393 : Timeout(TenantShardId),
394 : #[error("shutting down")]
395 : Shutdown,
396 : #[error("Reconcile error on shard {0}: {1}")]
397 : Failed(TenantShardId, Arc<ReconcileError>),
398 : }
399 :
400 : #[derive(Eq, PartialEq, Debug, Clone)]
401 : pub(crate) struct ReplaceSecondary {
402 : old_node_id: NodeId,
403 : new_node_id: NodeId,
404 : }
405 :
406 : #[derive(Eq, PartialEq, Debug, Clone)]
407 : pub(crate) struct MigrateAttachment {
408 : pub(crate) old_attached_node_id: NodeId,
409 : pub(crate) new_attached_node_id: NodeId,
410 : }
411 :
412 : #[derive(Eq, PartialEq, Debug, Clone)]
413 : pub(crate) enum ScheduleOptimizationAction {
414 : // Replace one of our secondary locations with a different node
415 : ReplaceSecondary(ReplaceSecondary),
416 : // Migrate attachment to an existing secondary location
417 : MigrateAttachment(MigrateAttachment),
418 : // Create a secondary location, with the intent of later migrating to it
419 : CreateSecondary(NodeId),
420 : // Remove a secondary location that we previously created to facilitate a migration
421 : RemoveSecondary(NodeId),
422 : }
423 :
424 : #[derive(Eq, PartialEq, Debug, Clone)]
425 : pub(crate) struct ScheduleOptimization {
426 : // What was the reconcile sequence when we generated this optimization? The optimization
427 : // should only be applied if the shard's sequence is still at this value, in case other changes
428 : // happened between planning the optimization and applying it.
429 : sequence: Sequence,
430 :
431 : pub(crate) action: ScheduleOptimizationAction,
432 : }
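// Illustrative sketch (editor's addition, not part of the measured source): how the sequence
// guard above is meant to be used. An optimization is planned against the shard's current
// sequence and applied only if the sequence has not advanced in the meantime; `shard`,
// `scheduler` and `schedule_context` are assumed to be in scope.
//
//     if shard.maybe_optimizable(scheduler, schedule_context) {
//         if let Some(optimization) = shard.optimize_attachment(scheduler, schedule_context) {
//             // apply_optimization returns false (a no-op) if shard.sequence moved on
//             // since the optimization was planned.
//             let _applied = shard.apply_optimization(scheduler, optimization);
//         }
//     }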
433 :
434 : impl ReconcilerWaiter {
435 0 : pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
436 0 : tokio::select! {
437 0 : result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
438 0 : result.map_err(|e| match e {
439 0 : SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
440 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
441 0 : })?;
442 : },
443 0 : result = self.error_seq_wait.wait_for(self.seq) => {
444 0 : result.map_err(|e| match e {
445 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
446 0 : SeqWaitError::Timeout => unreachable!()
447 0 : })?;
448 :
449 0 : return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
450 0 : self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
451 : }
452 : }
453 :
454 0 : Ok(())
455 0 : }
456 :
457 0 : pub(crate) fn get_status(&self) -> ReconcilerStatus {
458 0 : if self.seq_wait.would_wait_for(self.seq).is_ok() {
459 0 : ReconcilerStatus::Done
460 0 : } else if self.error_seq_wait.would_wait_for(self.seq).is_ok() {
461 0 : ReconcilerStatus::Failed
462 : } else {
463 0 : ReconcilerStatus::InProgress
464 : }
465 0 : }
466 : }
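// Illustrative sketch (editor's addition, not part of the measured source): waiting on a
// reconcile with a timeout, assuming `waiter` came from `get_reconcile_needed` or
// `spawn_reconciler` further down in this file.
//
//     match waiter.wait_timeout(Duration::from_secs(30)).await {
//         Ok(()) => { /* observed state caught up to the waiter's sequence */ }
//         Err(ReconcileWaitError::Timeout(shard_id)) => tracing::warn!("timed out waiting for {shard_id}"),
//         Err(ReconcileWaitError::Failed(shard_id, e)) => tracing::warn!("reconcile of {shard_id} failed: {e}"),
//         Err(ReconcileWaitError::Shutdown) => { /* controller is shutting down */ }
//     }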
467 :
468 : /// Having spawned a reconciler task, the tenant shard's state will carry enough
469 : /// information to optionally cancel & await it later.
470 : pub(crate) struct ReconcilerHandle {
471 : sequence: Sequence,
472 : handle: JoinHandle<()>,
473 : cancel: CancellationToken,
474 : }
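// Illustrative sketch (editor's addition, not part of the measured source): cancelling and
// awaiting a stale handle when a newer reconcile supersedes it, mirroring the spawn path at
// the end of this file; `shard: &mut TenantShard` is assumed.
//
//     if let Some(old_handle) = shard.reconciler.take() {
//         old_handle.cancel.cancel(); // ask the running reconciler task to stop
//         if let Err(e) = old_handle.handle.await {
//             // JoinError: the task panicked or was aborted; log and continue
//             tracing::error!("join error waiting for reconcile task: {e}");
//         }
//     }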
475 :
476 : pub(crate) enum ReconcileNeeded {
477 : /// shard either doesn't need reconciliation, or is forbidden from spawning a reconciler
478 : /// in its current state (e.g. shard split in progress, or ShardSchedulingPolicy forbids it)
479 : No,
480 : /// shard has a reconciler running, and its intent hasn't changed since that one was
481 : /// spawned: wait for the existing reconciler rather than spawning a new one.
482 : WaitExisting(ReconcilerWaiter),
483 : /// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
484 : Yes,
485 : }
486 :
487 : /// Pending modification to the observed state of a tenant shard.
488 : /// Produced by [`Reconciler::observed_deltas`] and applied in [`crate::service::Service::process_result`].
489 : pub(crate) enum ObservedStateDelta {
490 : Upsert(Box<(NodeId, ObservedStateLocation)>),
491 : Delete(NodeId),
492 : }
493 :
494 : impl ObservedStateDelta {
495 0 : pub(crate) fn node_id(&self) -> &NodeId {
496 0 : match self {
497 0 : Self::Upsert(up) => &up.0,
498 0 : Self::Delete(nid) => nid,
499 : }
500 0 : }
501 : }
502 :
503 : /// When a reconcile task completes, it sends this result object
504 : /// to be applied to the primary TenantShard.
505 : pub(crate) struct ReconcileResult {
506 : pub(crate) sequence: Sequence,
507 : /// On errors, `observed` should be treated as an incomplete description
508 : /// of state (i.e. any nodes present in the result should override nodes
509 : /// present in the parent tenant state, but any unmentioned nodes should
510 : /// not be removed from parent tenant state)
511 : pub(crate) result: Result<(), ReconcileError>,
512 :
513 : pub(crate) tenant_shard_id: TenantShardId,
514 : pub(crate) generation: Option<Generation>,
515 : pub(crate) observed_deltas: Vec<ObservedStateDelta>,
516 :
517 : /// Set [`TenantShard::pending_compute_notification`] from this flag
518 : pub(crate) pending_compute_notification: bool,
519 : }
520 :
521 : impl ObservedState {
522 0 : pub(crate) fn new() -> Self {
523 0 : Self {
524 0 : locations: HashMap::new(),
525 0 : }
526 0 : }
527 :
528 0 : pub(crate) fn is_empty(&self) -> bool {
529 0 : self.locations.is_empty()
530 0 : }
531 : }
532 :
533 : impl TenantShard {
534 12839 : pub(crate) fn new(
535 12839 : tenant_shard_id: TenantShardId,
536 12839 : shard: ShardIdentity,
537 12839 : policy: PlacementPolicy,
538 12839 : preferred_az_id: Option<AvailabilityZone>,
539 12839 : ) -> Self {
540 12839 : metrics::METRICS_REGISTRY
541 12839 : .metrics_group
542 12839 : .storage_controller_tenant_shards
543 12839 : .inc();
544 12839 :
545 12839 : Self {
546 12839 : tenant_shard_id,
547 12839 : policy,
548 12839 : intent: IntentState::new(preferred_az_id),
549 12839 : generation: Some(Generation::new(0)),
550 12839 : shard,
551 12839 : observed: ObservedState::default(),
552 12839 : config: TenantConfig::default(),
553 12839 : reconciler: None,
554 12839 : splitting: SplitState::Idle,
555 12839 : sequence: Sequence(1),
556 12839 : delayed_reconcile: false,
557 12839 : waiter: Arc::new(SeqWait::new(Sequence(0))),
558 12839 : error_waiter: Arc::new(SeqWait::new(Sequence(0))),
559 12839 : last_error: Arc::default(),
560 12839 : pending_compute_notification: false,
561 12839 : scheduling_policy: ShardSchedulingPolicy::default(),
562 12839 : }
563 12839 : }
564 :
565 : /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
566 : /// [`ObservedState`], even if it violates my [`PlacementPolicy`]. Call [`Self::schedule`] next,
567 : /// to get an intent state that complies with placement policy. The overall goal is to do scheduling
568 : /// in a way that makes use of any configured locations that already exist in the outside world.
569 1 : pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
570 1 : // Choose an attached location by filtering observed locations, and then sorting to get the highest
571 1 : // generation
572 1 : let mut attached_locs = self
573 1 : .observed
574 1 : .locations
575 1 : .iter()
576 2 : .filter_map(|(node_id, l)| {
577 2 : if let Some(conf) = &l.conf {
578 2 : if conf.mode == LocationConfigMode::AttachedMulti
579 1 : || conf.mode == LocationConfigMode::AttachedSingle
580 1 : || conf.mode == LocationConfigMode::AttachedStale
581 : {
582 2 : Some((node_id, conf.generation))
583 : } else {
584 0 : None
585 : }
586 : } else {
587 0 : None
588 : }
589 2 : })
590 1 : .collect::<Vec<_>>();
591 1 :
592 2 : attached_locs.sort_by_key(|i| i.1);
593 1 : if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
594 1 : self.intent.set_attached(scheduler, Some(*node_id));
595 1 : }
596 :
597 : // All remaining observed locations generate secondary intents. This includes None
598 : // observations, as these may well have some local content on disk that is usable (this
599 : // is an edge case that might occur if we restarted during a migration or other change)
600 : //
601 : // We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
602 : // will take care of promoting one of these secondaries to be attached.
603 2 : self.observed.locations.keys().for_each(|node_id| {
604 2 : if Some(*node_id) != self.intent.attached {
605 1 : self.intent.push_secondary(scheduler, *node_id);
606 1 : }
607 2 : });
608 1 : }
609 :
610 : /// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
611 : /// attached pageserver for a shard.
612 : ///
613 : /// Returns whether we modified it, and the NodeId selected.
614 12829 : fn schedule_attached(
615 12829 : &mut self,
616 12829 : scheduler: &mut Scheduler,
617 12829 : context: &ScheduleContext,
618 12829 : ) -> Result<(bool, NodeId), ScheduleError> {
619 : // No work to do if we already have an attached tenant
620 12829 : if let Some(node_id) = self.intent.attached {
621 0 : return Ok((false, node_id));
622 12829 : }
623 :
624 12829 : if let Some(promote_secondary) = self.preferred_secondary(scheduler) {
625 : // Promote a secondary
626 1 : tracing::debug!("Promoted secondary {} to attached", promote_secondary);
627 1 : self.intent.promote_attached(scheduler, promote_secondary);
628 1 : Ok((true, promote_secondary))
629 : } else {
630 : // Pick a fresh node: either we had no secondaries or none were schedulable
631 12828 : let node_id = scheduler.schedule_shard::<AttachedShardTag>(
632 12828 : &self.intent.secondary,
633 12828 : &self.intent.preferred_az_id,
634 12828 : context,
635 12828 : )?;
636 12828 : tracing::debug!("Selected {} as attached", node_id);
637 12828 : self.intent.set_attached(scheduler, Some(node_id));
638 12828 : Ok((true, node_id))
639 : }
640 12829 : }
641 :
642 : #[instrument(skip_all, fields(
643 : tenant_id=%self.tenant_shard_id.tenant_id,
644 : shard_id=%self.tenant_shard_id.shard_slug(),
645 : sequence=%self.sequence
646 : ))]
647 : pub(crate) fn schedule(
648 : &mut self,
649 : scheduler: &mut Scheduler,
650 : context: &mut ScheduleContext,
651 : ) -> Result<(), ScheduleError> {
652 : let r = self.do_schedule(scheduler, context);
653 :
654 : context.avoid(&self.intent.all_pageservers());
655 :
656 : r
657 : }
658 :
659 12830 : pub(crate) fn do_schedule(
660 12830 : &mut self,
661 12830 : scheduler: &mut Scheduler,
662 12830 : context: &ScheduleContext,
663 12830 : ) -> Result<(), ScheduleError> {
664 12830 : // TODO: before scheduling new nodes, check if any existing content in
665 12830 : // self.intent refers to pageservers that are offline, and pick other
666 12830 : // pageservers if so.
667 12830 :
668 12830 : // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
669 12830 : // change their attach location.
670 12830 :
671 12830 : match self.scheduling_policy {
672 12829 : ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
673 : ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
674 : // Warn to make it obvious why other things aren't happening/working, if we skip scheduling
675 1 : tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
676 0 : "Scheduling is disabled by policy {:?}", self.scheduling_policy);
677 1 : return Ok(());
678 : }
679 : }
680 :
681 : // Build the set of pageservers already in use by this tenant, to avoid scheduling
682 : // more work on the same pageservers we're already using.
683 12829 : let mut modified = false;
684 :
685 : // Add/remove nodes to fulfil policy
686 : use PlacementPolicy::*;
687 12829 : match self.policy {
688 12829 : Attached(secondary_count) => {
689 : // Should have exactly one attached, and at least N secondaries
690 12829 : let (modified_attached, attached_node_id) =
691 12829 : self.schedule_attached(scheduler, context)?;
692 12829 : modified |= modified_attached;
693 12829 :
694 12829 : let mut used_pageservers = vec![attached_node_id];
695 25657 : while self.intent.secondary.len() < secondary_count {
696 12828 : let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
697 12828 : &used_pageservers,
698 12828 : &self.intent.preferred_az_id,
699 12828 : context,
700 12828 : )?;
701 12828 : self.intent.push_secondary(scheduler, node_id);
702 12828 : used_pageservers.push(node_id);
703 12828 : modified = true;
704 : }
705 : }
706 : Secondary => {
707 0 : if let Some(node_id) = self.intent.get_attached() {
708 0 : // Populate secondary by demoting the attached node
709 0 : self.intent.demote_attached(scheduler, *node_id);
710 0 : modified = true;
711 0 : } else if self.intent.secondary.is_empty() {
712 : // Populate secondary by scheduling a fresh node
713 0 : let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
714 0 : &[],
715 0 : &self.intent.preferred_az_id,
716 0 : context,
717 0 : )?;
718 0 : self.intent.push_secondary(scheduler, node_id);
719 0 : modified = true;
720 0 : }
721 0 : while self.intent.secondary.len() > 1 {
722 0 : // We have no particular preference for one secondary location over another: just
723 0 : // arbitrarily drop from the end
724 0 : self.intent.pop_secondary(scheduler);
725 0 : modified = true;
726 0 : }
727 : }
728 : Detached => {
729 : // Never add locations in this mode
730 0 : if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
731 0 : self.intent.clear(scheduler);
732 0 : modified = true;
733 0 : }
734 : }
735 : }
736 :
737 12829 : if modified {
738 12829 : self.sequence.0 += 1;
739 12829 : }
740 :
741 12829 : Ok(())
742 12830 : }
743 :
744 : /// Reschedule this tenant shard to one of its secondary locations. Returns a scheduling error
745 : /// if the swap is not possible and leaves the intent state in its original state.
746 : ///
747 : /// Arguments:
748 : /// `promote_to`: an optional secondary location of this tenant shard. If set to None, we ask
749 : /// the scheduler to recommend a node.
750 : ///
751 : /// The currently attached location (if any, per the intent state) is demoted to a secondary.
752 0 : pub(crate) fn reschedule_to_secondary(
753 0 : &mut self,
754 0 : promote_to: Option<NodeId>,
755 0 : scheduler: &mut Scheduler,
756 0 : ) -> Result<(), ScheduleError> {
757 0 : let promote_to = match promote_to {
758 0 : Some(node) => node,
759 0 : None => match self.preferred_secondary(scheduler) {
760 0 : Some(node) => node,
761 : None => {
762 0 : return Err(ScheduleError::ImpossibleConstraint);
763 : }
764 : },
765 : };
766 :
767 0 : assert!(self.intent.get_secondary().contains(&promote_to));
768 :
769 0 : if let Some(node) = self.intent.get_attached() {
770 0 : let demoted = self.intent.demote_attached(scheduler, *node);
771 0 : if !demoted {
772 0 : return Err(ScheduleError::ImpossibleConstraint);
773 0 : }
774 0 : }
775 :
776 0 : self.intent.promote_attached(scheduler, promote_to);
777 0 :
778 0 : // Increment the sequence number for the edge case where a
779 0 : // reconciler is already running to avoid waiting on the
780 0 : // current reconcile instead of spawning a new one.
781 0 : self.sequence = self.sequence.next();
782 0 :
783 0 : Ok(())
784 0 : }
785 :
786 : /// Returns None if the current location's score is unavailable, i.e. cannot draw a conclusion
787 53 : fn is_better_location<T: ShardTag>(
788 53 : &self,
789 53 : scheduler: &mut Scheduler,
790 53 : schedule_context: &ScheduleContext,
791 53 : current: NodeId,
792 53 : candidate: NodeId,
793 53 : ) -> Option<bool> {
794 53 : let Some(candidate_score) = scheduler.compute_node_score::<T::Score>(
795 53 : candidate,
796 53 : &self.intent.preferred_az_id,
797 53 : schedule_context,
798 53 : ) else {
799 : // The candidate node is unavailable for scheduling or otherwise couldn't get a score
800 0 : return None;
801 : };
802 :
803 53 : match scheduler.compute_node_score::<T::Score>(
804 53 : current,
805 53 : &self.intent.preferred_az_id,
806 53 : schedule_context,
807 53 : ) {
808 53 : Some(current_score) => {
809 53 : // Ignore utilization components when comparing scores: we don't want to migrate
810 53 : // because of transient load variations, it risks making the system thrash, and
811 53 : // migrating for utilization requires a separate high level view of the system to
812 53 : // e.g. prioritize moving larger or smaller tenants, rather than arbitrarily
813 53 : // moving things around in the order that we hit this function.
814 53 : let candidate_score = candidate_score.for_optimization();
815 53 : let current_score = current_score.for_optimization();
816 53 :
817 53 : if candidate_score < current_score {
818 8 : tracing::info!("Found a lower scoring location! {candidate} is better than {current} ({candidate_score:?} is better than {current_score:?})");
819 8 : Some(true)
820 : } else {
821 : // The candidate node is no better than our current location, so don't migrate
822 45 : tracing::debug!(
823 0 : "Candidate node {candidate} is no better than our current location {current} (candidate {candidate_score:?} vs current {current_score:?})",
824 : );
825 45 : Some(false)
826 : }
827 : }
828 : None => {
829 : // The current node is unavailable for scheduling, so we can't make any sensible
830 : // decisions about optimisation. This should be a transient state -- if the node
831 : // is offline then it will get evacuated, if is blocked by a scheduling mode
832 : // then we will respect that mode by doing nothing.
833 0 : tracing::debug!("Current node {current} is unavailable for scheduling");
834 0 : None
835 : }
836 : }
837 53 : }
838 :
839 40 : fn find_better_location<T: ShardTag>(
840 40 : &self,
841 40 : scheduler: &mut Scheduler,
842 40 : schedule_context: &ScheduleContext,
843 40 : current: NodeId,
844 40 : hard_exclude: &[NodeId],
845 40 : ) -> Option<NodeId> {
846 : // Look for a lower-scoring location to attach to
847 40 : let Ok(candidate_node) = scheduler.schedule_shard::<T>(
848 40 : hard_exclude,
849 40 : &self.intent.preferred_az_id,
850 40 : schedule_context,
851 40 : ) else {
852 : // A scheduling error means we have no possible candidate replacements
853 0 : tracing::debug!("No candidate node found");
854 0 : return None;
855 : };
856 :
857 40 : if candidate_node == current {
858 : // We're already at the best possible location, so don't migrate
859 20 : tracing::debug!("Candidate node {candidate_node} is already in use");
860 20 : return None;
861 20 : }
862 20 :
863 20 : self.is_better_location::<T>(scheduler, schedule_context, current, candidate_node)
864 20 : .and_then(|better| if better { Some(candidate_node) } else { None })
865 40 : }
866 :
867 : /// This function is an optimization, used to avoid doing large numbers of scheduling operations
868 : /// when looking for optimizations. This function uses knowledge of how scores work to do some
869 : /// fast checks for whether it may to be possible to improve a score.
870 : ///
871 : /// If we return true, it only means that optimization _might_ be possible, not that it necessarily is. If we
872 : /// return no, it definitely means that calling [`Self::optimize_attachment`] or [`Self::optimize_secondary`] would do no
873 : /// work.
874 3 : pub(crate) fn maybe_optimizable(
875 3 : &self,
876 3 : scheduler: &mut Scheduler,
877 3 : schedule_context: &ScheduleContext,
878 3 : ) -> bool {
879 3 : // Sharded tenant: check if any locations have a nonzero affinity score
880 3 : if self.shard.count >= ShardCount(1) {
881 3 : let schedule_context = schedule_context.project_detach(self);
882 5 : for node in self.intent.all_pageservers() {
883 5 : if let Some(af) = schedule_context.nodes.get(&node) {
884 5 : if *af > AffinityScore(0) {
885 3 : return true;
886 2 : }
887 0 : }
888 : }
889 0 : }
890 :
891 : // Attached tenant: check if the attachment is outside the preferred AZ
892 0 : if let PlacementPolicy::Attached(_) = self.policy {
893 0 : if let Some(attached) = self.intent.get_attached() {
894 0 : if scheduler.get_node_az(attached) != self.intent.preferred_az_id {
895 0 : return true;
896 0 : }
897 0 : }
898 0 : }
899 :
900 : // Tenant with secondary locations: check if any are within the preferred AZ
901 0 : for secondary in self.intent.get_secondary() {
902 0 : if scheduler.get_node_az(secondary) == self.intent.preferred_az_id {
903 0 : return true;
904 0 : }
905 : }
906 :
907 : // Does the tenant have excess secondaries?
908 0 : if self.intent.get_secondary().len() > self.policy.want_secondaries() {
909 0 : return true;
910 0 : }
911 0 :
912 0 : // Fall through: no optimizations possible
913 0 : false
914 3 : }
915 :
916 : /// Optimize attachments: if a shard has a secondary location that is preferable to
917 : /// its primary location based on soft constraints, switch that secondary location
918 : /// to be attached.
919 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
920 : pub(crate) fn optimize_attachment(
921 : &self,
922 : scheduler: &mut Scheduler,
923 : schedule_context: &ScheduleContext,
924 : ) -> Option<ScheduleOptimization> {
925 : let attached = (*self.intent.get_attached())?;
926 :
927 : let schedule_context = schedule_context.project_detach(self);
928 :
929 : // If we already have a secondary that is higher-scoring than out current location,
930 : // then simply migrate to it.
931 : for secondary in self.intent.get_secondary() {
932 : if let Some(true) = self.is_better_location::<AttachedShardTag>(
933 : scheduler,
934 : &schedule_context,
935 : attached,
936 : *secondary,
937 : ) {
938 : return Some(ScheduleOptimization {
939 : sequence: self.sequence,
940 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
941 : old_attached_node_id: attached,
942 : new_attached_node_id: *secondary,
943 : }),
944 : });
945 : }
946 : }
947 :
948 : // Given that none of our current secondaries is a better location than our current
949 : // attached location (checked above), we may trim any secondaries that are not needed
950 : // for the placement policy.
951 : if self.intent.get_secondary().len() > self.policy.want_secondaries() {
952 : // This code path cleans up extra secondaries after migrating, and/or
953 : // trims extra secondaries after a PlacementPolicy::Attached(N) was
954 : // modified to decrease N.
955 :
956 : let secondary_scores = self
957 : .intent
958 : .get_secondary()
959 : .iter()
960 5 : .map(|node_id| {
961 5 : (
962 5 : *node_id,
963 5 : scheduler.compute_node_score::<NodeSecondarySchedulingScore>(
964 5 : *node_id,
965 5 : &self.intent.preferred_az_id,
966 5 : &schedule_context,
967 5 : ),
968 5 : )
969 5 : })
970 : .collect::<Vec<_>>();
971 :
972 5 : if secondary_scores.iter().any(|score| score.1.is_none()) {
973 : // Don't have full list of scores, so can't make a good decision about which to drop unless
974 : // there is an obvious one in the wrong AZ
975 : for secondary in self.intent.get_secondary() {
976 : if scheduler.get_node_az(secondary) == self.intent.preferred_az_id {
977 : return Some(ScheduleOptimization {
978 : sequence: self.sequence,
979 : action: ScheduleOptimizationAction::RemoveSecondary(*secondary),
980 : });
981 : }
982 : }
983 :
984 : // Fall through: we didn't identify one to remove. This ought to be rare.
985 : tracing::warn!("Keeping extra secondaries: can't determine which of {:?} to remove (some nodes offline?)",
986 : self.intent.get_secondary()
987 : );
988 : } else {
989 : let victim = secondary_scores
990 : .iter()
991 5 : .max_by_key(|score| score.1.unwrap())
992 : .unwrap()
993 : .0;
994 : return Some(ScheduleOptimization {
995 : sequence: self.sequence,
996 : action: ScheduleOptimizationAction::RemoveSecondary(victim),
997 : });
998 : }
999 : }
1000 :
1001 : let replacement = self.find_better_location::<AttachedShardTag>(
1002 : scheduler,
1003 : &schedule_context,
1004 : attached,
1005 : &[], // Don't exclude secondaries: our preferred attachment location may be a secondary
1006 : );
1007 :
1008 : // We have found a candidate and confirmed that its score is preferable
1009 : // to our current location. See if we have a secondary location in the preferred location already: if not,
1010 : // then create one.
1011 : if let Some(replacement) = replacement {
1012 : // If we are currently in a non-preferred AZ, then the scheduler might suggest a location that is better, but still
1013 : // not in our preferred AZ. Migration has a cost in resources and an impact on the workload, so we want to avoid doing
1014 : // multiple hops where we might go to some other AZ before eventually finding a suitable location in our preferred
1015 : // AZ: skip this optimization if it is not in our final, preferred AZ.
1016 : //
1017 : // This should be a transient state, there should always be capacity eventually in our preferred AZ (even if nodes
1018 : // there are too overloaded for scheduler to suggest them, more should be provisioned eventually).
1019 : if self.intent.preferred_az_id.is_some()
1020 : && scheduler.get_node_az(&replacement) != self.intent.preferred_az_id
1021 : {
1022 : tracing::debug!(
1023 : "Candidate node {replacement} is not in preferred AZ {:?}",
1024 : self.intent.preferred_az_id
1025 : );
1026 :
1027 : // This should only happen if our current location is not in the preferred AZ, otherwise
1028 : // [`Self::find_better_location`] should have rejected any other location outside the preferred AZ, because
1029 : // AZ is the highest priority part of NodeAttachmentSchedulingScore.
1030 : debug_assert!(scheduler.get_node_az(&attached) != self.intent.preferred_az_id);
1031 :
1032 : return None;
1033 : }
1034 :
1035 : if !self.intent.get_secondary().contains(&replacement) {
1036 : Some(ScheduleOptimization {
1037 : sequence: self.sequence,
1038 : action: ScheduleOptimizationAction::CreateSecondary(replacement),
1039 : })
1040 : } else {
1041 : // We already have a secondary in the preferred location, let's try migrating to it. Our caller
1042 : // will check the warmth of the destination before deciding whether to really execute this.
1043 : Some(ScheduleOptimization {
1044 : sequence: self.sequence,
1045 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
1046 : old_attached_node_id: attached,
1047 : new_attached_node_id: replacement,
1048 : }),
1049 : })
1050 : }
1051 : } else {
1052 : // We didn't find somewhere we'd rather be, and we don't have any excess secondaries
1053 : // to clean up: no action required.
1054 : None
1055 : }
1056 : }
1057 :
1058 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1059 : pub(crate) fn optimize_secondary(
1060 : &self,
1061 : scheduler: &mut Scheduler,
1062 : schedule_context: &ScheduleContext,
1063 : ) -> Option<ScheduleOptimization> {
1064 : if self.intent.get_secondary().len() > self.policy.want_secondaries() {
1065 : // We have extra secondaries, perhaps to facilitate a migration of the attached location:
1066 : // do nothing, it is up to [`Self::optimize_attachment`] to clean them up. When that's done,
1067 : // and we are called again, we will proceed.
1068 : tracing::debug!("Too many secondaries: skipping");
1069 : return None;
1070 : }
1071 :
1072 : let schedule_context = schedule_context.project_detach(self);
1073 :
1074 : for secondary in self.intent.get_secondary() {
1075 : // Make sure we don't try to migrate a secondary to our attached location: this case happens
1076 : // easily in environments without multiple AZs.
1077 : let exclude = match self.intent.attached {
1078 : Some(attached) => vec![attached],
1079 : None => vec![],
1080 : };
1081 :
1082 : let replacement = self.find_better_location::<SecondaryShardTag>(
1083 : scheduler,
1084 : &schedule_context,
1085 : *secondary,
1086 : &exclude,
1087 : );
1088 : assert!(replacement != Some(*secondary));
1089 : if let Some(replacement) = replacement {
1090 : // We have found a candidate and confirmed that its score is preferable
1091 : // to our current location. See if we have a secondary location in the preferred location already: if not,
1092 : // then create one.
1093 : return Some(ScheduleOptimization {
1094 : sequence: self.sequence,
1095 : action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
1096 : old_node_id: *secondary,
1097 : new_node_id: replacement,
1098 : }),
1099 : });
1100 : }
1101 : }
1102 :
1103 : None
1104 : }
1105 :
1106 : /// Return true if the optimization was really applied: it will not be applied if the optimization's
1107 : /// sequence is behind this tenant shard's
1108 11 : pub(crate) fn apply_optimization(
1109 11 : &mut self,
1110 11 : scheduler: &mut Scheduler,
1111 11 : optimization: ScheduleOptimization,
1112 11 : ) -> bool {
1113 11 : if optimization.sequence != self.sequence {
1114 0 : return false;
1115 11 : }
1116 11 :
1117 11 : metrics::METRICS_REGISTRY
1118 11 : .metrics_group
1119 11 : .storage_controller_schedule_optimization
1120 11 : .inc();
1121 11 :
1122 11 : match optimization.action {
1123 : ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
1124 4 : old_attached_node_id,
1125 4 : new_attached_node_id,
1126 4 : }) => {
1127 4 : self.intent.demote_attached(scheduler, old_attached_node_id);
1128 4 : self.intent
1129 4 : .promote_attached(scheduler, new_attached_node_id);
1130 4 : }
1131 : ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
1132 1 : old_node_id,
1133 1 : new_node_id,
1134 1 : }) => {
1135 1 : self.intent.remove_secondary(scheduler, old_node_id);
1136 1 : self.intent.push_secondary(scheduler, new_node_id);
1137 1 : }
1138 3 : ScheduleOptimizationAction::CreateSecondary(new_node_id) => {
1139 3 : self.intent.push_secondary(scheduler, new_node_id);
1140 3 : }
1141 3 : ScheduleOptimizationAction::RemoveSecondary(old_secondary) => {
1142 3 : self.intent.remove_secondary(scheduler, old_secondary);
1143 3 : }
1144 : }
1145 :
1146 11 : true
1147 11 : }
1148 :
1149 : /// When a shard has several secondary locations, we need to pick one in situations where
1150 : /// we promote one of them to an attached location:
1151 : /// - When draining a node for restart
1152 : /// - When responding to a node failure
1153 : ///
1154 : /// In this context, 'preferred' does not mean the node with the best scheduling score: instead
1155 : /// we want to pick the node which is best for use _temporarily_ while the previous attached location
1156 : /// is unavailable (e.g. because it's down or deploying). That means we prefer to use secondary
1157 : /// locations in a non-preferred AZ, as they're more likely to have a warm cache than a temporary
1158 : /// secondary in the preferred AZ (which are usually only created for migrations, and if they exist
1159 : /// they're probably not warmed up yet).
1160 : ///
1161 : /// If the input is empty, or none of the nodes are eligible for scheduling, return None: the
1162 : /// caller needs to pick a node some other way.
1163 12829 : pub(crate) fn preferred_secondary(&self, scheduler: &Scheduler) -> Option<NodeId> {
1164 12829 : let candidates = scheduler.filter_usable_nodes(&self.intent.secondary);
1165 12829 :
1166 12829 : // We will sort candidates to prefer nodes which are _not_ in our preferred AZ, i.e. we prefer
1167 12829 : // to migrate to a long-lived secondary location (which would have been scheduled in a non-preferred AZ),
1168 12829 : // rather than a short-lived secondary location being used for optimization/migration (which would have
1169 12829 : // been scheduled in our preferred AZ).
1170 12829 : let mut candidates = candidates
1171 12829 : .iter()
1172 12829 : .map(|(node_id, node_az)| {
1173 1 : if node_az == &self.intent.preferred_az_id {
1174 0 : (1, *node_id)
1175 : } else {
1176 1 : (0, *node_id)
1177 : }
1178 12829 : })
1179 12829 : .collect::<Vec<_>>();
1180 12829 :
1181 12829 : candidates.sort();
1182 12829 :
1183 12829 : candidates.first().map(|i| i.1)
1184 12829 : }
1185 :
1186 : /// Query whether the tenant's observed state for attached node matches its intent state, and if so,
1187 : /// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that
1188 : /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
1189 : ///
1190 : /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
1191 : /// function should not be used to decide whether to reconcile.
1192 0 : pub(crate) fn stably_attached(&self) -> Option<NodeId> {
1193 0 : if let Some(attach_intent) = self.intent.attached {
1194 0 : match self.observed.locations.get(&attach_intent) {
1195 0 : Some(loc) => match &loc.conf {
1196 0 : Some(conf) => match conf.mode {
1197 : LocationConfigMode::AttachedMulti
1198 : | LocationConfigMode::AttachedSingle
1199 : | LocationConfigMode::AttachedStale => {
1200 : // Our intent and observed state agree that this node is in an attached state.
1201 0 : Some(attach_intent)
1202 : }
1203 : // Our observed config is not an attached state
1204 0 : _ => None,
1205 : },
1206 : // Our observed state is None, i.e. in flux
1207 0 : None => None,
1208 : },
1209 : // We have no observed state for this node
1210 0 : None => None,
1211 : }
1212 : } else {
1213 : // Our intent is not to attach
1214 0 : None
1215 : }
1216 0 : }
1217 :
1218 0 : fn dirty(&self, nodes: &Arc<HashMap<NodeId, Node>>) -> bool {
1219 0 : let mut dirty_nodes = HashSet::new();
1220 :
1221 0 : if let Some(node_id) = self.intent.attached {
1222 : // Maybe panic: it is a severe bug if we try to attach while generation is null.
1223 0 : let generation = self
1224 0 : .generation
1225 0 : .expect("Attempted to enter attached state without a generation");
1226 0 :
1227 0 : let wanted_conf =
1228 0 : attached_location_conf(generation, &self.shard, &self.config, &self.policy);
1229 0 : match self.observed.locations.get(&node_id) {
1230 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
1231 0 : Some(_) | None => {
1232 0 : dirty_nodes.insert(node_id);
1233 0 : }
1234 : }
1235 0 : }
1236 :
1237 0 : for node_id in &self.intent.secondary {
1238 0 : let wanted_conf = secondary_location_conf(&self.shard, &self.config);
1239 0 : match self.observed.locations.get(node_id) {
1240 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
1241 0 : Some(_) | None => {
1242 0 : dirty_nodes.insert(*node_id);
1243 0 : }
1244 : }
1245 : }
1246 :
1247 0 : for node_id in self.observed.locations.keys() {
1248 0 : if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
1249 0 : // We have observed state that isn't part of our intent: need to clean it up.
1250 0 : dirty_nodes.insert(*node_id);
1251 0 : }
1252 : }
1253 :
1254 0 : dirty_nodes.retain(|node_id| {
1255 0 : nodes
1256 0 : .get(node_id)
1257 0 : .map(|n| n.is_available())
1258 0 : .unwrap_or(false)
1259 0 : });
1260 0 :
1261 0 : !dirty_nodes.is_empty()
1262 0 : }
1263 :
1264 : #[allow(clippy::too_many_arguments)]
1265 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1266 : pub(crate) fn get_reconcile_needed(
1267 : &mut self,
1268 : pageservers: &Arc<HashMap<NodeId, Node>>,
1269 : ) -> ReconcileNeeded {
1270 : // If there are any ambiguous observed states, and the nodes they refer to are available,
1271 : // we should reconcile to clean them up.
1272 : let mut dirty_observed = false;
1273 : for (node_id, observed_loc) in &self.observed.locations {
1274 : let node = pageservers
1275 : .get(node_id)
1276 : .expect("Nodes may not be removed while referenced");
1277 : if observed_loc.conf.is_none() && node.is_available() {
1278 : dirty_observed = true;
1279 : break;
1280 : }
1281 : }
1282 :
1283 : let active_nodes_dirty = self.dirty(pageservers);
1284 :
1285 : // Even if there is no pageserver work to be done, if we have a pending notification to computes,
1286 : // wake up a reconciler to send it.
1287 : let do_reconcile =
1288 : active_nodes_dirty || dirty_observed || self.pending_compute_notification;
1289 :
1290 : if !do_reconcile {
1291 : tracing::debug!("Not dirty, no reconciliation needed.");
1292 : return ReconcileNeeded::No;
1293 : }
1294 :
1295 : // If we are currently splitting, then never start a reconciler task: the splitting logic
1296 : // requires that shards are not interfered with while it runs. Do this check here rather than
1297 : // up top, so that we only log this message if we would otherwise have done a reconciliation.
1298 : if !matches!(self.splitting, SplitState::Idle) {
1299 : tracing::info!("Refusing to reconcile, splitting in progress");
1300 : return ReconcileNeeded::No;
1301 : }
1302 :
1303 : // Reconcile already in flight for the current sequence?
1304 : if let Some(handle) = &self.reconciler {
1305 : if handle.sequence == self.sequence {
1306 : tracing::info!(
1307 : "Reconciliation already in progress for sequence {:?}",
1308 : self.sequence,
1309 : );
1310 : return ReconcileNeeded::WaitExisting(ReconcilerWaiter {
1311 : tenant_shard_id: self.tenant_shard_id,
1312 : seq_wait: self.waiter.clone(),
1313 : error_seq_wait: self.error_waiter.clone(),
1314 : error: self.last_error.clone(),
1315 : seq: self.sequence,
1316 : });
1317 : }
1318 : }
1319 :
1320 : // Pre-checks done: finally check whether we may actually do the work
1321 : match self.scheduling_policy {
1322 : ShardSchedulingPolicy::Active
1323 : | ShardSchedulingPolicy::Essential
1324 : | ShardSchedulingPolicy::Pause => {}
1325 : ShardSchedulingPolicy::Stop => {
1326 : // We only reach this point if there is work to do and we're going to skip
1327 : // doing it: warn to make it obvious why this tenant isn't doing what it ought to.
1328 : tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
1329 : return ReconcileNeeded::No;
1330 : }
1331 : }
1332 :
1333 : ReconcileNeeded::Yes
1334 : }
1335 :
1336 : /// Ensure the sequence number is set to a value where waiting for this value will make us wait
1337 : /// for the next reconcile: i.e. it is ahead of all completed or running reconcilers.
1338 : ///
1339 : /// Constructing a ReconcilerWaiter with the resulting sequence number gives the property
1340 : /// that the waiter will not complete until some future Reconciler is constructed and run.
1341 0 : fn ensure_sequence_ahead(&mut self) {
1342 0 : // Find the highest sequence for which a Reconciler has previously run or is currently
1343 0 : // running
1344 0 : let max_seen = std::cmp::max(
1345 0 : self.reconciler
1346 0 : .as_ref()
1347 0 : .map(|r| r.sequence)
1348 0 : .unwrap_or(Sequence(0)),
1349 0 : std::cmp::max(self.waiter.load(), self.error_waiter.load()),
1350 0 : );
1351 0 :
1352 0 : if self.sequence <= max_seen {
1353 0 : self.sequence = max_seen.next();
1354 0 : }
1355 0 : }
1356 :
1357 : /// Create a waiter that will wait for some future Reconciler that hasn't been spawned yet.
1358 : ///
1359 : /// This is appropriate when you can't spawn a reconciler (e.g. due to resource limits), but
1360 : /// you would like to wait on the next reconciler that gets spawned in the background.
1361 0 : pub(crate) fn future_reconcile_waiter(&mut self) -> ReconcilerWaiter {
1362 0 : self.ensure_sequence_ahead();
1363 0 :
1364 0 : ReconcilerWaiter {
1365 0 : tenant_shard_id: self.tenant_shard_id,
1366 0 : seq_wait: self.waiter.clone(),
1367 0 : error_seq_wait: self.error_waiter.clone(),
1368 0 : error: self.last_error.clone(),
1369 0 : seq: self.sequence,
1370 0 : }
1371 0 : }
1372 :
1373 0 : async fn reconcile(
1374 0 : sequence: Sequence,
1375 0 : mut reconciler: Reconciler,
1376 0 : must_notify: bool,
1377 0 : ) -> ReconcileResult {
1378 : // Attempt to make observed state match intent state
1379 0 : let result = reconciler.reconcile().await;
1380 :
1381 : // If we know we had a pending compute notification from some previous action, send a notification irrespective
1382 : // of whether the above reconcile() did any work. It has to be Ok() though, because otherwise we might be
1383 : // sending a notification of a location that isn't really attached.
1384 0 : if result.is_ok() && must_notify {
1385 : // If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
1386 0 : reconciler.compute_notify().await.ok();
1387 0 : } else if must_notify {
1388 0 : // Carry this flag so that the reconciler's result will indicate that it still needs to retry
1389 0 : // the compute hook notification eventually.
1390 0 : reconciler.compute_notify_failure = true;
1391 0 : }
1392 :
1393 : // Update result counter
1394 0 : let outcome_label = match &result {
1395 0 : Ok(_) => ReconcileOutcome::Success,
1396 0 : Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
1397 0 : Err(_) => ReconcileOutcome::Error,
1398 : };
1399 :
1400 0 : metrics::METRICS_REGISTRY
1401 0 : .metrics_group
1402 0 : .storage_controller_reconcile_complete
1403 0 : .inc(ReconcileCompleteLabelGroup {
1404 0 : status: outcome_label,
1405 0 : });
1406 0 :
1407 0 : // Constructing result implicitly drops Reconciler, freeing any ReconcileUnits before the Service might
1408 0 : // try and schedule more work in response to our result.
1409 0 : ReconcileResult {
1410 0 : sequence,
1411 0 : result,
1412 0 : tenant_shard_id: reconciler.tenant_shard_id,
1413 0 : generation: reconciler.generation,
1414 0 : observed_deltas: reconciler.observed_deltas(),
1415 0 : pending_compute_notification: reconciler.compute_notify_failure,
1416 0 : }
1417 0 : }
1418 :
1419 : #[allow(clippy::too_many_arguments)]
1420 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1421 : pub(crate) fn spawn_reconciler(
1422 : &mut self,
1423 : result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
1424 : pageservers: &Arc<HashMap<NodeId, Node>>,
1425 : compute_hook: &Arc<ComputeHook>,
1426 : reconciler_config: ReconcilerConfig,
1427 : service_config: &service::Config,
1428 : persistence: &Arc<Persistence>,
1429 : units: ReconcileUnits,
1430 : gate_guard: GateGuard,
1431 : cancel: &CancellationToken,
1432 : ) -> Option<ReconcilerWaiter> {
1433 : // Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
1434 : // doing our sequence's work.
1435 : let old_handle = self.reconciler.take();
1436 :
1437 : // Build list of nodes from which the reconciler should detach
1438 : let mut detach = Vec::new();
1439 : for node_id in self.observed.locations.keys() {
1440 : if self.intent.get_attached() != &Some(*node_id)
1441 : && !self.intent.secondary.contains(node_id)
1442 : {
1443 : detach.push(
1444 : pageservers
1445 : .get(node_id)
1446 : .expect("Intent references non-existent pageserver")
1447 : .clone(),
1448 : )
1449 : }
1450 : }
1451 :
1452 : // Advance the sequence before spawning a reconciler, so that sequence waiters
1453 : // can distinguish between the states before and after the reconcile completes.
1454 : self.ensure_sequence_ahead();
1455 :
1456 : let reconciler_cancel = cancel.child_token();
1457 : let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
1458 : let reconciler = Reconciler {
1459 : tenant_shard_id: self.tenant_shard_id,
1460 : shard: self.shard,
1461 : placement_policy: self.policy.clone(),
1462 : generation: self.generation,
1463 : intent: reconciler_intent,
1464 : detach,
1465 : reconciler_config,
1466 : config: self.config.clone(),
1467 : preferred_az: self.intent.preferred_az_id.clone(),
1468 : observed: self.observed.clone(),
1469 : original_observed: self.observed.clone(),
1470 : compute_hook: compute_hook.clone(),
1471 : service_config: service_config.clone(),
1472 : _gate_guard: gate_guard,
1473 : _resource_units: units,
1474 : cancel: reconciler_cancel.clone(),
1475 : persistence: persistence.clone(),
1476 : compute_notify_failure: false,
1477 : };
1478 :
1479 : let reconcile_seq = self.sequence;
1480 : let long_reconcile_threshold = service_config.long_reconcile_threshold;
1481 :
1482 : tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
1483 : let must_notify = self.pending_compute_notification;
1484 : let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
1485 : tenant_id=%reconciler.tenant_shard_id.tenant_id,
1486 : shard_id=%reconciler.tenant_shard_id.shard_slug());
1487 : metrics::METRICS_REGISTRY
1488 : .metrics_group
1489 : .storage_controller_reconcile_spawn
1490 : .inc();
1491 : let result_tx = result_tx.clone();
1492 : let join_handle = tokio::task::spawn(
1493 0 : async move {
1494 : // Wait for any previous reconcile task to complete before we start
1495 0 : if let Some(old_handle) = old_handle {
1496 0 : old_handle.cancel.cancel();
1497 0 : if let Err(e) = old_handle.handle.await {
1498 : // We can't do much with this other than log it: the task is done, so
1499 : // we may proceed with our work.
1500 0 : tracing::error!("Unexpected join error waiting for reconcile task: {e}");
1501 0 : }
1502 0 : }
1503 :
1504 : // Early check for cancellation before doing any work
1505 : // TODO: wrap all remote API operations in cancellation check
1506 : // as well.
1507 0 : if reconciler.cancel.is_cancelled() {
1508 0 : metrics::METRICS_REGISTRY
1509 0 : .metrics_group
1510 0 : .storage_controller_reconcile_complete
1511 0 : .inc(ReconcileCompleteLabelGroup {
1512 0 : status: ReconcileOutcome::Cancel,
1513 0 : });
1514 0 : return;
1515 0 : }
1516 0 :
1517 0 : let (tenant_id_label, shard_number_label, sequence_label) = {
1518 0 : (
1519 0 : reconciler.tenant_shard_id.tenant_id.to_string(),
1520 0 : reconciler.tenant_shard_id.shard_number.0.to_string(),
1521 0 : reconcile_seq.to_string(),
1522 0 : )
1523 0 : };
1524 0 :
1525 0 : let label_group = ReconcileLongRunningLabelGroup {
1526 0 : tenant_id: &tenant_id_label,
1527 0 : shard_number: &shard_number_label,
1528 0 : sequence: &sequence_label,
1529 0 : };
1530 0 :
1531 0 : let reconcile_fut = Self::reconcile(reconcile_seq, reconciler, must_notify);
1532 0 : let long_reconcile_fut = {
1533 0 : let label_group = label_group.clone();
1534 0 : async move {
1535 0 : tokio::time::sleep(long_reconcile_threshold).await;
1536 :
1537 0 : tracing::warn!("Reconcile passed the long running threshold of {long_reconcile_threshold:?}");
1538 :
1539 0 : metrics::METRICS_REGISTRY
1540 0 : .metrics_group
1541 0 : .storage_controller_reconcile_long_running
1542 0 : .inc(label_group);
1543 0 : }
1544 : };
1545 :
1546 0 : let reconcile_fut = std::pin::pin!(reconcile_fut);
1547 0 : let long_reconcile_fut = std::pin::pin!(long_reconcile_fut);
1548 :
1549 0 : let (was_long, result) =
1550 0 : match future::select(reconcile_fut, long_reconcile_fut).await {
1551 0 : Either::Left((reconcile_result, _)) => (false, reconcile_result),
1552 0 : Either::Right((_, reconcile_fut)) => (true, reconcile_fut.await),
1553 : };
1554 :
1555 0 : if was_long {
1556 0 : let id = metrics::METRICS_REGISTRY
1557 0 : .metrics_group
1558 0 : .storage_controller_reconcile_long_running
1559 0 : .with_labels(label_group);
1560 0 : metrics::METRICS_REGISTRY
1561 0 : .metrics_group
1562 0 : .storage_controller_reconcile_long_running
1563 0 : .remove_metric(id);
1564 0 : }
1565 :
1566 0 : result_tx
1567 0 : .send(ReconcileResultRequest::ReconcileResult(result))
1568 0 : .ok();
1569 0 : }
1570 : .instrument(reconciler_span),
1571 : );
1572 :
1573 : self.reconciler = Some(ReconcilerHandle {
1574 : sequence: self.sequence,
1575 : handle: join_handle,
1576 : cancel: reconciler_cancel,
1577 : });
1578 :
1579 : Some(ReconcilerWaiter {
1580 : tenant_shard_id: self.tenant_shard_id,
1581 : seq_wait: self.waiter.clone(),
1582 : error_seq_wait: self.error_waiter.clone(),
1583 : error: self.last_error.clone(),
1584 : seq: self.sequence,
1585 : })
1586 : }
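         : // The long-running-reconcile detection above boils down to racing the work future against a
         : // sleep-based watchdog. A distilled, self-contained sketch of that pattern (editor's example;
         : // `run_with_watchdog`, `do_work` and `threshold` are placeholders, not names from this crate):
         : //
         : //     use futures::future::{self, Either};
         : //
         : //     async fn run_with_watchdog<T>(
         : //         do_work: impl std::future::Future<Output = T>,
         : //         threshold: std::time::Duration,
         : //     ) -> (bool, T) {
         : //         let work = std::pin::pin!(do_work);
         : //         let watchdog = std::pin::pin!(tokio::time::sleep(threshold));
         : //         match future::select(work, watchdog).await {
         : //             // Work finished before the watchdog fired
         : //             Either::Left((result, _)) => (false, result),
         : //             // Watchdog fired first: warn, then keep waiting for the work to finish
         : //             Either::Right((_, work)) => {
         : //                 tracing::warn!("work passed the long running threshold of {threshold:?}");
         : //                 (true, work.await)
         : //             }
         : //         }
         : //     }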
1587 :
1588 0 : pub(crate) fn cancel_reconciler(&self) {
1589 0 : if let Some(handle) = self.reconciler.as_ref() {
1590 0 : handle.cancel.cancel()
1591 0 : }
1592 0 : }
1593 :
1594 : /// Get a waiter for any reconciliation in flight, but do not start reconciliation
1595 : /// if it is not already running
1596 0 : pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
1597 0 : if self.reconciler.is_some() {
1598 0 : Some(ReconcilerWaiter {
1599 0 : tenant_shard_id: self.tenant_shard_id,
1600 0 : seq_wait: self.waiter.clone(),
1601 0 : error_seq_wait: self.error_waiter.clone(),
1602 0 : error: self.last_error.clone(),
1603 0 : seq: self.sequence,
1604 0 : })
1605 : } else {
1606 0 : None
1607 : }
1608 0 : }
1609 :
1610 : /// Called when a ReconcileResult has been emitted and the service is updating
1611 : /// our state: if the result is from a sequence >= our ReconcilerHandle's sequence,
1612 : /// drop the handle to indicate there is no longer a reconciliation in progress.
1613 0 : pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
1614 0 : if let Some(reconcile_handle) = &self.reconciler {
1615 0 : if reconcile_handle.sequence <= sequence {
1616 0 : self.reconciler = None;
1617 0 : }
1618 0 : }
1619 0 : }
1620 :
1621 : /// If we had any state at all referring to this node ID, drop it. Does not
1622 : /// attempt to reschedule.
1623 : ///
1624 : /// Returns true if we modified the node's intent state.
1625 0 : pub(crate) fn deref_node(&mut self, node_id: NodeId) -> bool {
1626 0 : let mut intent_modified = false;
1627 0 :
1628 0 : // Drop if this node was our attached intent
1629 0 : if self.intent.attached == Some(node_id) {
1630 0 : self.intent.attached = None;
1631 0 : intent_modified = true;
1632 0 : }
1633 :
1634 : // Drop from the list of secondaries, and check if we modified it
1635 0 : let had_secondaries = self.intent.secondary.len();
1636 0 : self.intent.secondary.retain(|n| n != &node_id);
1637 0 : intent_modified |= self.intent.secondary.len() != had_secondaries;
1638 0 :
1639 0 : debug_assert!(!self.intent.all_pageservers().contains(&node_id));
1640 :
1641 0 : intent_modified
1642 0 : }
1643 :
1644 0 : pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
1645 0 : self.scheduling_policy = p;
1646 0 : }
1647 :
1648 0 : pub(crate) fn get_scheduling_policy(&self) -> &ShardSchedulingPolicy {
1649 0 : &self.scheduling_policy
1650 0 : }
1651 :
1652 0 : pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
1653 0 : // Ordering: always set last_error before advancing sequence, so that sequence
1654 0 : // waiters are guaranteed to see a Some value when they see an error.
1655 0 : *(self.last_error.lock().unwrap()) = Some(Arc::new(error));
1656 0 : self.error_waiter.advance(sequence);
1657 0 : }
1658 :
1659 0 : pub(crate) fn from_persistent(
1660 0 : tsp: TenantShardPersistence,
1661 0 : intent: IntentState,
1662 0 : ) -> anyhow::Result<Self> {
1663 0 : let tenant_shard_id = tsp.get_tenant_shard_id()?;
1664 0 : let shard_identity = tsp.get_shard_identity()?;
1665 :
1666 0 : metrics::METRICS_REGISTRY
1667 0 : .metrics_group
1668 0 : .storage_controller_tenant_shards
1669 0 : .inc();
1670 0 :
1671 0 : Ok(Self {
1672 0 : tenant_shard_id,
1673 0 : shard: shard_identity,
1674 0 : sequence: Sequence::initial(),
1675 0 : generation: tsp.generation.map(|g| Generation::new(g as u32)),
1676 0 : policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
1677 0 : intent,
1678 0 : observed: ObservedState::new(),
1679 0 : config: serde_json::from_str(&tsp.config).unwrap(),
1680 0 : reconciler: None,
1681 0 : splitting: tsp.splitting,
1682 0 : waiter: Arc::new(SeqWait::new(Sequence::initial())),
1683 0 : error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
1684 0 : last_error: Arc::default(),
1685 0 : pending_compute_notification: false,
1686 0 : delayed_reconcile: false,
1687 0 : scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
1688 0 : })
1689 0 : }
1690 :
1691 0 : pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
1692 0 : TenantShardPersistence {
1693 0 : tenant_id: self.tenant_shard_id.tenant_id.to_string(),
1694 0 : shard_number: self.tenant_shard_id.shard_number.0 as i32,
1695 0 : shard_count: self.tenant_shard_id.shard_count.literal() as i32,
1696 0 : shard_stripe_size: self.shard.stripe_size.0 as i32,
1697 0 : generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
1698 0 : generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
1699 0 : placement_policy: serde_json::to_string(&self.policy).unwrap(),
1700 0 : config: serde_json::to_string(&self.config).unwrap(),
1701 0 : splitting: SplitState::default(),
1702 0 : scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
1703 0 : preferred_az_id: self.intent.preferred_az_id.as_ref().map(|az| az.0.clone()),
1704 0 : }
1705 0 : }
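         : // Round-trip sketch (editor's example): the persisted form produced here can be loaded back
         : // with `TenantShard::from_persistent`, which expects the caller to supply a rebuilt
         : // IntentState; observed state is not persisted and starts empty. `intent` below is a
         : // placeholder for whatever the caller reconstructed:
         : //
         : //     let tsp = shard.to_persistent();
         : //     let restored = TenantShard::from_persistent(tsp, intent)?;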
1706 :
1707 12502 : pub(crate) fn preferred_az(&self) -> Option<&AvailabilityZone> {
1708 12502 : self.intent.preferred_az_id.as_ref()
1709 12502 : }
1710 :
1711 0 : pub(crate) fn set_preferred_az(&mut self, preferred_az_id: Option<AvailabilityZone>) {
1712 0 : self.intent.preferred_az_id = preferred_az_id;
1713 0 : }
1714 :
1715 : /// Returns all the nodes to which this tenant shard is attached according to the
1716 : /// observed state, along with the attachment generations. The returned vector is
1717 : /// sorted from latest generation to earliest.
1718 0 : pub(crate) fn attached_locations(&self) -> Vec<(NodeId, Generation)> {
1719 0 : self.observed
1720 0 : .locations
1721 0 : .iter()
1722 0 : .filter_map(|(node_id, observed)| {
1723 : use LocationConfigMode::{AttachedMulti, AttachedSingle, AttachedStale};
1724 :
1725 0 : let conf = observed.conf.as_ref()?;
1726 :
1727 0 : match (conf.generation, conf.mode) {
1728 0 : (Some(gen), AttachedMulti | AttachedSingle | AttachedStale) => {
1729 0 : Some((*node_id, gen))
1730 : }
1731 0 : _ => None,
1732 : }
1733 0 : })
1734 0 : .sorted_by(|(_lhs_node_id, lhs_gen), (_rhs_node_id, rhs_gen)| {
1735 0 : lhs_gen.cmp(rhs_gen).reverse()
1736 0 : })
1737 0 : .map(|(node_id, gen)| (node_id, Generation::new(gen)))
1738 0 : .collect()
1739 0 : }
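         : // Example (editor's illustration, mirroring the `intent_from_observed` test below): with
         : // observed locations { NodeId(3): AttachedMulti gen 2, NodeId(2): AttachedStale gen 1 },
         : // this returns [(NodeId(3), Generation::new(2)), (NodeId(2), Generation::new(1))];
         : // locations without a generation or in a non-attached mode are skipped, and entries are
         : // ordered from newest to oldest generation.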
1740 :
1741 : /// Update the observed state of the tenant by applying incremental deltas
1742 : ///
1743 : /// Deltas are generated by reconcilers via [`Reconciler::observed_deltas`].
1744 : /// They are then filtered in [`crate::service::Service::process_result`].
1745 0 : pub(crate) fn apply_observed_deltas(
1746 0 : &mut self,
1747 0 : deltas: impl Iterator<Item = ObservedStateDelta>,
1748 0 : ) {
1749 0 : for delta in deltas {
1750 0 : match delta {
1751 0 : ObservedStateDelta::Upsert(ups) => {
1752 0 : let (node_id, loc) = *ups;
1753 0 :
1754 0 : // If the generation of the observed location in the delta is lagging
1755 0 : // behind the current one, then we have a race condition and cannot
1756 0 : // be certain about the true observed state. Set the observed state
1757 0 : // to None in order to reflect this.
1758 0 : let crnt_gen = self
1759 0 : .observed
1760 0 : .locations
1761 0 : .get(&node_id)
1762 0 : .and_then(|loc| loc.conf.as_ref())
1763 0 : .and_then(|conf| conf.generation);
1764 0 : let new_gen = loc.conf.as_ref().and_then(|conf| conf.generation);
1765 0 : match (crnt_gen, new_gen) {
1766 0 : (Some(crnt), Some(new)) if crnt_gen > new_gen => {
1767 0 : tracing::warn!(
1768 0 : "Skipping observed state update {}: {:?} and using None due to stale generation ({} > {})",
1769 : node_id, loc, crnt, new
1770 : );
1771 :
1772 0 : self.observed
1773 0 : .locations
1774 0 : .insert(node_id, ObservedStateLocation { conf: None });
1775 0 :
1776 0 : continue;
1777 : }
1778 0 : _ => {}
1779 : }
1780 :
1781 0 : if let Some(conf) = &loc.conf {
1782 0 : tracing::info!("Updating observed location {}: {:?}", node_id, conf);
1783 : } else {
1784 0 : tracing::info!("Setting observed location {} to None", node_id,)
1785 : }
1786 :
1787 0 : self.observed.locations.insert(node_id, loc);
1788 : }
1789 0 : ObservedStateDelta::Delete(node_id) => {
1790 0 : tracing::info!("Deleting observed location {}", node_id);
1791 0 : self.observed.locations.remove(&node_id);
1792 : }
1793 : }
1794 : }
1795 0 : }
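         : // Example (editor's illustration): if the current observed conf for NodeId(1) carries
         : // generation 5 and an Upsert delta arrives whose conf carries generation 4, the delta is
         : // not applied verbatim; instead the observed conf for NodeId(1) is reset to None, because
         : // a lagging delta means we can no longer be certain of the true state on that node.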
1796 :
1797 : /// Returns true if the tenant shard is attached to a node that is outside the preferred AZ.
1798 : ///
1799 : /// If the shard does not have a preferred AZ, returns false.
1800 0 : pub(crate) fn is_attached_outside_preferred_az(&self, nodes: &HashMap<NodeId, Node>) -> bool {
1801 0 : self.intent
1802 0 : .get_attached()
1803 0 : .map(|node_id| {
1804 0 : Some(
1805 0 : nodes
1806 0 : .get(&node_id)
1807 0 : .expect("referenced node exists")
1808 0 : .get_availability_zone_id(),
1809 0 : ) != self.intent.preferred_az_id.as_ref()
1810 0 : })
1811 0 : .unwrap_or(false)
1812 0 : }
1813 : }
1814 :
1815 : impl Drop for TenantShard {
1816 12839 : fn drop(&mut self) {
1817 12839 : metrics::METRICS_REGISTRY
1818 12839 : .metrics_group
1819 12839 : .storage_controller_tenant_shards
1820 12839 : .dec();
1821 12839 : }
1822 : }
1823 :
1824 : #[cfg(test)]
1825 : pub(crate) mod tests {
1826 : use std::{cell::RefCell, rc::Rc};
1827 :
1828 : use pageserver_api::{
1829 : controller_api::NodeAvailability,
1830 : shard::{ShardCount, ShardNumber},
1831 : };
1832 : use rand::{rngs::StdRng, SeedableRng};
1833 : use utils::id::TenantId;
1834 :
1835 : use crate::scheduler::test_utils::make_test_nodes;
1836 :
1837 : use super::*;
1838 :
1839 9 : fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard {
1840 9 : let tenant_id = TenantId::generate();
1841 9 : let shard_number = ShardNumber(0);
1842 9 : let shard_count = ShardCount::new(1);
1843 9 :
1844 9 : let tenant_shard_id = TenantShardId {
1845 9 : tenant_id,
1846 9 : shard_number,
1847 9 : shard_count,
1848 9 : };
1849 9 : TenantShard::new(
1850 9 : tenant_shard_id,
1851 9 : ShardIdentity::new(
1852 9 : shard_number,
1853 9 : shard_count,
1854 9 : pageserver_api::shard::ShardStripeSize(32768),
1855 9 : )
1856 9 : .unwrap(),
1857 9 : policy,
1858 9 : None,
1859 9 : )
1860 9 : }
1861 :
1862 5004 : pub(crate) fn make_test_tenant(
1863 5004 : policy: PlacementPolicy,
1864 5004 : shard_count: ShardCount,
1865 5004 : preferred_az: Option<AvailabilityZone>,
1866 5004 : ) -> Vec<TenantShard> {
1867 5004 : make_test_tenant_with_id(TenantId::generate(), policy, shard_count, preferred_az)
1868 5004 : }
1869 :
1870 5007 : pub(crate) fn make_test_tenant_with_id(
1871 5007 : tenant_id: TenantId,
1872 5007 : policy: PlacementPolicy,
1873 5007 : shard_count: ShardCount,
1874 5007 : preferred_az: Option<AvailabilityZone>,
1875 5007 : ) -> Vec<TenantShard> {
1876 5007 : (0..shard_count.count())
1877 12522 : .map(|i| {
1878 12522 : let shard_number = ShardNumber(i);
1879 12522 :
1880 12522 : let tenant_shard_id = TenantShardId {
1881 12522 : tenant_id,
1882 12522 : shard_number,
1883 12522 : shard_count,
1884 12522 : };
1885 12522 : TenantShard::new(
1886 12522 : tenant_shard_id,
1887 12522 : ShardIdentity::new(
1888 12522 : shard_number,
1889 12522 : shard_count,
1890 12522 : pageserver_api::shard::ShardStripeSize(32768),
1891 12522 : )
1892 12522 : .unwrap(),
1893 12522 : policy.clone(),
1894 12522 : preferred_az.clone(),
1895 12522 : )
1896 12522 : })
1897 5007 : .collect()
1898 5007 : }
1899 :
1900 : /// Test the scheduling behaviors used when a tenant configured for HA is subject
1901 : /// to nodes being marked offline.
1902 : #[test]
1903 1 : fn tenant_ha_scheduling() -> anyhow::Result<()> {
1904 1 : // Start with three nodes. Our tenant will only use two. The third one is
1905 1 : // expected to remain unused.
1906 1 : let mut nodes = make_test_nodes(3, &[]);
1907 1 :
1908 1 : let mut scheduler = Scheduler::new(nodes.values());
1909 1 : let mut context = ScheduleContext::default();
1910 1 :
1911 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
1912 1 : tenant_shard
1913 1 : .schedule(&mut scheduler, &mut context)
1914 1 : .expect("we have enough nodes, scheduling should work");
1915 1 :
1916 1 : // Expect the attached and secondary locations to initially be scheduled on different nodes
1917 1 : assert_eq!(tenant_shard.intent.secondary.len(), 1);
1918 1 : assert!(tenant_shard.intent.attached.is_some());
1919 :
1920 1 : let attached_node_id = tenant_shard.intent.attached.unwrap();
1921 1 : let secondary_node_id = *tenant_shard.intent.secondary.iter().last().unwrap();
1922 1 : assert_ne!(attached_node_id, secondary_node_id);
1923 :
1924 : // Notifying that the attached node is offline should demote it to a secondary
1925 1 : let changed = tenant_shard
1926 1 : .intent
1927 1 : .demote_attached(&mut scheduler, attached_node_id);
1928 1 : assert!(changed);
1929 1 : assert!(tenant_shard.intent.attached.is_none());
1930 1 : assert_eq!(tenant_shard.intent.secondary.len(), 2);
1931 :
1932 : // Update the scheduler state to indicate the node is offline
1933 1 : nodes
1934 1 : .get_mut(&attached_node_id)
1935 1 : .unwrap()
1936 1 : .set_availability(NodeAvailability::Offline);
1937 1 : scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
1938 1 :
1939 1 : // Scheduling the shard again should promote the still-available secondary node to attached
1940 1 : tenant_shard
1941 1 : .schedule(&mut scheduler, &mut context)
1942 1 : .expect("active nodes are available");
1943 1 : assert_eq!(tenant_shard.intent.attached.unwrap(), secondary_node_id);
1944 :
1945 : // The original attached node should have been retained as a secondary
1946 1 : assert_eq!(
1947 1 : *tenant_shard.intent.secondary.iter().last().unwrap(),
1948 1 : attached_node_id
1949 1 : );
1950 :
1951 1 : tenant_shard.intent.clear(&mut scheduler);
1952 1 :
1953 1 : Ok(())
1954 1 : }
1955 :
1956 : #[test]
1957 1 : fn intent_from_observed() -> anyhow::Result<()> {
1958 1 : let nodes = make_test_nodes(3, &[]);
1959 1 : let mut scheduler = Scheduler::new(nodes.values());
1960 1 :
1961 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
1962 1 :
1963 1 : tenant_shard.observed.locations.insert(
1964 1 : NodeId(3),
1965 1 : ObservedStateLocation {
1966 1 : conf: Some(LocationConfig {
1967 1 : mode: LocationConfigMode::AttachedMulti,
1968 1 : generation: Some(2),
1969 1 : secondary_conf: None,
1970 1 : shard_number: tenant_shard.shard.number.0,
1971 1 : shard_count: tenant_shard.shard.count.literal(),
1972 1 : shard_stripe_size: tenant_shard.shard.stripe_size.0,
1973 1 : tenant_conf: TenantConfig::default(),
1974 1 : }),
1975 1 : },
1976 1 : );
1977 1 :
1978 1 : tenant_shard.observed.locations.insert(
1979 1 : NodeId(2),
1980 1 : ObservedStateLocation {
1981 1 : conf: Some(LocationConfig {
1982 1 : mode: LocationConfigMode::AttachedStale,
1983 1 : generation: Some(1),
1984 1 : secondary_conf: None,
1985 1 : shard_number: tenant_shard.shard.number.0,
1986 1 : shard_count: tenant_shard.shard.count.literal(),
1987 1 : shard_stripe_size: tenant_shard.shard.stripe_size.0,
1988 1 : tenant_conf: TenantConfig::default(),
1989 1 : }),
1990 1 : },
1991 1 : );
1992 1 :
1993 1 : tenant_shard.intent_from_observed(&mut scheduler);
1994 1 :
1995 1 : // The attached location with the highest generation becomes the intended attached node
1996 1 : assert_eq!(tenant_shard.intent.attached, Some(NodeId(3)));
1997 : // Other locations get used as secondary
1998 1 : assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]);
1999 :
2000 1 : scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?;
2001 :
2002 1 : tenant_shard.intent.clear(&mut scheduler);
2003 1 : Ok(())
2004 1 : }
2005 :
2006 : #[test]
2007 1 : fn scheduling_mode() -> anyhow::Result<()> {
2008 1 : let nodes = make_test_nodes(3, &[]);
2009 1 : let mut scheduler = Scheduler::new(nodes.values());
2010 1 :
2011 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
2012 1 :
2013 1 : // In pause mode, schedule() shouldn't do anything
2014 1 : tenant_shard.scheduling_policy = ShardSchedulingPolicy::Pause;
2015 1 : assert!(tenant_shard
2016 1 : .schedule(&mut scheduler, &mut ScheduleContext::default())
2017 1 : .is_ok());
2018 1 : assert!(tenant_shard.intent.all_pageservers().is_empty());
2019 :
2020 : // In active mode, schedule() works
2021 1 : tenant_shard.scheduling_policy = ShardSchedulingPolicy::Active;
2022 1 : assert!(tenant_shard
2023 1 : .schedule(&mut scheduler, &mut ScheduleContext::default())
2024 1 : .is_ok());
2025 1 : assert!(!tenant_shard.intent.all_pageservers().is_empty());
2026 :
2027 1 : tenant_shard.intent.clear(&mut scheduler);
2028 1 : Ok(())
2029 1 : }
2030 :
2031 : #[test]
2032 : /// Simple case: moving attachment to somewhere better where we already have a secondary
2033 1 : fn optimize_attachment_simple() -> anyhow::Result<()> {
2034 1 : let nodes = make_test_nodes(
2035 1 : 3,
2036 1 : &[
2037 1 : AvailabilityZone("az-a".to_string()),
2038 1 : AvailabilityZone("az-b".to_string()),
2039 1 : AvailabilityZone("az-c".to_string()),
2040 1 : ],
2041 1 : );
2042 1 : let mut scheduler = Scheduler::new(nodes.values());
2043 1 :
2044 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
2045 1 : shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
2046 1 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
2047 1 : shard_b.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
2048 1 :
2049 1 : // Initially: both shards are attached, and each has its secondary location
2050 1 : // on the other shard's attached node.
2051 1 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
2052 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(1));
2053 1 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
2054 1 : shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
2055 :
2056 1 : fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
2057 1 : let mut schedule_context = ScheduleContext::default();
2058 1 : schedule_context.avoid(&shard_a.intent.all_pageservers());
2059 1 : schedule_context.avoid(&shard_b.intent.all_pageservers());
2060 1 : schedule_context
2061 1 : }
2062 :
2063 1 : let schedule_context = make_schedule_context(&shard_a, &shard_b);
2064 1 : let optimization_a = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2065 1 : assert_eq!(
2066 1 : optimization_a,
2067 1 : Some(ScheduleOptimization {
2068 1 : sequence: shard_a.sequence,
2069 1 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
2070 1 : old_attached_node_id: NodeId(2),
2071 1 : new_attached_node_id: NodeId(1)
2072 1 : })
2073 1 : })
2074 1 : );
2075 1 : shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
2076 1 :
2077 1 : // // Either shard should recognize that it has the option to switch to a secondary location where there
2078 1 : // // would be no other shards from the same tenant, and request to do so.
2079 1 : // assert_eq!(
2080 1 : // optimization_a_prepare,
2081 1 : // Some(ScheduleOptimization {
2082 1 : // sequence: shard_a.sequence,
2083 1 : // action: ScheduleOptimizationAction::CreateSecondary(NodeId(2))
2084 1 : // })
2085 1 : // );
2086 1 : // shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
2087 1 :
2088 1 : // let schedule_context = make_schedule_context(&shard_a, &shard_b);
2089 1 : // let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2090 1 : // assert_eq!(
2091 1 : // optimization_a_migrate,
2092 1 : // Some(ScheduleOptimization {
2093 1 : // sequence: shard_a.sequence,
2094 1 : // action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
2095 1 : // old_attached_node_id: NodeId(1),
2096 1 : // new_attached_node_id: NodeId(2)
2097 1 : // })
2098 1 : // })
2099 1 : // );
2100 1 : // shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
2101 1 :
2102 1 : // let schedule_context = make_schedule_context(&shard_a, &shard_b);
2103 1 : // let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2104 1 : // assert_eq!(
2105 1 : // optimization_a_cleanup,
2106 1 : // Some(ScheduleOptimization {
2107 1 : // sequence: shard_a.sequence,
2108 1 : // action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1))
2109 1 : // })
2110 1 : // );
2111 1 : // shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
2112 1 :
2113 1 : // // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
2114 1 : // let schedule_context = make_schedule_context(&shard_a, &shard_b);
2115 1 : // assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
2116 1 :
2117 1 : shard_a.intent.clear(&mut scheduler);
2118 1 : shard_b.intent.clear(&mut scheduler);
2119 1 :
2120 1 : Ok(())
2121 1 : }
2122 :
2123 : #[test]
2124 : /// Complicated case: moving attachment to somewhere better where we do not have a secondary
2125 : /// already, creating one as needed.
2126 1 : fn optimize_attachment_multistep() -> anyhow::Result<()> {
2127 1 : let nodes = make_test_nodes(
2128 1 : 3,
2129 1 : &[
2130 1 : AvailabilityZone("az-a".to_string()),
2131 1 : AvailabilityZone("az-b".to_string()),
2132 1 : AvailabilityZone("az-c".to_string()),
2133 1 : ],
2134 1 : );
2135 1 : let mut scheduler = Scheduler::new(nodes.values());
2136 1 :
2137 1 : // Two shards of a tenant that wants to be in AZ A
2138 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
2139 1 : shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
2140 1 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
2141 1 : shard_b.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
2142 1 :
2143 1 : // Both shards are initially attached in non-home AZ _and_ have secondaries in non-home AZs
2144 1 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
2145 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
2146 1 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(3)));
2147 1 : shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
2148 :
2149 3 : fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
2150 3 : let mut schedule_context = ScheduleContext::default();
2151 3 : schedule_context.avoid(&shard_a.intent.all_pageservers());
2152 3 : schedule_context.avoid(&shard_b.intent.all_pageservers());
2153 3 : schedule_context
2154 3 : }
2155 :
2156 1 : let schedule_context = make_schedule_context(&shard_a, &shard_b);
2157 1 : let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2158 1 : assert_eq!(
2159 1 : optimization_a_prepare,
2160 1 : Some(ScheduleOptimization {
2161 1 : sequence: shard_a.sequence,
2162 1 : action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
2163 1 : })
2164 1 : );
2165 1 : shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
2166 1 :
2167 1 : let schedule_context = make_schedule_context(&shard_a, &shard_b);
2168 1 : let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2169 1 : assert_eq!(
2170 1 : optimization_a_migrate,
2171 1 : Some(ScheduleOptimization {
2172 1 : sequence: shard_a.sequence,
2173 1 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
2174 1 : old_attached_node_id: NodeId(2),
2175 1 : new_attached_node_id: NodeId(1)
2176 1 : })
2177 1 : })
2178 1 : );
2179 1 : shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
2180 1 :
2181 1 : let schedule_context = make_schedule_context(&shard_a, &shard_b);
2182 1 : let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
2183 1 : assert_eq!(
2184 1 : optimization_a_cleanup,
2185 1 : Some(ScheduleOptimization {
2186 1 : sequence: shard_a.sequence,
2187 1 : action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
2188 1 : })
2189 1 : );
2190 1 : shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
2191 1 :
2192 1 : // // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
2193 1 : // let schedule_context = make_schedule_context(&shard_a, &shard_b);
2194 1 : // assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
2195 1 :
2196 1 : shard_a.intent.clear(&mut scheduler);
2197 1 : shard_b.intent.clear(&mut scheduler);
2198 1 :
2199 1 : Ok(())
2200 1 : }
2201 :
2202 : #[test]
2203 : /// Check that multi-step migration works when moving to somewhere that is only better by
2204 : /// 1 AffinityScore -- this ensures that we don't have a bug like the intermediate secondary
2205 : /// counting toward the affinity score such that it prevents the rest of the migration from happening.
2206 1 : fn optimize_attachment_marginal() -> anyhow::Result<()> {
2207 1 : let nodes = make_test_nodes(2, &[]);
2208 1 : let mut scheduler = Scheduler::new(nodes.values());
2209 1 :
2210 1 : // Multi-sharded tenant: we will craft a situation where affinity
2211 1 : // scores differ only slightly
2212 1 : let mut shards = make_test_tenant(PlacementPolicy::Attached(0), ShardCount::new(4), None);
2213 1 :
2214 1 : // 1 attached on node 1
2215 1 : shards[0]
2216 1 : .intent
2217 1 : .set_attached(&mut scheduler, Some(NodeId(1)));
2218 1 : // 3 attached on node 2
2219 1 : shards[1]
2220 1 : .intent
2221 1 : .set_attached(&mut scheduler, Some(NodeId(2)));
2222 1 : shards[2]
2223 1 : .intent
2224 1 : .set_attached(&mut scheduler, Some(NodeId(2)));
2225 1 : shards[3]
2226 1 : .intent
2227 1 : .set_attached(&mut scheduler, Some(NodeId(2)));
2228 :
2229 : // The scheduler should figure out that, for one of the shards on node 2 (shards[1] below), we need to:
2230 : // - Create a secondary for it on node 1
2231 : // - Migrate its attachment to node 1
2232 : // - Remove its old location on node 2
2233 :
2234 4 : fn make_schedule_context(shards: &Vec<TenantShard>) -> ScheduleContext {
2235 4 : let mut schedule_context = ScheduleContext::default();
2236 20 : for shard in shards {
2237 16 : schedule_context.avoid(&shard.intent.all_pageservers());
2238 16 : }
2239 4 : schedule_context
2240 4 : }
2241 :
2242 1 : let schedule_context = make_schedule_context(&shards);
2243 1 : let optimization_a_prepare =
2244 1 : shards[1].optimize_attachment(&mut scheduler, &schedule_context);
2245 1 : assert_eq!(
2246 1 : optimization_a_prepare,
2247 1 : Some(ScheduleOptimization {
2248 1 : sequence: shards[1].sequence,
2249 1 : action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
2250 1 : })
2251 1 : );
2252 1 : shards[1].apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
2253 1 :
2254 1 : let schedule_context = make_schedule_context(&shards);
2255 1 : let optimization_a_migrate =
2256 1 : shards[1].optimize_attachment(&mut scheduler, &schedule_context);
2257 1 : assert_eq!(
2258 1 : optimization_a_migrate,
2259 1 : Some(ScheduleOptimization {
2260 1 : sequence: shards[1].sequence,
2261 1 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
2262 1 : old_attached_node_id: NodeId(2),
2263 1 : new_attached_node_id: NodeId(1)
2264 1 : })
2265 1 : })
2266 1 : );
2267 1 : shards[1].apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
2268 1 :
2269 1 : let schedule_context = make_schedule_context(&shards);
2270 1 : let optimization_a_cleanup =
2271 1 : shards[1].optimize_attachment(&mut scheduler, &schedule_context);
2272 1 : assert_eq!(
2273 1 : optimization_a_cleanup,
2274 1 : Some(ScheduleOptimization {
2275 1 : sequence: shards[1].sequence,
2276 1 : action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2))
2277 1 : })
2278 1 : );
2279 1 : shards[1].apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
2280 1 :
2281 1 : // Everything should be stable now
2282 1 : let schedule_context = make_schedule_context(&shards);
2283 1 : assert_eq!(
2284 1 : shards[0].optimize_attachment(&mut scheduler, &schedule_context),
2285 1 : None
2286 1 : );
2287 1 : assert_eq!(
2288 1 : shards[1].optimize_attachment(&mut scheduler, &schedule_context),
2289 1 : None
2290 1 : );
2291 1 : assert_eq!(
2292 1 : shards[2].optimize_attachment(&mut scheduler, &schedule_context),
2293 1 : None
2294 1 : );
2295 1 : assert_eq!(
2296 1 : shards[3].optimize_attachment(&mut scheduler, &schedule_context),
2297 1 : None
2298 1 : );
2299 :
2300 5 : for mut shard in shards {
2301 4 : shard.intent.clear(&mut scheduler);
2302 4 : }
2303 :
2304 1 : Ok(())
2305 1 : }
2306 :
2307 : #[test]
2308 1 : fn optimize_secondary() -> anyhow::Result<()> {
2309 1 : let nodes = make_test_nodes(4, &[]);
2310 1 : let mut scheduler = Scheduler::new(nodes.values());
2311 1 :
2312 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
2313 1 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
2314 1 :
2315 1 : // Initially: the two shards are attached to different nodes, but both have their
2316 1 : // secondary location on node 3.
2317 1 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
2318 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
2319 1 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
2320 1 : shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
2321 1 :
2322 1 : let mut schedule_context = ScheduleContext::default();
2323 1 : schedule_context.avoid(&shard_a.intent.all_pageservers());
2324 1 : schedule_context.avoid(&shard_b.intent.all_pageservers());
2325 1 :
2326 1 : let optimization_a = shard_a.optimize_secondary(&mut scheduler, &schedule_context);
2327 1 :
2328 1 : // Since there is a node with no locations available, the node holding two locations
2329 1 : // within this schedule context should generate an optimization to move one away
2330 1 : assert_eq!(
2331 1 : optimization_a,
2332 1 : Some(ScheduleOptimization {
2333 1 : sequence: shard_a.sequence,
2334 1 : action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
2335 1 : old_node_id: NodeId(3),
2336 1 : new_node_id: NodeId(4)
2337 1 : })
2338 1 : })
2339 1 : );
2340 :
2341 1 : shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
2342 1 : assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
2343 1 : assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);
2344 :
2345 1 : shard_a.intent.clear(&mut scheduler);
2346 1 : shard_b.intent.clear(&mut scheduler);
2347 1 :
2348 1 : Ok(())
2349 1 : }
2350 :
2351 : // Optimize til quiescent: this emulates what Service::optimize_all does, when
2352 : // called repeatedly in the background.
2353 : // Returns the applied optimizations
2354 3 : fn optimize_til_idle(
2355 3 : scheduler: &mut Scheduler,
2356 3 : shards: &mut [TenantShard],
2357 3 : ) -> Vec<ScheduleOptimization> {
2358 3 : let mut loop_n = 0;
2359 3 : let mut optimizations = Vec::default();
2360 : loop {
2361 6 : let mut schedule_context = ScheduleContext::default();
2362 6 : let mut any_changed = false;
2363 :
2364 24 : for shard in shards.iter() {
2365 24 : schedule_context.avoid(&shard.intent.all_pageservers());
2366 24 : }
2367 :
2368 15 : for shard in shards.iter_mut() {
2369 15 : let optimization = shard.optimize_attachment(scheduler, &schedule_context);
2370 15 : tracing::info!(
2371 0 : "optimize_attachment({})={:?}",
2372 : shard.tenant_shard_id,
2373 : optimization
2374 : );
2375 15 : if let Some(optimization) = optimization {
2376 : // Check that maybe_optimizable wouldn't have wrongly claimed this optimization didn't exist
2377 3 : assert!(shard.maybe_optimizable(scheduler, &schedule_context));
2378 3 : optimizations.push(optimization.clone());
2379 3 : shard.apply_optimization(scheduler, optimization);
2380 3 : any_changed = true;
2381 3 : break;
2382 12 : }
2383 12 :
2384 12 : let optimization = shard.optimize_secondary(scheduler, &schedule_context);
2385 12 : tracing::info!(
2386 0 : "optimize_secondary({})={:?}",
2387 : shard.tenant_shard_id,
2388 : optimization
2389 : );
2390 12 : if let Some(optimization) = optimization {
2391 : // Check that maybe_optimizable wouldn't have wrongly claimed this optimization didn't exist
2392 0 : assert!(shard.maybe_optimizable(scheduler, &schedule_context));
2393 :
2394 0 : optimizations.push(optimization.clone());
2395 0 : shard.apply_optimization(scheduler, optimization);
2396 0 : any_changed = true;
2397 0 : break;
2398 12 : }
2399 : }
2400 :
2401 6 : if !any_changed {
2402 3 : break;
2403 3 : }
2404 3 :
2405 3 : // Assert no infinite loop
2406 3 : loop_n += 1;
2407 3 : assert!(loop_n < 1000);
2408 : }
2409 :
2410 3 : optimizations
2411 3 : }
2412 :
2413 : /// Test the balancing behavior of shard scheduling: that it achieves a balance, and
2414 : /// that it converges.
2415 : #[test]
2416 1 : fn optimize_add_nodes() -> anyhow::Result<()> {
2417 1 : let nodes = make_test_nodes(
2418 1 : 9,
2419 1 : &[
2420 1 : // Initial 6 nodes
2421 1 : AvailabilityZone("az-a".to_string()),
2422 1 : AvailabilityZone("az-a".to_string()),
2423 1 : AvailabilityZone("az-b".to_string()),
2424 1 : AvailabilityZone("az-b".to_string()),
2425 1 : AvailabilityZone("az-c".to_string()),
2426 1 : AvailabilityZone("az-c".to_string()),
2427 1 : // Three we will add later
2428 1 : AvailabilityZone("az-a".to_string()),
2429 1 : AvailabilityZone("az-b".to_string()),
2430 1 : AvailabilityZone("az-c".to_string()),
2431 1 : ],
2432 1 : );
2433 1 :
2434 1 : // Only show the scheduler two nodes in each AZ to start with
2435 1 : let mut scheduler = Scheduler::new([].iter());
2436 7 : for i in 1..=6 {
2437 6 : scheduler.node_upsert(nodes.get(&NodeId(i)).unwrap());
2438 6 : }
2439 :
2440 1 : let mut shards = make_test_tenant(
2441 1 : PlacementPolicy::Attached(1),
2442 1 : ShardCount::new(4),
2443 1 : Some(AvailabilityZone("az-a".to_string())),
2444 1 : );
2445 1 : let mut schedule_context = ScheduleContext::default();
2446 5 : for shard in &mut shards {
2447 4 : assert!(shard
2448 4 : .schedule(&mut scheduler, &mut schedule_context)
2449 4 : .is_ok());
2450 : }
2451 :
2452 : // Initial: attached locations land in the tenant's home AZ.
2453 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
2454 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2);
2455 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
2456 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
2457 :
2458 : // Initial: secondary locations in a remote AZ
2459 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
2460 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
2461 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
2462 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
2463 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
2464 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
2465 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
2466 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
2467 :
2468 : // Add another three nodes: we should see the shards spread out when their optimize
2469 : // methods are called
2470 1 : scheduler.node_upsert(nodes.get(&NodeId(7)).unwrap());
2471 1 : scheduler.node_upsert(nodes.get(&NodeId(8)).unwrap());
2472 1 : scheduler.node_upsert(nodes.get(&NodeId(9)).unwrap());
2473 1 : optimize_til_idle(&mut scheduler, &mut shards);
2474 1 :
2475 1 : // We expect one attached location was moved to the new node in the tenant's home AZ
2476 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(7)), 1);
2477 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(7)), 1);
2478 : // The original node has one less attached shard
2479 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
2480 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
2481 :
2482 : // One of the original nodes still has two attachments, since there are an odd number of nodes
2483 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
2484 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
2485 :
2486 : // None of our secondaries moved, since we already had enough nodes for those to be
2487 : // scheduled perfectly
2488 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
2489 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
2490 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
2491 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
2492 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
2493 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
2494 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
2495 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
2496 :
2497 4 : for shard in shards.iter_mut() {
2498 4 : shard.intent.clear(&mut scheduler);
2499 4 : }
2500 :
2501 1 : Ok(())
2502 1 : }
2503 :
2504 : /// Test that initial shard scheduling is optimal. By optimal we mean
2505 : /// that the optimizer cannot find a way to improve it.
2506 : ///
2507 : /// This test is an example of the scheduling issue described in
2508 : /// https://github.com/neondatabase/neon/issues/8969
2509 : #[test]
2510 1 : fn initial_scheduling_is_optimal() -> anyhow::Result<()> {
2511 : use itertools::Itertools;
2512 :
2513 1 : let nodes = make_test_nodes(2, &[]);
2514 1 :
2515 1 : let mut scheduler = Scheduler::new([].iter());
2516 1 : scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
2517 1 : scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
2518 1 :
2519 1 : let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
2520 1 : let a_context = Rc::new(RefCell::new(ScheduleContext::default()));
2521 1 :
2522 1 : let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
2523 1 : let b_context = Rc::new(RefCell::new(ScheduleContext::default()));
2524 1 :
2525 4 : let a_shards_with_context = a.iter_mut().map(|shard| (shard, a_context.clone()));
2526 4 : let b_shards_with_context = b.iter_mut().map(|shard| (shard, b_context.clone()));
2527 1 :
2528 1 : let schedule_order = a_shards_with_context.interleave(b_shards_with_context);
2529 :
2530 9 : for (shard, context) in schedule_order {
2531 8 : let context = &mut *context.borrow_mut();
2532 8 : shard.schedule(&mut scheduler, context).unwrap();
2533 8 : }
2534 :
2535 1 : let applied_to_a = optimize_til_idle(&mut scheduler, &mut a);
2536 1 : assert_eq!(applied_to_a, vec![]);
2537 :
2538 1 : let applied_to_b = optimize_til_idle(&mut scheduler, &mut b);
2539 1 : assert_eq!(applied_to_b, vec![]);
2540 :
2541 8 : for shard in a.iter_mut().chain(b.iter_mut()) {
2542 8 : shard.intent.clear(&mut scheduler);
2543 8 : }
2544 :
2545 1 : Ok(())
2546 1 : }
2547 :
2548 : #[test]
2549 1 : fn random_az_shard_scheduling() -> anyhow::Result<()> {
2550 : use rand::seq::SliceRandom;
2551 :
2552 51 : for seed in 0..50 {
2553 50 : eprintln!("Running test with seed {seed}");
2554 50 : let mut rng = StdRng::seed_from_u64(seed);
2555 50 :
2556 50 : let az_a_tag = AvailabilityZone("az-a".to_string());
2557 50 : let az_b_tag = AvailabilityZone("az-b".to_string());
2558 50 : let azs = [az_a_tag, az_b_tag];
2559 50 : let nodes = make_test_nodes(4, &azs);
2560 50 : let mut shards_per_az: HashMap<AvailabilityZone, u32> = HashMap::new();
2561 50 :
2562 50 : let mut scheduler = Scheduler::new([].iter());
2563 200 : for node in nodes.values() {
2564 200 : scheduler.node_upsert(node);
2565 200 : }
2566 :
2567 50 : let mut shards = Vec::default();
2568 50 : let mut contexts = Vec::default();
2569 50 : let mut az_picker = azs.iter().cycle().cloned();
2570 5050 : for i in 0..100 {
2571 5000 : let az = az_picker.next().unwrap();
2572 5000 : let shard_count = i % 4 + 1;
2573 5000 : *shards_per_az.entry(az.clone()).or_default() += shard_count;
2574 5000 :
2575 5000 : let tenant_shards = make_test_tenant(
2576 5000 : PlacementPolicy::Attached(1),
2577 5000 : ShardCount::new(shard_count.try_into().unwrap()),
2578 5000 : Some(az),
2579 5000 : );
2580 5000 : let context = Rc::new(RefCell::new(ScheduleContext::default()));
2581 5000 :
2582 5000 : contexts.push(context.clone());
2583 5000 : let with_ctx = tenant_shards
2584 5000 : .into_iter()
2585 12500 : .map(|shard| (shard, context.clone()));
2586 17500 : for shard_with_ctx in with_ctx {
2587 12500 : shards.push(shard_with_ctx);
2588 12500 : }
2589 : }
2590 :
2591 50 : shards.shuffle(&mut rng);
2592 :
2593 : #[derive(Default, Debug)]
2594 : struct NodeStats {
2595 : attachments: u32,
2596 : secondaries: u32,
2597 : }
2598 :
2599 50 : let mut node_stats: HashMap<NodeId, NodeStats> = HashMap::default();
2600 50 : let mut attachments_in_wrong_az = 0;
2601 50 : let mut secondaries_in_wrong_az = 0;
2602 :
2603 12550 : for (shard, context) in &mut shards {
2604 12500 : let context = &mut *context.borrow_mut();
2605 12500 : shard.schedule(&mut scheduler, context).unwrap();
2606 12500 :
2607 12500 : let attached_node = shard.intent.get_attached().unwrap();
2608 12500 : let stats = node_stats.entry(attached_node).or_default();
2609 12500 : stats.attachments += 1;
2610 12500 :
2611 12500 : let secondary_node = *shard.intent.get_secondary().first().unwrap();
2612 12500 : let stats = node_stats.entry(secondary_node).or_default();
2613 12500 : stats.secondaries += 1;
2614 12500 :
2615 12500 : let attached_node_az = nodes
2616 12500 : .get(&attached_node)
2617 12500 : .unwrap()
2618 12500 : .get_availability_zone_id();
2619 12500 : let secondary_node_az = nodes
2620 12500 : .get(&secondary_node)
2621 12500 : .unwrap()
2622 12500 : .get_availability_zone_id();
2623 12500 : let preferred_az = shard.preferred_az().unwrap();
2624 12500 :
2625 12500 : if attached_node_az != preferred_az {
2626 0 : eprintln!(
2627 0 : "{} attachment was scheduled in AZ {} but preferred AZ {}",
2628 0 : shard.tenant_shard_id, attached_node_az, preferred_az
2629 0 : );
2630 0 : attachments_in_wrong_az += 1;
2631 12500 : }
2632 :
2633 12500 : if secondary_node_az == preferred_az {
2634 0 : eprintln!(
2635 0 : "{} secondary was scheduled in AZ {} which matches preference",
2636 0 : shard.tenant_shard_id, secondary_node_az
2637 0 : );
2638 0 : secondaries_in_wrong_az += 1;
2639 12500 : }
2640 : }
2641 :
2642 50 : let mut violations = Vec::default();
2643 50 :
2644 50 : if attachments_in_wrong_az > 0 {
2645 0 : violations.push(format!(
2646 0 : "{} attachments scheduled to the incorrect AZ",
2647 0 : attachments_in_wrong_az
2648 0 : ));
2649 50 : }
2650 :
2651 50 : if secondaries_in_wrong_az > 0 {
2652 0 : violations.push(format!(
2653 0 : "{} secondaries scheduled to the incorrect AZ",
2654 0 : secondaries_in_wrong_az
2655 0 : ));
2656 50 : }
2657 :
2658 50 : eprintln!(
2659 50 : "attachments_in_wrong_az={} secondaries_in_wrong_az={}",
2660 50 : attachments_in_wrong_az, secondaries_in_wrong_az
2661 50 : );
2662 :
2663 250 : for (node_id, stats) in &node_stats {
2664 200 : let node_az = nodes.get(node_id).unwrap().get_availability_zone_id();
2665 200 : let ideal_attachment_load = shards_per_az.get(node_az).unwrap() / 2;
2666 200 : let allowed_attachment_load =
2667 200 : (ideal_attachment_load - 1)..(ideal_attachment_load + 2);
2668 200 :
2669 200 : if !allowed_attachment_load.contains(&stats.attachments) {
2670 0 : violations.push(format!(
2671 0 : "Found {} attachments on node {}, but expected {}",
2672 0 : stats.attachments, node_id, ideal_attachment_load
2673 0 : ));
2674 200 : }
2675 :
2676 200 : eprintln!(
2677 200 : "{}: attachments={} secondaries={} ideal_attachment_load={}",
2678 200 : node_id, stats.attachments, stats.secondaries, ideal_attachment_load
2679 200 : );
2680 : }
2681 :
2682 50 : assert!(violations.is_empty(), "{violations:?}");
2683 :
2684 12550 : for (mut shard, _ctx) in shards {
2685 12500 : shard.intent.clear(&mut scheduler);
2686 12500 : }
2687 : }
2688 1 : Ok(())
2689 1 : }
2690 : }
|