Line data Source code
1 : use std::{
2 : collections::{HashMap, HashSet},
3 : sync::Arc,
4 : time::Duration,
5 : };
6 :
7 : use crate::{
8 : metrics::{
9 : self, ReconcileCompleteLabelGroup, ReconcileLongRunningLabelGroup, ReconcileOutcome,
10 : },
11 : persistence::TenantShardPersistence,
12 : reconciler::{ReconcileUnits, ReconcilerConfig},
13 : scheduler::{
14 : AffinityScore, AttachedShardTag, MaySchedule, RefCountUpdate, ScheduleContext,
15 : SecondaryShardTag,
16 : },
17 : service::ReconcileResultRequest,
18 : };
19 : use futures::future::{self, Either};
20 : use itertools::Itertools;
21 : use pageserver_api::controller_api::{
22 : AvailabilityZone, NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
23 : };
24 : use pageserver_api::{
25 : models::{LocationConfig, LocationConfigMode, TenantConfig},
26 : shard::{ShardIdentity, TenantShardId},
27 : };
28 : use serde::{Deserialize, Serialize};
29 : use tokio::task::JoinHandle;
30 : use tokio_util::sync::CancellationToken;
31 : use tracing::{instrument, Instrument};
32 : use utils::{
33 : generation::Generation,
34 : id::NodeId,
35 : seqwait::{SeqWait, SeqWaitError},
36 : sync::gate::GateGuard,
37 : };
38 :
39 : use crate::{
40 : compute_hook::ComputeHook,
41 : node::Node,
42 : persistence::{split_state::SplitState, Persistence},
43 : reconciler::{
44 : attached_location_conf, secondary_location_conf, ReconcileError, Reconciler, TargetState,
45 : },
46 : scheduler::{ScheduleError, Scheduler},
47 : service, Sequence,
48 : };
49 :
50 : /// Serialization helper
51 0 : fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
52 0 : where
53 0 : S: serde::ser::Serializer,
54 0 : T: std::fmt::Display,
55 0 : {
56 0 : serializer.collect_str(
57 0 : &v.lock()
58 0 : .unwrap()
59 0 : .as_ref()
60 0 : .map(|e| format!("{e}"))
61 0 : .unwrap_or("".to_string()),
62 0 : )
63 0 : }
64 :
/// In-memory state for a particular tenant shard.
///
/// This struct implements Serialize for debugging purposes, but is _not_ persisted
/// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
#[derive(Serialize)]
pub(crate) struct TenantShard {
    // Identifies the tenant and the shard within it that this state describes.
    pub(crate) tenant_shard_id: TenantShardId,

    pub(crate) shard: ShardIdentity,

    // Runtime only: sequence used to coordinate updates to this object while
    // background reconcilers may be running. A reconciler runs to a particular
    // sequence.
    pub(crate) sequence: Sequence,

    // Latest generation number: next time we attach, increment this
    // and use the incremented number when attaching.
    //
    // None represents an incompletely onboarded tenant via the [`Service::location_config`]
    // API, where this tenant may only run in PlacementPolicy::Secondary.
    pub(crate) generation: Option<Generation>,

    // High level description of how the tenant should be set up.  Provided
    // externally.
    pub(crate) policy: PlacementPolicy,

    // Low level description of exactly which pageservers should fulfil
    // which role.  Generated by `Self::schedule`.
    pub(crate) intent: IntentState,

    // Low level description of how the tenant is configured on pageservers:
    // if this does not match `Self::intent` then the tenant needs reconciliation
    // with `Self::reconcile`.
    pub(crate) observed: ObservedState,

    // Tenant configuration, passed through opaquely to the pageserver.  Identical
    // for all shards in a tenant.
    pub(crate) config: TenantConfig,

    /// If a reconcile task is currently in flight, it may be joined here (it is
    /// only safe to join if either the result has been received or the reconciler's
    /// cancellation token has been fired)
    #[serde(skip)]
    pub(crate) reconciler: Option<ReconcilerHandle>,

    /// If a tenant is being split, then all shards with that TenantId will have a
    /// SplitState set. This acts as a guard against other operations such as background
    /// reconciliation and timeline creation.
    pub(crate) splitting: SplitState,

    /// If a tenant was enqueued for later reconcile due to hitting the concurrency limit, this
    /// flag is set. This flag is cleared when the tenant is popped off the delay queue.
    pub(crate) delayed_reconcile: bool,

    /// Optionally wait for reconciliation to complete up to a particular
    /// sequence number.
    #[serde(skip)]
    pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,

    /// Indicates sequence number for which we have encountered an error reconciling.  If
    /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
    /// and callers should stop waiting for `waiter` and propagate the error.
    #[serde(skip)]
    pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,

    /// The most recent error from a reconcile on this tenant.  This is a nested Arc
    /// because:
    ///  - ReconcileWaiters need to Arc-clone the overall object to read it later
    ///  - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
    ///    many waiters for one shard, and the underlying error types are not Clone.
    ///
    /// TODO: generalize to an array of recent events
    /// TODO: use an ArcSwap instead of a mutex for faster reads?
    #[serde(serialize_with = "read_last_error")]
    pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,

    /// If we have a pending compute notification that for some reason we weren't able to send,
    /// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
    /// and trigger a Reconciler run.  This is the mechanism by which compute notifications are
    /// included in the scope of state that we publish externally in an eventually consistent way.
    pub(crate) pending_compute_notification: bool,

    // Support/debug tool: if something is going wrong or flapping with scheduling, this may
    // be set to a non-active state to avoid making changes while the issue is fixed.
    scheduling_policy: ShardSchedulingPolicy,

    // We should attempt to schedule this shard in the provided AZ to
    // decrease chances of cross-AZ compute.
    preferred_az_id: Option<AvailabilityZone>,
}
155 :
/// The set of nodes a shard is intended to live on: at most one attached location
/// plus any number of secondary locations.
///
/// The mutating methods on this type keep the [`Scheduler`]'s per-node reference counts
/// in step with the contents, so the fields are private and must only be changed through them.
#[derive(Default, Clone, Debug, Serialize)]
pub(crate) struct IntentState {
    // Node intended to hold the attached location, if any.
    attached: Option<NodeId>,
    // Nodes intended to hold secondary locations.
    secondary: Vec<NodeId>,
}
161 :
162 : impl IntentState {
163 19 : pub(crate) fn new() -> Self {
164 19 : Self {
165 19 : attached: None,
166 19 : secondary: vec![],
167 19 : }
168 19 : }
169 0 : pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option<NodeId>) -> Self {
170 0 : if let Some(node_id) = node_id {
171 0 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::Attach);
172 0 : }
173 0 : Self {
174 0 : attached: node_id,
175 0 : secondary: vec![],
176 0 : }
177 0 : }
178 :
179 12538 : pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
180 12538 : if self.attached != new_attached {
181 12538 : if let Some(old_attached) = self.attached.take() {
182 0 : scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
183 12538 : }
184 12538 : if let Some(new_attached) = &new_attached {
185 12538 : scheduler.update_node_ref_counts(*new_attached, RefCountUpdate::Attach);
186 12538 : }
187 12538 : self.attached = new_attached;
188 0 : }
189 12538 : }
190 :
191 : /// Like set_attached, but the node is from [`Self::secondary`]. This swaps the node from
192 : /// secondary to attached while maintaining the scheduler's reference counts.
193 5 : pub(crate) fn promote_attached(
194 5 : &mut self,
195 5 : scheduler: &mut Scheduler,
196 5 : promote_secondary: NodeId,
197 5 : ) {
198 5 : // If we call this with a node that isn't in secondary, it would cause incorrect
199 5 : // scheduler reference counting, since we assume the node is already referenced as a secondary.
200 5 : debug_assert!(self.secondary.contains(&promote_secondary));
201 :
202 10 : self.secondary.retain(|n| n != &promote_secondary);
203 5 :
204 5 : let demoted = self.attached;
205 5 : self.attached = Some(promote_secondary);
206 5 :
207 5 : scheduler.update_node_ref_counts(promote_secondary, RefCountUpdate::PromoteSecondary);
208 5 : if let Some(demoted) = demoted {
209 0 : scheduler.update_node_ref_counts(demoted, RefCountUpdate::DemoteAttached);
210 5 : }
211 5 : }
212 :
213 12525 : pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
214 12525 : debug_assert!(!self.secondary.contains(&new_secondary));
215 12525 : scheduler.update_node_ref_counts(new_secondary, RefCountUpdate::AddSecondary);
216 12525 : self.secondary.push(new_secondary);
217 12525 : }
218 :
219 : /// It is legal to call this with a node that is not currently a secondary: that is a no-op
220 5 : pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
221 5 : let index = self.secondary.iter().position(|n| *n == node_id);
222 5 : if let Some(index) = index {
223 5 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
224 5 : self.secondary.remove(index);
225 5 : }
226 5 : }
227 :
228 12537 : pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
229 12537 : for secondary in self.secondary.drain(..) {
230 12520 : scheduler.update_node_ref_counts(secondary, RefCountUpdate::RemoveSecondary);
231 12520 : }
232 12537 : }
233 :
234 : /// Remove the last secondary node from the list of secondaries
235 0 : pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
236 0 : if let Some(node_id) = self.secondary.pop() {
237 0 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
238 0 : }
239 0 : }
240 :
241 12537 : pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
242 12537 : if let Some(old_attached) = self.attached.take() {
243 12537 : scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
244 12537 : }
245 :
246 12537 : self.clear_secondary(scheduler);
247 12537 : }
248 :
249 12602 : pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
250 12602 : let mut result = Vec::new();
251 12602 : if let Some(p) = self.attached {
252 12600 : result.push(p)
253 2 : }
254 :
255 12602 : result.extend(self.secondary.iter().copied());
256 12602 :
257 12602 : result
258 12602 : }
259 :
260 25083 : pub(crate) fn get_attached(&self) -> &Option<NodeId> {
261 25083 : &self.attached
262 25083 : }
263 :
264 12524 : pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
265 12524 : &self.secondary
266 12524 : }
267 :
268 : /// If the node is in use as the attached location, demote it into
269 : /// the list of secondary locations. This is used when a node goes offline,
270 : /// and we want to use a different node for attachment, but not permanently
271 : /// forget the location on the offline node.
272 : ///
273 : /// Returns true if a change was made
274 5 : pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
275 5 : if self.attached == Some(node_id) {
276 5 : self.attached = None;
277 5 : self.secondary.push(node_id);
278 5 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::DemoteAttached);
279 5 : true
280 : } else {
281 0 : false
282 : }
283 5 : }
284 : }
285 :
286 : impl Drop for IntentState {
287 12538 : fn drop(&mut self) {
288 12538 : // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
289 12538 : // We do not check this while panicking, to avoid polluting unit test failures or
290 12538 : // other assertions with this assertion's output. It's still wrong to leak these,
291 12538 : // but if we already have a panic then we don't need to independently flag this case.
292 12538 : if !(std::thread::panicking()) {
293 12538 : debug_assert!(self.attached.is_none() && self.secondary.is_empty());
294 0 : }
295 12537 : }
296 : }
297 :
/// Per-node record of how this shard was last seen to be configured on pageservers.
#[derive(Default, Clone, Serialize, Deserialize, Debug)]
pub(crate) struct ObservedState {
    // Keyed by node; a node with no entry has no observed location for this shard.
    pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
}
302 :
/// Our latest knowledge of how this tenant is configured in the outside world.
///
/// Meaning:
///     * No instance of this type exists for a node: we are certain that we have nothing configured on that
///       node for this shard.
///     * Instance exists with conf==None: we *might* have some state on that node, but we don't know
///       what it is (e.g. we failed partway through configuring it)
///     * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
///       and that configuration will still be present unless something external interfered.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub(crate) struct ObservedStateLocation {
    /// If None, it means we do not know the status of this shard's location on this node, but
    /// we know that we might have some state on this node.
    pub(crate) conf: Option<LocationConfig>,
}
/// Handle for waiting until a shard's reconciliation reaches sequence `seq`, or fails.
pub(crate) struct ReconcilerWaiter {
    // For observability purposes, remember the ID of the shard we're
    // waiting for.
    pub(crate) tenant_shard_id: TenantShardId,

    // Waited on for `seq`; success of this wait means the reconcile completed.
    seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
    // Waited on for `seq`; success of this wait means the reconcile failed.
    error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
    // Slot from which the failure is read once `error_seq_wait` has reached `seq`.
    error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
    // The sequence number this waiter waits for.
    seq: Sequence,
}
328 :
/// Non-blocking snapshot of a [`ReconcilerWaiter`], as returned by `ReconcilerWaiter::get_status`.
pub(crate) enum ReconcilerStatus {
    // The success waiter would not have to wait for the target sequence.
    Done,
    // The error waiter would not have to wait for the target sequence.
    Failed,
    // Neither waiter has reached the target sequence yet.
    InProgress,
}
334 :
/// Ways in which waiting on a [`ReconcilerWaiter`] can end unsuccessfully.
#[derive(thiserror::Error, Debug)]
pub(crate) enum ReconcileWaitError {
    // The wait for completion exceeded the caller's timeout.
    #[error("Timeout waiting for shard {0}")]
    Timeout(TenantShardId),
    // One of the underlying sequence waits reported shutdown.
    #[error("shutting down")]
    Shutdown,
    // The reconcile failed; carries the shard and the recorded error.
    #[error("Reconcile error on shard {0}: {1}")]
    Failed(TenantShardId, Arc<ReconcileError>),
}
344 :
/// Payload of [`ScheduleOptimizationAction::ReplaceSecondary`]: swap one secondary
/// location for a different node.
#[derive(Eq, PartialEq, Debug, Clone)]
pub(crate) struct ReplaceSecondary {
    // Presumably the secondary location being given up -- confirm where this is applied.
    old_node_id: NodeId,
    // Presumably the node that takes its place -- confirm where this is applied.
    new_node_id: NodeId,
}
350 :
/// Payload of [`ScheduleOptimizationAction::MigrateAttachment`]: move the attachment
/// from one node to another.
#[derive(Eq, PartialEq, Debug, Clone)]
pub(crate) struct MigrateAttachment {
    // Node that held the attachment when the optimization was planned.
    pub(crate) old_attached_node_id: NodeId,
    // Node chosen to take over the attachment.
    pub(crate) new_attached_node_id: NodeId,
}
356 :
/// The kinds of change a scheduling optimization can propose for a shard.
#[derive(Eq, PartialEq, Debug, Clone)]
pub(crate) enum ScheduleOptimizationAction {
    // Replace one of our secondary locations with a different node
    ReplaceSecondary(ReplaceSecondary),
    // Migrate attachment to an existing secondary location
    MigrateAttachment(MigrateAttachment),
}
364 :
/// A proposed scheduling change for a shard, tagged with the sequence at which it was planned.
#[derive(Eq, PartialEq, Debug, Clone)]
pub(crate) struct ScheduleOptimization {
    // What was the reconcile sequence when we generated this optimization?  The optimization
    // should only be applied if the shard's sequence is still at this value, in case other changes
    // happened between planning the optimization and applying it.
    sequence: Sequence,

    // The change to make.
    pub(crate) action: ScheduleOptimizationAction,
}
374 :
375 : impl ReconcilerWaiter {
376 0 : pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
377 0 : tokio::select! {
378 0 : result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
379 0 : result.map_err(|e| match e {
380 0 : SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
381 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
382 0 : })?;
383 : },
384 0 : result = self.error_seq_wait.wait_for(self.seq) => {
385 0 : result.map_err(|e| match e {
386 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
387 0 : SeqWaitError::Timeout => unreachable!()
388 0 : })?;
389 :
390 0 : return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
391 0 : self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
392 : }
393 : }
394 :
395 0 : Ok(())
396 0 : }
397 :
398 0 : pub(crate) fn get_status(&self) -> ReconcilerStatus {
399 0 : if self.seq_wait.would_wait_for(self.seq).is_ok() {
400 0 : ReconcilerStatus::Done
401 0 : } else if self.error_seq_wait.would_wait_for(self.seq).is_ok() {
402 0 : ReconcilerStatus::Failed
403 : } else {
404 0 : ReconcilerStatus::InProgress
405 : }
406 0 : }
407 : }
408 :
/// Having spawned a reconciler task, the tenant shard's state will carry enough
/// information to optionally cancel & await it later.
pub(crate) struct ReconcilerHandle {
    // Presumably the shard sequence the reconciler was spawned for -- confirm at the spawn site.
    sequence: Sequence,
    // Join handle of the spawned reconciler task.
    handle: JoinHandle<()>,
    // Cancellation token for the reconciler task.
    cancel: CancellationToken,
}
416 :
/// Answer to the question "does this shard need a reconciler spawned right now?".
pub(crate) enum ReconcileNeeded {
    /// shard either doesn't need reconciliation, or is forbidden from spawning a reconciler
    /// in its current state (e.g. shard split in progress, or ShardSchedulingPolicy forbids it)
    No,
    /// shard has a reconciler running, and its intent hasn't changed since that one was
    /// spawned: wait for the existing reconciler rather than spawning a new one.
    WaitExisting(ReconcilerWaiter),
    /// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
    Yes,
}
427 :
/// Pending modification to the observed state of a tenant shard.
/// Produced by [`Reconciler::observed_deltas`] and applied in [`crate::service::Service::process_result`].
pub(crate) enum ObservedStateDelta {
    // Insert or overwrite the observed location for the given node.
    Upsert(Box<(NodeId, ObservedStateLocation)>),
    // Remove the observed location for the given node.
    Delete(NodeId),
}
434 :
435 : impl ObservedStateDelta {
436 0 : pub(crate) fn node_id(&self) -> &NodeId {
437 0 : match self {
438 0 : Self::Upsert(up) => &up.0,
439 0 : Self::Delete(nid) => nid,
440 : }
441 0 : }
442 : }
443 :
/// When a reconcile task completes, it sends this result object
/// to be applied to the primary TenantShard.
pub(crate) struct ReconcileResult {
    // Sequence number associated with the reconcile that produced this result.
    pub(crate) sequence: Sequence,
    /// On errors, `observed` should be treated as an incomplete description
    /// of state (i.e. any nodes present in the result should override nodes
    /// present in the parent tenant state, but any unmentioned nodes should
    /// not be removed from parent tenant state)
    pub(crate) result: Result<(), ReconcileError>,

    // The shard this result belongs to.
    pub(crate) tenant_shard_id: TenantShardId,
    pub(crate) generation: Option<Generation>,
    // Changes to apply to the shard's observed state.
    pub(crate) observed_deltas: Vec<ObservedStateDelta>,

    /// Set [`TenantShard::pending_compute_notification`] from this flag
    pub(crate) pending_compute_notification: bool,
}
461 :
462 : impl ObservedState {
463 0 : pub(crate) fn new() -> Self {
464 0 : Self {
465 0 : locations: HashMap::new(),
466 0 : }
467 0 : }
468 : }
469 :
470 : impl TenantShard {
471 12519 : pub(crate) fn new(
472 12519 : tenant_shard_id: TenantShardId,
473 12519 : shard: ShardIdentity,
474 12519 : policy: PlacementPolicy,
475 12519 : ) -> Self {
476 12519 : metrics::METRICS_REGISTRY
477 12519 : .metrics_group
478 12519 : .storage_controller_tenant_shards
479 12519 : .inc();
480 12519 :
481 12519 : Self {
482 12519 : tenant_shard_id,
483 12519 : policy,
484 12519 : intent: IntentState::default(),
485 12519 : generation: Some(Generation::new(0)),
486 12519 : shard,
487 12519 : observed: ObservedState::default(),
488 12519 : config: TenantConfig::default(),
489 12519 : reconciler: None,
490 12519 : splitting: SplitState::Idle,
491 12519 : sequence: Sequence(1),
492 12519 : delayed_reconcile: false,
493 12519 : waiter: Arc::new(SeqWait::new(Sequence(0))),
494 12519 : error_waiter: Arc::new(SeqWait::new(Sequence(0))),
495 12519 : last_error: Arc::default(),
496 12519 : pending_compute_notification: false,
497 12519 : scheduling_policy: ShardSchedulingPolicy::default(),
498 12519 : preferred_az_id: None,
499 12519 : }
500 12519 : }
501 :
502 : /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
503 : /// [`ObservedState`], even if it violates my [`PlacementPolicy`]. Call [`Self::schedule`] next,
504 : /// to get an intent state that complies with placement policy. The overall goal is to do scheduling
505 : /// in a way that makes use of any configured locations that already exist in the outside world.
506 1 : pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
507 1 : // Choose an attached location by filtering observed locations, and then sorting to get the highest
508 1 : // generation
509 1 : let mut attached_locs = self
510 1 : .observed
511 1 : .locations
512 1 : .iter()
513 2 : .filter_map(|(node_id, l)| {
514 2 : if let Some(conf) = &l.conf {
515 2 : if conf.mode == LocationConfigMode::AttachedMulti
516 1 : || conf.mode == LocationConfigMode::AttachedSingle
517 1 : || conf.mode == LocationConfigMode::AttachedStale
518 : {
519 2 : Some((node_id, conf.generation))
520 : } else {
521 0 : None
522 : }
523 : } else {
524 0 : None
525 : }
526 2 : })
527 1 : .collect::<Vec<_>>();
528 1 :
529 2 : attached_locs.sort_by_key(|i| i.1);
530 1 : if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
531 1 : self.intent.set_attached(scheduler, Some(*node_id));
532 1 : }
533 :
534 : // All remaining observed locations generate secondary intents. This includes None
535 : // observations, as these may well have some local content on disk that is usable (this
536 : // is an edge case that might occur if we restarted during a migration or other change)
537 : //
538 : // We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
539 : // will take care of promoting one of these secondaries to be attached.
540 2 : self.observed.locations.keys().for_each(|node_id| {
541 2 : if Some(*node_id) != self.intent.attached {
542 1 : self.intent.push_secondary(scheduler, *node_id);
543 1 : }
544 2 : });
545 1 : }
546 :
547 : /// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
548 : /// attached pageserver for a shard.
549 : ///
550 : /// Returns whether we modified it, and the NodeId selected.
551 12515 : fn schedule_attached(
552 12515 : &mut self,
553 12515 : scheduler: &mut Scheduler,
554 12515 : context: &ScheduleContext,
555 12515 : ) -> Result<(bool, NodeId), ScheduleError> {
556 : // No work to do if we already have an attached tenant
557 12515 : if let Some(node_id) = self.intent.attached {
558 0 : return Ok((false, node_id));
559 12515 : }
560 :
561 12515 : if let Some(promote_secondary) = scheduler.node_preferred(&self.intent.secondary) {
562 : // Promote a secondary
563 1 : tracing::debug!("Promoted secondary {} to attached", promote_secondary);
564 1 : self.intent.promote_attached(scheduler, promote_secondary);
565 1 : Ok((true, promote_secondary))
566 : } else {
567 : // Pick a fresh node: either we had no secondaries or none were schedulable
568 12514 : let node_id = scheduler.schedule_shard::<AttachedShardTag>(
569 12514 : &self.intent.secondary,
570 12514 : &self.preferred_az_id,
571 12514 : context,
572 12514 : )?;
573 12514 : tracing::debug!("Selected {} as attached", node_id);
574 12514 : self.intent.set_attached(scheduler, Some(node_id));
575 12514 : Ok((true, node_id))
576 : }
577 12515 : }
578 :
579 12516 : #[instrument(skip_all, fields(
580 : tenant_id=%self.tenant_shard_id.tenant_id,
581 : shard_id=%self.tenant_shard_id.shard_slug(),
582 : sequence=%self.sequence
583 12516 : ))]
584 : pub(crate) fn schedule(
585 : &mut self,
586 : scheduler: &mut Scheduler,
587 : context: &mut ScheduleContext,
588 : ) -> Result<(), ScheduleError> {
589 : let r = self.do_schedule(scheduler, context);
590 :
591 : context.avoid(&self.intent.all_pageservers());
592 : if let Some(attached) = self.intent.get_attached() {
593 : context.push_attached(*attached);
594 : }
595 :
596 : r
597 : }
598 :
599 12516 : pub(crate) fn do_schedule(
600 12516 : &mut self,
601 12516 : scheduler: &mut Scheduler,
602 12516 : context: &ScheduleContext,
603 12516 : ) -> Result<(), ScheduleError> {
604 12516 : // TODO: before scheduling new nodes, check if any existing content in
605 12516 : // self.intent refers to pageservers that are offline, and pick other
606 12516 : // pageservers if so.
607 12516 :
608 12516 : // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
609 12516 : // change their attach location.
610 12516 :
611 12516 : match self.scheduling_policy {
612 12515 : ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
613 : ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
614 : // Warn to make it obvious why other things aren't happening/working, if we skip scheduling
615 1 : tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
616 0 : "Scheduling is disabled by policy {:?}", self.scheduling_policy);
617 1 : return Ok(());
618 : }
619 : }
620 :
621 : // Build the set of pageservers already in use by this tenant, to avoid scheduling
622 : // more work on the same pageservers we're already using.
623 12515 : let mut modified = false;
624 :
625 : // Add/remove nodes to fulfil policy
626 : use PlacementPolicy::*;
627 12515 : match self.policy {
628 12515 : Attached(secondary_count) => {
629 12515 : let retain_secondaries = if self.intent.attached.is_none()
630 12515 : && scheduler.node_preferred(&self.intent.secondary).is_some()
631 : {
632 : // If we have no attached, and one of the secondaries is elegible to be promoted, retain
633 : // one more secondary than we usually would, as one of them will become attached futher down this function.
634 1 : secondary_count + 1
635 : } else {
636 12514 : secondary_count
637 : };
638 :
639 12515 : while self.intent.secondary.len() > retain_secondaries {
640 0 : // We have no particular preference for one secondary location over another: just
641 0 : // arbitrarily drop from the end
642 0 : self.intent.pop_secondary(scheduler);
643 0 : modified = true;
644 0 : }
645 :
646 : // Should have exactly one attached, and N secondaries
647 12515 : let (modified_attached, attached_node_id) =
648 12515 : self.schedule_attached(scheduler, context)?;
649 12515 : modified |= modified_attached;
650 12515 :
651 12515 : let mut used_pageservers = vec![attached_node_id];
652 25029 : while self.intent.secondary.len() < secondary_count {
653 12514 : let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
654 12514 : &used_pageservers,
655 12514 : &self.preferred_az_id,
656 12514 : context,
657 12514 : )?;
658 12514 : self.intent.push_secondary(scheduler, node_id);
659 12514 : used_pageservers.push(node_id);
660 12514 : modified = true;
661 : }
662 : }
663 : Secondary => {
664 0 : if let Some(node_id) = self.intent.get_attached() {
665 0 : // Populate secondary by demoting the attached node
666 0 : self.intent.demote_attached(scheduler, *node_id);
667 0 : modified = true;
668 0 : } else if self.intent.secondary.is_empty() {
669 : // Populate secondary by scheduling a fresh node
670 0 : let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
671 0 : &[],
672 0 : &self.preferred_az_id,
673 0 : context,
674 0 : )?;
675 0 : self.intent.push_secondary(scheduler, node_id);
676 0 : modified = true;
677 0 : }
678 0 : while self.intent.secondary.len() > 1 {
679 0 : // We have no particular preference for one secondary location over another: just
680 0 : // arbitrarily drop from the end
681 0 : self.intent.pop_secondary(scheduler);
682 0 : modified = true;
683 0 : }
684 : }
685 : Detached => {
686 : // Never add locations in this mode
687 0 : if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
688 0 : self.intent.clear(scheduler);
689 0 : modified = true;
690 0 : }
691 : }
692 : }
693 :
694 12515 : if modified {
695 12515 : self.sequence.0 += 1;
696 12515 : }
697 :
698 12515 : Ok(())
699 12516 : }
700 :
701 : /// Reschedule this tenant shard to one of its secondary locations. Returns a scheduling error
702 : /// if the swap is not possible and leaves the intent state in its original state.
703 : ///
704 : /// Arguments:
705 : /// `attached_to`: the currently attached location matching the intent state (may be None if the
706 : /// shard is not attached)
707 : /// `promote_to`: an optional secondary location of this tenant shard. If set to None, we ask
708 : /// the scheduler to recommend a node
709 0 : pub(crate) fn reschedule_to_secondary(
710 0 : &mut self,
711 0 : promote_to: Option<NodeId>,
712 0 : scheduler: &mut Scheduler,
713 0 : ) -> Result<(), ScheduleError> {
714 0 : let promote_to = match promote_to {
715 0 : Some(node) => node,
716 0 : None => match scheduler.node_preferred(self.intent.get_secondary()) {
717 0 : Some(node) => node,
718 : None => {
719 0 : return Err(ScheduleError::ImpossibleConstraint);
720 : }
721 : },
722 : };
723 :
724 0 : assert!(self.intent.get_secondary().contains(&promote_to));
725 :
726 0 : if let Some(node) = self.intent.get_attached() {
727 0 : let demoted = self.intent.demote_attached(scheduler, *node);
728 0 : if !demoted {
729 0 : return Err(ScheduleError::ImpossibleConstraint);
730 0 : }
731 0 : }
732 :
733 0 : self.intent.promote_attached(scheduler, promote_to);
734 0 :
735 0 : // Increment the sequence number for the edge case where a
736 0 : // reconciler is already running to avoid waiting on the
737 0 : // current reconcile instead of spawning a new one.
738 0 : self.sequence = self.sequence.next();
739 0 :
740 0 : Ok(())
741 0 : }
742 :
743 : /// Optimize attachments: if a shard has a secondary location that is preferable to
744 : /// its primary location based on soft constraints, switch that secondary location
745 : /// to be attached.
746 23 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
747 : pub(crate) fn optimize_attachment(
748 : &self,
749 : nodes: &HashMap<NodeId, Node>,
750 : schedule_context: &ScheduleContext,
751 : ) -> Option<ScheduleOptimization> {
752 : let attached = (*self.intent.get_attached())?;
753 : if self.intent.secondary.is_empty() {
754 : // We can only do useful work if we have both attached and secondary locations: this
755 : // function doesn't schedule new locations, only swaps between attached and secondaries.
756 : return None;
757 : }
758 :
759 : let current_affinity_score = schedule_context.get_node_affinity(attached);
760 : let current_attachment_count = schedule_context.get_node_attachments(attached);
761 :
762 : // Generate score for each node, dropping any un-schedulable nodes.
763 : let all_pageservers = self.intent.all_pageservers();
764 : let mut scores = all_pageservers
765 : .iter()
766 46 : .flat_map(|node_id| {
767 46 : let node = nodes.get(node_id);
768 46 : if node.is_none() {
769 0 : None
770 46 : } else if matches!(
771 46 : node.unwrap().get_scheduling(),
772 : NodeSchedulingPolicy::Filling
773 : ) {
774 : // If the node is currently filling, don't count it as a candidate to avoid,
775 : // racing with the background fill.
776 0 : None
777 46 : } else if matches!(node.unwrap().may_schedule(), MaySchedule::No) {
778 0 : None
779 : } else {
780 46 : let affinity_score = schedule_context.get_node_affinity(*node_id);
781 46 : let attachment_count = schedule_context.get_node_attachments(*node_id);
782 46 : Some((*node_id, affinity_score, attachment_count))
783 : }
784 46 : })
785 : .collect::<Vec<_>>();
786 :
787 : // Sort precedence:
788 : // 1st - prefer nodes with the lowest total affinity score
789 : // 2nd - prefer nodes with the lowest number of attachments in this context
790 : // 3rd - if all else is equal, sort by node ID for determinism in tests.
791 46 : scores.sort_by_key(|i| (i.1, i.2, i.0));
792 :
793 : if let Some((preferred_node, preferred_affinity_score, preferred_attachment_count)) =
794 : scores.first()
795 : {
796 : if attached != *preferred_node {
797 : // The best alternative must be more than 1 better than us, otherwise we could end
798 : // up flapping back next time we're called (e.g. there's no point migrating from
799 : // a location with score 1 to a score zero, because on next location the situation
800 : // would be the same, but in reverse).
801 : if current_affinity_score > *preferred_affinity_score + AffinityScore(1)
802 : || current_attachment_count > *preferred_attachment_count + 1
803 : {
804 : tracing::info!(
805 : "Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})",
806 : self.intent.get_secondary()
807 : );
808 : return Some(ScheduleOptimization {
809 : sequence: self.sequence,
810 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
811 : old_attached_node_id: attached,
812 : new_attached_node_id: *preferred_node,
813 : }),
814 : });
815 : }
816 : } else {
817 : tracing::debug!(
818 : "Node {} is already preferred (score {:?})",
819 : preferred_node,
820 : preferred_affinity_score
821 : );
822 : }
823 : }
824 :
825 : // Fall-through: we didn't find an optimization
826 : None
827 : }
828 :
829 20 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
830 : pub(crate) fn optimize_secondary(
831 : &self,
832 : scheduler: &mut Scheduler,
833 : schedule_context: &ScheduleContext,
834 : ) -> Option<ScheduleOptimization> {
835 : if self.intent.secondary.is_empty() {
836 : // We can only do useful work if we have both attached and secondary locations: this
837 : // function doesn't schedule new locations, only swaps between attached and secondaries.
838 : return None;
839 : }
840 :
841 : for secondary in self.intent.get_secondary() {
842 : let Some(affinity_score) = schedule_context.nodes.get(secondary) else {
843 : // We're already on a node unaffected any affinity constraints,
844 : // so we won't change it.
845 : continue;
846 : };
847 :
848 : // Let the scheduler suggest a node, where it would put us if we were scheduling afresh
849 : // This implicitly limits the choice to nodes that are available, and prefers nodes
850 : // with lower utilization.
851 : let Ok(candidate_node) = scheduler.schedule_shard::<SecondaryShardTag>(
852 : &self.intent.all_pageservers(),
853 : &self.preferred_az_id,
854 : schedule_context,
855 : ) else {
856 : // A scheduling error means we have no possible candidate replacements
857 : continue;
858 : };
859 :
860 : let candidate_affinity_score = schedule_context
861 : .nodes
862 : .get(&candidate_node)
863 : .unwrap_or(&AffinityScore::FREE);
864 :
865 : // The best alternative must be more than 1 better than us, otherwise we could end
866 : // up flapping back next time we're called.
867 : if *candidate_affinity_score + AffinityScore(1) < *affinity_score {
868 : // If some other node is available and has a lower score than this node, then
869 : // that other node is a good place to migrate to.
870 : tracing::info!(
871 : "Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})",
872 : self.intent.get_secondary()
873 : );
874 : return Some(ScheduleOptimization {
875 : sequence: self.sequence,
876 : action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
877 : old_node_id: *secondary,
878 : new_node_id: candidate_node,
879 : }),
880 : });
881 : }
882 : }
883 :
884 : None
885 : }
886 :
887 : /// Return true if the optimization was really applied: it will not be applied if the optimization's
888 : /// sequence is behind this tenant shard's
889 9 : pub(crate) fn apply_optimization(
890 9 : &mut self,
891 9 : scheduler: &mut Scheduler,
892 9 : optimization: ScheduleOptimization,
893 9 : ) -> bool {
894 9 : if optimization.sequence != self.sequence {
895 0 : return false;
896 9 : }
897 9 :
898 9 : metrics::METRICS_REGISTRY
899 9 : .metrics_group
900 9 : .storage_controller_schedule_optimization
901 9 : .inc();
902 9 :
903 9 : match optimization.action {
904 : ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
905 4 : old_attached_node_id,
906 4 : new_attached_node_id,
907 4 : }) => {
908 4 : self.intent.demote_attached(scheduler, old_attached_node_id);
909 4 : self.intent
910 4 : .promote_attached(scheduler, new_attached_node_id);
911 4 : }
912 : ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
913 5 : old_node_id,
914 5 : new_node_id,
915 5 : }) => {
916 5 : self.intent.remove_secondary(scheduler, old_node_id);
917 5 : self.intent.push_secondary(scheduler, new_node_id);
918 5 : }
919 : }
920 :
921 9 : true
922 9 : }
923 :
924 : /// Query whether the tenant's observed state for attached node matches its intent state, and if so,
925 : /// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that
926 : /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
927 : ///
928 : /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
929 : /// funciton should not be used to decide whether to reconcile.
930 0 : pub(crate) fn stably_attached(&self) -> Option<NodeId> {
931 0 : if let Some(attach_intent) = self.intent.attached {
932 0 : match self.observed.locations.get(&attach_intent) {
933 0 : Some(loc) => match &loc.conf {
934 0 : Some(conf) => match conf.mode {
935 : LocationConfigMode::AttachedMulti
936 : | LocationConfigMode::AttachedSingle
937 : | LocationConfigMode::AttachedStale => {
938 : // Our intent and observed state agree that this node is in an attached state.
939 0 : Some(attach_intent)
940 : }
941 : // Our observed config is not an attached state
942 0 : _ => None,
943 : },
944 : // Our observed state is None, i.e. in flux
945 0 : None => None,
946 : },
947 : // We have no observed state for this node
948 0 : None => None,
949 : }
950 : } else {
951 : // Our intent is not to attach
952 0 : None
953 : }
954 0 : }
955 :
956 0 : fn dirty(&self, nodes: &Arc<HashMap<NodeId, Node>>) -> bool {
957 0 : let mut dirty_nodes = HashSet::new();
958 :
959 0 : if let Some(node_id) = self.intent.attached {
960 : // Maybe panic: it is a severe bug if we try to attach while generation is null.
961 0 : let generation = self
962 0 : .generation
963 0 : .expect("Attempted to enter attached state without a generation");
964 0 :
965 0 : let wanted_conf =
966 0 : attached_location_conf(generation, &self.shard, &self.config, &self.policy);
967 0 : match self.observed.locations.get(&node_id) {
968 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
969 0 : Some(_) | None => {
970 0 : dirty_nodes.insert(node_id);
971 0 : }
972 : }
973 0 : }
974 :
975 0 : for node_id in &self.intent.secondary {
976 0 : let wanted_conf = secondary_location_conf(&self.shard, &self.config);
977 0 : match self.observed.locations.get(node_id) {
978 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
979 0 : Some(_) | None => {
980 0 : dirty_nodes.insert(*node_id);
981 0 : }
982 : }
983 : }
984 :
985 0 : for node_id in self.observed.locations.keys() {
986 0 : if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
987 0 : // We have observed state that isn't part of our intent: need to clean it up.
988 0 : dirty_nodes.insert(*node_id);
989 0 : }
990 : }
991 :
992 0 : dirty_nodes.retain(|node_id| {
993 0 : nodes
994 0 : .get(node_id)
995 0 : .map(|n| n.is_available())
996 0 : .unwrap_or(false)
997 0 : });
998 0 :
999 0 : !dirty_nodes.is_empty()
1000 0 : }
1001 :
1002 : #[allow(clippy::too_many_arguments)]
1003 0 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1004 : pub(crate) fn get_reconcile_needed(
1005 : &mut self,
1006 : pageservers: &Arc<HashMap<NodeId, Node>>,
1007 : ) -> ReconcileNeeded {
1008 : // If there are any ambiguous observed states, and the nodes they refer to are available,
1009 : // we should reconcile to clean them up.
1010 : let mut dirty_observed = false;
1011 : for (node_id, observed_loc) in &self.observed.locations {
1012 : let node = pageservers
1013 : .get(node_id)
1014 : .expect("Nodes may not be removed while referenced");
1015 : if observed_loc.conf.is_none() && node.is_available() {
1016 : dirty_observed = true;
1017 : break;
1018 : }
1019 : }
1020 :
1021 : let active_nodes_dirty = self.dirty(pageservers);
1022 :
1023 : // Even if there is no pageserver work to be done, if we have a pending notification to computes,
1024 : // wake up a reconciler to send it.
1025 : let do_reconcile =
1026 : active_nodes_dirty || dirty_observed || self.pending_compute_notification;
1027 :
1028 : if !do_reconcile {
1029 : tracing::debug!("Not dirty, no reconciliation needed.");
1030 : return ReconcileNeeded::No;
1031 : }
1032 :
1033 : // If we are currently splitting, then never start a reconciler task: the splitting logic
1034 : // requires that shards are not interfered with while it runs. Do this check here rather than
1035 : // up top, so that we only log this message if we would otherwise have done a reconciliation.
1036 : if !matches!(self.splitting, SplitState::Idle) {
1037 : tracing::info!("Refusing to reconcile, splitting in progress");
1038 : return ReconcileNeeded::No;
1039 : }
1040 :
1041 : // Reconcile already in flight for the current sequence?
1042 : if let Some(handle) = &self.reconciler {
1043 : if handle.sequence == self.sequence {
1044 : tracing::info!(
1045 : "Reconciliation already in progress for sequence {:?}",
1046 : self.sequence,
1047 : );
1048 : return ReconcileNeeded::WaitExisting(ReconcilerWaiter {
1049 : tenant_shard_id: self.tenant_shard_id,
1050 : seq_wait: self.waiter.clone(),
1051 : error_seq_wait: self.error_waiter.clone(),
1052 : error: self.last_error.clone(),
1053 : seq: self.sequence,
1054 : });
1055 : }
1056 : }
1057 :
1058 : // Pre-checks done: finally check whether we may actually do the work
1059 : match self.scheduling_policy {
1060 : ShardSchedulingPolicy::Active
1061 : | ShardSchedulingPolicy::Essential
1062 : | ShardSchedulingPolicy::Pause => {}
1063 : ShardSchedulingPolicy::Stop => {
1064 : // We only reach this point if there is work to do and we're going to skip
1065 : // doing it: warn it obvious why this tenant isn't doing what it ought to.
1066 : tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
1067 : return ReconcileNeeded::No;
1068 : }
1069 : }
1070 :
1071 : ReconcileNeeded::Yes
1072 : }
1073 :
1074 : /// Ensure the sequence number is set to a value where waiting for this value will make us wait
1075 : /// for the next reconcile: i.e. it is ahead of all completed or running reconcilers.
1076 : ///
1077 : /// Constructing a ReconcilerWaiter with the resulting sequence number gives the property
1078 : /// that the waiter will not complete until some future Reconciler is constructed and run.
1079 0 : fn ensure_sequence_ahead(&mut self) {
1080 0 : // Find the highest sequence for which a Reconciler has previously run or is currently
1081 0 : // running
1082 0 : let max_seen = std::cmp::max(
1083 0 : self.reconciler
1084 0 : .as_ref()
1085 0 : .map(|r| r.sequence)
1086 0 : .unwrap_or(Sequence(0)),
1087 0 : std::cmp::max(self.waiter.load(), self.error_waiter.load()),
1088 0 : );
1089 0 :
1090 0 : if self.sequence <= max_seen {
1091 0 : self.sequence = max_seen.next();
1092 0 : }
1093 0 : }
1094 :
1095 : /// Create a waiter that will wait for some future Reconciler that hasn't been spawned yet.
1096 : ///
1097 : /// This is appropriate when you can't spawn a reconciler (e.g. due to resource limits), but
1098 : /// you would like to wait on the next reconciler that gets spawned in the background.
1099 0 : pub(crate) fn future_reconcile_waiter(&mut self) -> ReconcilerWaiter {
1100 0 : self.ensure_sequence_ahead();
1101 0 :
1102 0 : ReconcilerWaiter {
1103 0 : tenant_shard_id: self.tenant_shard_id,
1104 0 : seq_wait: self.waiter.clone(),
1105 0 : error_seq_wait: self.error_waiter.clone(),
1106 0 : error: self.last_error.clone(),
1107 0 : seq: self.sequence,
1108 0 : }
1109 0 : }
1110 :
1111 0 : async fn reconcile(
1112 0 : sequence: Sequence,
1113 0 : mut reconciler: Reconciler,
1114 0 : must_notify: bool,
1115 0 : ) -> ReconcileResult {
1116 : // Attempt to make observed state match intent state
1117 0 : let result = reconciler.reconcile().await;
1118 :
1119 : // If we know we had a pending compute notification from some previous action, send a notification irrespective
1120 : // of whether the above reconcile() did any work
1121 0 : if result.is_ok() && must_notify {
1122 : // If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
1123 0 : reconciler.compute_notify().await.ok();
1124 0 : }
1125 :
1126 : // Update result counter
1127 0 : let outcome_label = match &result {
1128 0 : Ok(_) => ReconcileOutcome::Success,
1129 0 : Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
1130 0 : Err(_) => ReconcileOutcome::Error,
1131 : };
1132 :
1133 0 : metrics::METRICS_REGISTRY
1134 0 : .metrics_group
1135 0 : .storage_controller_reconcile_complete
1136 0 : .inc(ReconcileCompleteLabelGroup {
1137 0 : status: outcome_label,
1138 0 : });
1139 0 :
1140 0 : // Constructing result implicitly drops Reconciler, freeing any ReconcileUnits before the Service might
1141 0 : // try and schedule more work in response to our result.
1142 0 : ReconcileResult {
1143 0 : sequence,
1144 0 : result,
1145 0 : tenant_shard_id: reconciler.tenant_shard_id,
1146 0 : generation: reconciler.generation,
1147 0 : observed_deltas: reconciler.observed_deltas(),
1148 0 : pending_compute_notification: reconciler.compute_notify_failure,
1149 0 : }
1150 0 : }
1151 :
1152 : #[allow(clippy::too_many_arguments)]
1153 0 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1154 : pub(crate) fn spawn_reconciler(
1155 : &mut self,
1156 : result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
1157 : pageservers: &Arc<HashMap<NodeId, Node>>,
1158 : compute_hook: &Arc<ComputeHook>,
1159 : reconciler_config: ReconcilerConfig,
1160 : service_config: &service::Config,
1161 : persistence: &Arc<Persistence>,
1162 : units: ReconcileUnits,
1163 : gate_guard: GateGuard,
1164 : cancel: &CancellationToken,
1165 : ) -> Option<ReconcilerWaiter> {
1166 : // Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
1167 : // doing our sequence's work.
1168 : let old_handle = self.reconciler.take();
1169 :
1170 : // Build list of nodes from which the reconciler should detach
1171 : let mut detach = Vec::new();
1172 : for node_id in self.observed.locations.keys() {
1173 : if self.intent.get_attached() != &Some(*node_id)
1174 : && !self.intent.secondary.contains(node_id)
1175 : {
1176 : detach.push(
1177 : pageservers
1178 : .get(node_id)
1179 : .expect("Intent references non-existent pageserver")
1180 : .clone(),
1181 : )
1182 : }
1183 : }
1184 :
1185 : // Advance the sequence before spawning a reconciler, so that sequence waiters
1186 : // can distinguish between before+after the reconcile completes.
1187 : self.ensure_sequence_ahead();
1188 :
1189 : let reconciler_cancel = cancel.child_token();
1190 : let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
1191 : let reconciler = Reconciler {
1192 : tenant_shard_id: self.tenant_shard_id,
1193 : shard: self.shard,
1194 : placement_policy: self.policy.clone(),
1195 : generation: self.generation,
1196 : intent: reconciler_intent,
1197 : detach,
1198 : reconciler_config,
1199 : config: self.config.clone(),
1200 : observed: self.observed.clone(),
1201 : original_observed: self.observed.clone(),
1202 : compute_hook: compute_hook.clone(),
1203 : service_config: service_config.clone(),
1204 : _gate_guard: gate_guard,
1205 : _resource_units: units,
1206 : cancel: reconciler_cancel.clone(),
1207 : persistence: persistence.clone(),
1208 : compute_notify_failure: false,
1209 : };
1210 :
1211 : let reconcile_seq = self.sequence;
1212 : let long_reconcile_threshold = service_config.long_reconcile_threshold;
1213 :
1214 : tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
1215 : let must_notify = self.pending_compute_notification;
1216 : let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
1217 : tenant_id=%reconciler.tenant_shard_id.tenant_id,
1218 : shard_id=%reconciler.tenant_shard_id.shard_slug());
1219 : metrics::METRICS_REGISTRY
1220 : .metrics_group
1221 : .storage_controller_reconcile_spawn
1222 : .inc();
1223 : let result_tx = result_tx.clone();
1224 : let join_handle = tokio::task::spawn(
1225 0 : async move {
1226 : // Wait for any previous reconcile task to complete before we start
1227 0 : if let Some(old_handle) = old_handle {
1228 0 : old_handle.cancel.cancel();
1229 0 : if let Err(e) = old_handle.handle.await {
1230 : // We can't do much with this other than log it: the task is done, so
1231 : // we may proceed with our work.
1232 0 : tracing::error!("Unexpected join error waiting for reconcile task: {e}");
1233 0 : }
1234 0 : }
1235 :
1236 : // Early check for cancellation before doing any work
1237 : // TODO: wrap all remote API operations in cancellation check
1238 : // as well.
1239 0 : if reconciler.cancel.is_cancelled() {
1240 0 : metrics::METRICS_REGISTRY
1241 0 : .metrics_group
1242 0 : .storage_controller_reconcile_complete
1243 0 : .inc(ReconcileCompleteLabelGroup {
1244 0 : status: ReconcileOutcome::Cancel,
1245 0 : });
1246 0 : return;
1247 0 : }
1248 0 :
1249 0 : let (tenant_id_label, shard_number_label, sequence_label) = {
1250 0 : (
1251 0 : reconciler.tenant_shard_id.tenant_id.to_string(),
1252 0 : reconciler.tenant_shard_id.shard_number.0.to_string(),
1253 0 : reconcile_seq.to_string(),
1254 0 : )
1255 0 : };
1256 0 :
1257 0 : let label_group = ReconcileLongRunningLabelGroup {
1258 0 : tenant_id: &tenant_id_label,
1259 0 : shard_number: &shard_number_label,
1260 0 : sequence: &sequence_label,
1261 0 : };
1262 0 :
1263 0 : let reconcile_fut = Self::reconcile(reconcile_seq, reconciler, must_notify);
1264 0 : let long_reconcile_fut = {
1265 0 : let label_group = label_group.clone();
1266 0 : async move {
1267 0 : tokio::time::sleep(long_reconcile_threshold).await;
1268 :
1269 0 : tracing::warn!("Reconcile passed the long running threshold of {long_reconcile_threshold:?}");
1270 :
1271 0 : metrics::METRICS_REGISTRY
1272 0 : .metrics_group
1273 0 : .storage_controller_reconcile_long_running
1274 0 : .inc(label_group);
1275 0 : }
1276 : };
1277 :
1278 0 : let reconcile_fut = std::pin::pin!(reconcile_fut);
1279 0 : let long_reconcile_fut = std::pin::pin!(long_reconcile_fut);
1280 :
1281 0 : let (was_long, result) =
1282 0 : match future::select(reconcile_fut, long_reconcile_fut).await {
1283 0 : Either::Left((reconcile_result, _)) => (false, reconcile_result),
1284 0 : Either::Right((_, reconcile_fut)) => (true, reconcile_fut.await),
1285 : };
1286 :
1287 0 : if was_long {
1288 0 : let id = metrics::METRICS_REGISTRY
1289 0 : .metrics_group
1290 0 : .storage_controller_reconcile_long_running
1291 0 : .with_labels(label_group);
1292 0 : metrics::METRICS_REGISTRY
1293 0 : .metrics_group
1294 0 : .storage_controller_reconcile_long_running
1295 0 : .remove_metric(id);
1296 0 : }
1297 :
1298 0 : result_tx
1299 0 : .send(ReconcileResultRequest::ReconcileResult(result))
1300 0 : .ok();
1301 0 : }
1302 : .instrument(reconciler_span),
1303 : );
1304 :
1305 : self.reconciler = Some(ReconcilerHandle {
1306 : sequence: self.sequence,
1307 : handle: join_handle,
1308 : cancel: reconciler_cancel,
1309 : });
1310 :
1311 : Some(ReconcilerWaiter {
1312 : tenant_shard_id: self.tenant_shard_id,
1313 : seq_wait: self.waiter.clone(),
1314 : error_seq_wait: self.error_waiter.clone(),
1315 : error: self.last_error.clone(),
1316 : seq: self.sequence,
1317 : })
1318 : }
1319 :
1320 0 : pub(crate) fn cancel_reconciler(&self) {
1321 0 : if let Some(handle) = self.reconciler.as_ref() {
1322 0 : handle.cancel.cancel()
1323 0 : }
1324 0 : }
1325 :
1326 : /// Get a waiter for any reconciliation in flight, but do not start reconciliation
1327 : /// if it is not already running
1328 0 : pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
1329 0 : if self.reconciler.is_some() {
1330 0 : Some(ReconcilerWaiter {
1331 0 : tenant_shard_id: self.tenant_shard_id,
1332 0 : seq_wait: self.waiter.clone(),
1333 0 : error_seq_wait: self.error_waiter.clone(),
1334 0 : error: self.last_error.clone(),
1335 0 : seq: self.sequence,
1336 0 : })
1337 : } else {
1338 0 : None
1339 : }
1340 0 : }
1341 :
1342 : /// Called when a ReconcileResult has been emitted and the service is updating
1343 : /// our state: if the result is from a sequence >= my ReconcileHandle, then drop
1344 : /// the handle to indicate there is no longer a reconciliation in progress.
1345 0 : pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
1346 0 : if let Some(reconcile_handle) = &self.reconciler {
1347 0 : if reconcile_handle.sequence <= sequence {
1348 0 : self.reconciler = None;
1349 0 : }
1350 0 : }
1351 0 : }
1352 :
1353 : /// If we had any state at all referring to this node ID, drop it. Does not
1354 : /// attempt to reschedule.
1355 : ///
1356 : /// Returns true if we modified the node's intent state.
1357 0 : pub(crate) fn deref_node(&mut self, node_id: NodeId) -> bool {
1358 0 : let mut intent_modified = false;
1359 0 :
1360 0 : // Drop if this node was our attached intent
1361 0 : if self.intent.attached == Some(node_id) {
1362 0 : self.intent.attached = None;
1363 0 : intent_modified = true;
1364 0 : }
1365 :
1366 : // Drop from the list of secondaries, and check if we modified it
1367 0 : let had_secondaries = self.intent.secondary.len();
1368 0 : self.intent.secondary.retain(|n| n != &node_id);
1369 0 : intent_modified |= self.intent.secondary.len() != had_secondaries;
1370 0 :
1371 0 : debug_assert!(!self.intent.all_pageservers().contains(&node_id));
1372 :
1373 0 : intent_modified
1374 0 : }
1375 :
1376 0 : pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
1377 0 : self.scheduling_policy = p;
1378 0 : }
1379 :
1380 0 : pub(crate) fn get_scheduling_policy(&self) -> &ShardSchedulingPolicy {
1381 0 : &self.scheduling_policy
1382 0 : }
1383 :
1384 0 : pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
1385 0 : // Ordering: always set last_error before advancing sequence, so that sequence
1386 0 : // waiters are guaranteed to see a Some value when they see an error.
1387 0 : *(self.last_error.lock().unwrap()) = Some(Arc::new(error));
1388 0 : self.error_waiter.advance(sequence);
1389 0 : }
1390 :
1391 0 : pub(crate) fn from_persistent(
1392 0 : tsp: TenantShardPersistence,
1393 0 : intent: IntentState,
1394 0 : ) -> anyhow::Result<Self> {
1395 0 : let tenant_shard_id = tsp.get_tenant_shard_id()?;
1396 0 : let shard_identity = tsp.get_shard_identity()?;
1397 :
1398 0 : metrics::METRICS_REGISTRY
1399 0 : .metrics_group
1400 0 : .storage_controller_tenant_shards
1401 0 : .inc();
1402 0 :
1403 0 : Ok(Self {
1404 0 : tenant_shard_id,
1405 0 : shard: shard_identity,
1406 0 : sequence: Sequence::initial(),
1407 0 : generation: tsp.generation.map(|g| Generation::new(g as u32)),
1408 0 : policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
1409 0 : intent,
1410 0 : observed: ObservedState::new(),
1411 0 : config: serde_json::from_str(&tsp.config).unwrap(),
1412 0 : reconciler: None,
1413 0 : splitting: tsp.splitting,
1414 0 : waiter: Arc::new(SeqWait::new(Sequence::initial())),
1415 0 : error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
1416 0 : last_error: Arc::default(),
1417 0 : pending_compute_notification: false,
1418 0 : delayed_reconcile: false,
1419 0 : scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
1420 0 : preferred_az_id: tsp.preferred_az_id.map(AvailabilityZone),
1421 0 : })
1422 0 : }
1423 :
1424 0 : pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
1425 0 : TenantShardPersistence {
1426 0 : tenant_id: self.tenant_shard_id.tenant_id.to_string(),
1427 0 : shard_number: self.tenant_shard_id.shard_number.0 as i32,
1428 0 : shard_count: self.tenant_shard_id.shard_count.literal() as i32,
1429 0 : shard_stripe_size: self.shard.stripe_size.0 as i32,
1430 0 : generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
1431 0 : generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
1432 0 : placement_policy: serde_json::to_string(&self.policy).unwrap(),
1433 0 : config: serde_json::to_string(&self.config).unwrap(),
1434 0 : splitting: SplitState::default(),
1435 0 : scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
1436 0 : preferred_az_id: self.preferred_az_id.as_ref().map(|az| az.0.clone()),
1437 0 : }
1438 0 : }
1439 :
1440 12500 : pub(crate) fn preferred_az(&self) -> Option<&AvailabilityZone> {
1441 12500 : self.preferred_az_id.as_ref()
1442 12500 : }
1443 :
1444 12500 : pub(crate) fn set_preferred_az(&mut self, preferred_az_id: AvailabilityZone) {
1445 12500 : self.preferred_az_id = Some(preferred_az_id);
1446 12500 : }
1447 :
1448 : /// Returns all the nodes to which this tenant shard is attached according to the
1449 : /// observed state and the generations. Return vector is sorted from latest generation
1450 : /// to earliest.
1451 0 : pub(crate) fn attached_locations(&self) -> Vec<(NodeId, Generation)> {
1452 0 : self.observed
1453 0 : .locations
1454 0 : .iter()
1455 0 : .filter_map(|(node_id, observed)| {
1456 : use LocationConfigMode::{AttachedMulti, AttachedSingle, AttachedStale};
1457 :
1458 0 : let conf = observed.conf.as_ref()?;
1459 :
1460 0 : match (conf.generation, conf.mode) {
1461 0 : (Some(gen), AttachedMulti | AttachedSingle | AttachedStale) => {
1462 0 : Some((*node_id, gen))
1463 : }
1464 0 : _ => None,
1465 : }
1466 0 : })
1467 0 : .sorted_by(|(_lhs_node_id, lhs_gen), (_rhs_node_id, rhs_gen)| {
1468 0 : lhs_gen.cmp(rhs_gen).reverse()
1469 0 : })
1470 0 : .map(|(node_id, gen)| (node_id, Generation::new(gen)))
1471 0 : .collect()
1472 0 : }
1473 :
1474 : /// Update the observed state of the tenant by applying incremental deltas
1475 : ///
1476 : /// Deltas are generated by reconcilers via [`Reconciler::observed_deltas`].
1477 : /// They are then filtered in [`crate::service::Service::process_result`].
1478 0 : pub(crate) fn apply_observed_deltas(
1479 0 : &mut self,
1480 0 : deltas: impl Iterator<Item = ObservedStateDelta>,
1481 0 : ) {
1482 0 : for delta in deltas {
1483 0 : match delta {
1484 0 : ObservedStateDelta::Upsert(ups) => {
1485 0 : let (node_id, loc) = *ups;
1486 0 :
1487 0 : // If the generation of the observed location in the delta is lagging
1488 0 : // behind the current one, then we have a race condition and cannot
1489 0 : // be certain about the true observed state. Set the observed state
1490 0 : // to None in order to reflect this.
1491 0 : let crnt_gen = self
1492 0 : .observed
1493 0 : .locations
1494 0 : .get(&node_id)
1495 0 : .and_then(|loc| loc.conf.as_ref())
1496 0 : .and_then(|conf| conf.generation);
1497 0 : let new_gen = loc.conf.as_ref().and_then(|conf| conf.generation);
1498 0 : match (crnt_gen, new_gen) {
1499 0 : (Some(crnt), Some(new)) if crnt_gen > new_gen => {
1500 0 : tracing::warn!(
1501 0 : "Skipping observed state update {}: {:?} and using None due to stale generation ({} > {})",
1502 : node_id, loc, crnt, new
1503 : );
1504 :
1505 0 : self.observed
1506 0 : .locations
1507 0 : .insert(node_id, ObservedStateLocation { conf: None });
1508 0 :
1509 0 : continue;
1510 : }
1511 0 : _ => {}
1512 : }
1513 :
1514 0 : if let Some(conf) = &loc.conf {
1515 0 : tracing::info!("Updating observed location {}: {:?}", node_id, conf);
1516 : } else {
1517 0 : tracing::info!("Setting observed location {} to None", node_id,)
1518 : }
1519 :
1520 0 : self.observed.locations.insert(node_id, loc);
1521 : }
1522 0 : ObservedStateDelta::Delete(node_id) => {
1523 0 : tracing::info!("Deleting observed location {}", node_id);
1524 0 : self.observed.locations.remove(&node_id);
1525 : }
1526 : }
1527 : }
1528 0 : }
1529 : }
1530 :
1531 : impl Drop for TenantShard {
1532 12519 : fn drop(&mut self) {
1533 12519 : metrics::METRICS_REGISTRY
1534 12519 : .metrics_group
1535 12519 : .storage_controller_tenant_shards
1536 12519 : .dec();
1537 12519 : }
1538 : }
1539 :
1540 : #[cfg(test)]
1541 : pub(crate) mod tests {
1542 : use std::{cell::RefCell, rc::Rc};
1543 :
1544 : use pageserver_api::{
1545 : controller_api::NodeAvailability,
1546 : shard::{ShardCount, ShardNumber},
1547 : };
1548 : use rand::{rngs::StdRng, SeedableRng};
1549 : use utils::id::TenantId;
1550 :
1551 : use crate::scheduler::test_utils::make_test_nodes;
1552 :
1553 : use super::*;
1554 :
1555 7 : fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard {
1556 7 : let tenant_id = TenantId::generate();
1557 7 : let shard_number = ShardNumber(0);
1558 7 : let shard_count = ShardCount::new(1);
1559 7 :
1560 7 : let tenant_shard_id = TenantShardId {
1561 7 : tenant_id,
1562 7 : shard_number,
1563 7 : shard_count,
1564 7 : };
1565 7 : TenantShard::new(
1566 7 : tenant_shard_id,
1567 7 : ShardIdentity::new(
1568 7 : shard_number,
1569 7 : shard_count,
1570 7 : pageserver_api::shard::ShardStripeSize(32768),
1571 7 : )
1572 7 : .unwrap(),
1573 7 : policy,
1574 7 : )
1575 7 : }
1576 :
1577 5003 : fn make_test_tenant(
1578 5003 : policy: PlacementPolicy,
1579 5003 : shard_count: ShardCount,
1580 5003 : preferred_az: Option<AvailabilityZone>,
1581 5003 : ) -> Vec<TenantShard> {
1582 5003 : let tenant_id = TenantId::generate();
1583 5003 :
1584 5003 : (0..shard_count.count())
1585 12512 : .map(|i| {
1586 12512 : let shard_number = ShardNumber(i);
1587 12512 :
1588 12512 : let tenant_shard_id = TenantShardId {
1589 12512 : tenant_id,
1590 12512 : shard_number,
1591 12512 : shard_count,
1592 12512 : };
1593 12512 : let mut ts = TenantShard::new(
1594 12512 : tenant_shard_id,
1595 12512 : ShardIdentity::new(
1596 12512 : shard_number,
1597 12512 : shard_count,
1598 12512 : pageserver_api::shard::ShardStripeSize(32768),
1599 12512 : )
1600 12512 : .unwrap(),
1601 12512 : policy.clone(),
1602 12512 : );
1603 :
1604 12512 : if let Some(az) = &preferred_az {
1605 12500 : ts.set_preferred_az(az.clone());
1606 12500 : }
1607 :
1608 12512 : ts
1609 12512 : })
1610 5003 : .collect()
1611 5003 : }
1612 :
/// Exercise the scheduling behaviors used when a tenant configured for HA has
/// nodes marked offline.
#[test]
fn tenant_ha_scheduling() -> anyhow::Result<()> {
    // Three nodes are available, but the tenant only needs two of them: the third
    // is expected to remain unused.
    let mut nodes = make_test_nodes(3, &[]);

    let mut scheduler = Scheduler::new(nodes.values());
    let mut context = ScheduleContext::default();

    let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
    tenant_shard
        .schedule(&mut scheduler, &mut context)
        .expect("we have enough nodes, scheduling should work");

    // Initial placement: one attached and one secondary location, on different nodes.
    assert_eq!(tenant_shard.intent.secondary.len(), 1);
    assert!(tenant_shard.intent.attached.is_some());

    let attached_node_id = tenant_shard.intent.attached.unwrap();
    let secondary_node_id = *tenant_shard.intent.secondary.last().unwrap();
    assert_ne!(attached_node_id, secondary_node_id);

    // Notifying the attached node is offline should demote it to a secondary
    let changed = tenant_shard
        .intent
        .demote_attached(&mut scheduler, attached_node_id);
    assert!(changed);
    assert!(tenant_shard.intent.attached.is_none());
    assert_eq!(tenant_shard.intent.secondary.len(), 2);

    // Mark the node offline and tell the scheduler about it.
    let offline_node = nodes.get_mut(&attached_node_id).unwrap();
    offline_node.set_availability(NodeAvailability::Offline);
    scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());

    // Rescheduling should promote the still-available secondary node to attached
    tenant_shard
        .schedule(&mut scheduler, &mut context)
        .expect("active nodes are available");
    assert_eq!(tenant_shard.intent.attached.unwrap(), secondary_node_id);

    // The original attached node should have been retained as a secondary
    let last_secondary = *tenant_shard.intent.secondary.last().unwrap();
    assert_eq!(last_secondary, attached_node_id);

    tenant_shard.intent.clear(&mut scheduler);

    Ok(())
}
1668 :
/// Checks that `intent_from_observed` derives the intent state from the observed
/// locations: the attached location with the highest generation becomes the
/// attachment, and the remaining location becomes a secondary.
#[test]
fn intent_from_observed() -> anyhow::Result<()> {
    let nodes = make_test_nodes(3, &[]);
    let mut scheduler = Scheduler::new(nodes.values());

    let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));

    // Shard parameters are shared by both observed locations: read them once up front
    let shard_number = tenant_shard.shard.number.0;
    let shard_count = tenant_shard.shard.count.literal();
    let shard_stripe_size = tenant_shard.shard.stripe_size.0;

    // Two attached-style locations are observed, with different generations
    for (node_id, mode, generation) in [
        (NodeId(3), LocationConfigMode::AttachedMulti, 2),
        (NodeId(2), LocationConfigMode::AttachedStale, 1),
    ] {
        let location = ObservedStateLocation {
            conf: Some(LocationConfig {
                mode,
                generation: Some(generation),
                secondary_conf: None,
                shard_number,
                shard_count,
                shard_stripe_size,
                tenant_conf: TenantConfig::default(),
            }),
        };
        tenant_shard.observed.locations.insert(node_id, location);
    }

    tenant_shard.intent_from_observed(&mut scheduler);

    // The attached location with the highest generation is picked as attached
    assert_eq!(tenant_shard.intent.attached, Some(NodeId(3)));
    // The other location ends up as a secondary
    assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]);

    scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?;

    tenant_shard.intent.clear(&mut scheduler);
    Ok(())
}
1718 :
/// Checks that the shard's scheduling policy gates `schedule()`: a paused shard is
/// left without any locations, an active one gets some.
#[test]
fn scheduling_mode() -> anyhow::Result<()> {
    let nodes = make_test_nodes(3, &[]);
    let mut scheduler = Scheduler::new(nodes.values());

    let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));

    // Paused: schedule() succeeds but must not place the shard anywhere
    tenant_shard.scheduling_policy = ShardSchedulingPolicy::Pause;
    let mut paused_context = ScheduleContext::default();
    let paused_result = tenant_shard.schedule(&mut scheduler, &mut paused_context);
    assert!(paused_result.is_ok());
    assert!(tenant_shard.intent.all_pageservers().is_empty());

    // Active: schedule() succeeds and assigns locations
    tenant_shard.scheduling_policy = ShardSchedulingPolicy::Active;
    let mut active_context = ScheduleContext::default();
    let active_result = tenant_shard.schedule(&mut scheduler, &mut active_context);
    assert!(active_result.is_ok());
    assert!(!tenant_shard.intent.all_pageservers().is_empty());

    tenant_shard.intent.clear(&mut scheduler);
    Ok(())
}
1743 :
/// Checks that `optimize_attachment` proposes moving an attachment onto the shard's
/// secondary node when two shards share an attached node, and that applying the
/// proposal swaps the attached and secondary locations.
#[test]
fn optimize_attachment() -> anyhow::Result<()> {
    let nodes = make_test_nodes(3, &[]);
    let mut scheduler = Scheduler::new(nodes.values());

    let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
    let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));

    // Starting layout: both shards are attached on node 1, and their secondary
    // locations are on different nodes (2 and 3).
    shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    shard_a.intent.push_secondary(&mut scheduler, NodeId(2));
    shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    shard_b.intent.push_secondary(&mut scheduler, NodeId(3));

    // Feed both shards' current locations into one shared context
    let mut schedule_context = ScheduleContext::default();
    for shard in [&shard_a, &shard_b] {
        schedule_context.avoid(&shard.intent.all_pageservers());
        schedule_context.push_attached(shard.intent.get_attached().unwrap());
    }

    // Each shard should notice that its secondary location hosts no other shard of
    // the same tenant, and ask to move its attachment there.
    let optimization_a = shard_a.optimize_attachment(&nodes, &schedule_context);
    let expected_a = ScheduleOptimization {
        sequence: shard_a.sequence,
        action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
            old_attached_node_id: NodeId(1),
            new_attached_node_id: NodeId(2),
        }),
    };
    assert_eq!(optimization_a, Some(expected_a));

    // Optimizing two shards of one tenant against the same ScheduleContext is really
    // mutually exclusive (applying one invalidates the stats used for the other): it
    // is the responsibility of [`Service::optimize_all`] to avoid doing so at the same
    // time. Both optimizations are generated here purely for test purposes.
    let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context);
    let expected_b = ScheduleOptimization {
        sequence: shard_b.sequence,
        action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
            old_attached_node_id: NodeId(1),
            new_attached_node_id: NodeId(3),
        }),
    };
    assert_eq!(optimization_b, Some(expected_b));

    // Applying the proposals must produce exactly the layout they describe
    shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
    assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(2)));
    assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);

    shard_b.apply_optimization(&mut scheduler, optimization_b.unwrap());
    assert_eq!(shard_b.intent.get_attached(), &Some(NodeId(3)));
    assert_eq!(shard_b.intent.get_secondary(), &vec![NodeId(1)]);

    shard_a.intent.clear(&mut scheduler);
    shard_b.intent.clear(&mut scheduler);

    Ok(())
}
1810 :
/// Checks that `optimize_secondary` proposes moving a secondary location off a node
/// shared with another shard of the same tenant when an empty node is available.
#[test]
fn optimize_secondary() -> anyhow::Result<()> {
    let nodes = make_test_nodes(4, &[]);
    let mut scheduler = Scheduler::new(nodes.values());

    let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
    let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));

    // Starting layout: the shards are attached on nodes 1 and 2 respectively, while
    // both secondary locations share node 3. Node 4 is left empty.
    shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
    shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
    shard_b.intent.push_secondary(&mut scheduler, NodeId(3));

    // Feed both shards' current locations into one shared context
    let mut schedule_context = ScheduleContext::default();
    for shard in [&shard_a, &shard_b] {
        schedule_context.avoid(&shard.intent.all_pageservers());
        schedule_context.push_attached(shard.intent.get_attached().unwrap());
    }

    let optimization_a = shard_a.optimize_secondary(&mut scheduler, &schedule_context);

    // A node with no locations is available, so the node holding two locations of
    // the same tenant should yield an optimization that moves one of them away.
    let expected_a = ScheduleOptimization {
        sequence: shard_a.sequence,
        action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
            old_node_id: NodeId(3),
            new_node_id: NodeId(4),
        }),
    };
    assert_eq!(optimization_a, Some(expected_a));

    // Applying it leaves the attachment alone and relocates the secondary
    shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
    assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
    assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);

    shard_a.intent.clear(&mut scheduler);
    shard_b.intent.clear(&mut scheduler);

    Ok(())
}
1856 :
1857 : // Optimize til quiescent: this emulates what Service::optimize_all does, when
1858 : // called repeatedly in the background.
1859 : // Returns the applied optimizations
1860 3 : fn optimize_til_idle(
1861 3 : nodes: &HashMap<NodeId, Node>,
1862 3 : scheduler: &mut Scheduler,
1863 3 : shards: &mut [TenantShard],
1864 3 : ) -> Vec<ScheduleOptimization> {
1865 3 : let mut loop_n = 0;
1866 3 : let mut optimizations = Vec::default();
1867 : loop {
1868 9 : let mut schedule_context = ScheduleContext::default();
1869 9 : let mut any_changed = false;
1870 :
1871 36 : for shard in shards.iter() {
1872 36 : schedule_context.avoid(&shard.intent.all_pageservers());
1873 36 : if let Some(attached) = shard.intent.get_attached() {
1874 36 : schedule_context.push_attached(*attached);
1875 36 : }
1876 : }
1877 :
1878 21 : for shard in shards.iter_mut() {
1879 21 : let optimization = shard.optimize_attachment(nodes, &schedule_context);
1880 21 : if let Some(optimization) = optimization {
1881 2 : optimizations.push(optimization.clone());
1882 2 : shard.apply_optimization(scheduler, optimization);
1883 2 : any_changed = true;
1884 2 : break;
1885 19 : }
1886 19 :
1887 19 : let optimization = shard.optimize_secondary(scheduler, &schedule_context);
1888 19 : if let Some(optimization) = optimization {
1889 4 : optimizations.push(optimization.clone());
1890 4 : shard.apply_optimization(scheduler, optimization);
1891 4 : any_changed = true;
1892 4 : break;
1893 15 : }
1894 : }
1895 :
1896 9 : if !any_changed {
1897 3 : break;
1898 6 : }
1899 6 :
1900 6 : // Assert no infinite loop
1901 6 : loop_n += 1;
1902 6 : assert!(loop_n < 1000);
1903 : }
1904 :
1905 3 : optimizations
1906 3 : }
1907 :
/// Exercises the balancing behavior of shard scheduling when nodes are added: the
/// optimizer must converge, and must end up with the shards spread evenly.
#[test]
fn optimize_add_nodes() -> anyhow::Result<()> {
    let nodes = make_test_nodes(4, &[]);

    // To begin with, the scheduler only knows about the first two nodes
    let mut scheduler = Scheduler::new([].iter());
    for id in [1, 2] {
        scheduler.node_upsert(nodes.get(&NodeId(id)).unwrap());
    }

    let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
    let mut schedule_context = ScheduleContext::default();
    for shard in shards.iter_mut() {
        let scheduled = shard.schedule(&mut scheduler, &mut schedule_context);
        assert!(scheduled.is_ok());
    }

    // The two known nodes should carry an equal number of locations
    for node_id in [NodeId(1), NodeId(2)] {
        assert_eq!(scheduler.get_node_shard_count(node_id), 4);
        assert_eq!(scheduler.get_node_attached_shard_count(node_id), 2);
    }

    // Introduce the remaining two nodes: running the shards' optimize methods
    // should spread the locations out across all four.
    for id in [3, 4] {
        scheduler.node_upsert(nodes.get(&NodeId(id)).unwrap());
    }
    optimize_til_idle(&nodes, &mut scheduler, &mut shards);

    for id in 1..=4 {
        let node_id = NodeId(id);
        assert_eq!(scheduler.get_node_shard_count(node_id), 2);
        assert_eq!(scheduler.get_node_attached_shard_count(node_id), 1);
    }

    for shard in &mut shards {
        shard.intent.clear(&mut scheduler);
    }

    Ok(())
}
1958 :
/// Checks that the initial scheduling of shards is already optimal, where "optimal"
/// means the optimizer finds nothing to improve afterwards.
///
/// The scenario reproduces the scheduling issue described in
/// https://github.com/neondatabase/neon/issues/8969
#[test]
fn initial_scheduling_is_optimal() -> anyhow::Result<()> {
    use itertools::Itertools;

    let nodes = make_test_nodes(2, &[]);

    let mut scheduler = Scheduler::new([].iter());
    for id in [1, 2] {
        scheduler.node_upsert(nodes.get(&NodeId(id)).unwrap());
    }

    // Two tenants with four shards each; every tenant gets its own schedule context
    let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
    let a_context = Rc::new(RefCell::new(ScheduleContext::default()));

    let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
    let b_context = Rc::new(RefCell::new(ScheduleContext::default()));

    // Schedule the shards of the two tenants alternately (a, b, a, b, ...), pairing
    // each shard with the context of the tenant it belongs to.
    let a_pairs = a.iter_mut().map(|shard| (shard, Rc::clone(&a_context)));
    let b_pairs = b.iter_mut().map(|shard| (shard, Rc::clone(&b_context)));
    for (shard, context) in a_pairs.interleave(b_pairs) {
        shard
            .schedule(&mut scheduler, &mut context.borrow_mut())
            .unwrap();
    }

    // Neither tenant should have anything left to optimize
    let applied_to_a = optimize_til_idle(&nodes, &mut scheduler, &mut a);
    assert_eq!(applied_to_a, vec![]);

    let applied_to_b = optimize_til_idle(&nodes, &mut scheduler, &mut b);
    assert_eq!(applied_to_b, vec![]);

    for shard in a.iter_mut().chain(b.iter_mut()) {
        shard.intent.clear(&mut scheduler);
    }

    Ok(())
}
2002 :
/// Randomized check of AZ-aware scheduling.
///
/// For each of 50 seeds, 100 tenants (1 to 4 shards each, preferred AZ alternating
/// between "az-a" and "az-b") are scheduled in shuffled order onto 4 nodes. The
/// test then verifies that:
/// - every attachment is in the shard's preferred AZ,
/// - no secondary is in the shard's preferred AZ,
/// - the attachment count of every node that received a location is close to half
///   of the shards that prefer that node's AZ.
#[test]
fn random_az_shard_scheduling() -> anyhow::Result<()> {
    use rand::seq::SliceRandom;

    for seed in 0..50 {
        eprintln!("Running test with seed {seed}");
        let mut rng = StdRng::seed_from_u64(seed);

        let az_a_tag = AvailabilityZone("az-a".to_string());
        let az_b_tag = AvailabilityZone("az-b".to_string());
        let azs = [az_a_tag, az_b_tag];
        let nodes = make_test_nodes(4, &azs);
        // Total number of shards whose preferred AZ is the key
        let mut shards_per_az: HashMap<AvailabilityZone, u32> = HashMap::new();

        let mut scheduler = Scheduler::new([].iter());
        for node in nodes.values() {
            scheduler.node_upsert(node);
        }

        let mut shards = Vec::default();
        let mut contexts = Vec::default();
        let mut az_picker = azs.iter().cycle().cloned();
        for i in 0..100 {
            let az = az_picker.next().unwrap();
            let shard_count = i % 4 + 1;
            *shards_per_az.entry(az.clone()).or_default() += shard_count;

            let tenant_shards = make_test_tenant(
                PlacementPolicy::Attached(1),
                ShardCount::new(shard_count.try_into().unwrap()),
                Some(az),
            );
            // All shards of one tenant share a single schedule context
            let context = Rc::new(RefCell::new(ScheduleContext::default()));

            contexts.push(context.clone());
            shards.extend(
                tenant_shards
                    .into_iter()
                    .map(|shard| (shard, context.clone())),
            );
        }

        // Scheduling order must not matter: shuffle deterministically per seed
        shards.shuffle(&mut rng);

        #[derive(Default, Debug)]
        struct NodeStats {
            attachments: u32,
            secondaries: u32,
        }

        let mut node_stats: HashMap<NodeId, NodeStats> = HashMap::default();
        let mut attachments_in_wrong_az = 0;
        let mut secondaries_in_wrong_az = 0;

        for (shard, context) in &mut shards {
            let context = &mut *context.borrow_mut();
            shard.schedule(&mut scheduler, context).unwrap();

            let attached_node = shard.intent.get_attached().unwrap();
            let stats = node_stats.entry(attached_node).or_default();
            stats.attachments += 1;

            let secondary_node = *shard.intent.get_secondary().first().unwrap();
            let stats = node_stats.entry(secondary_node).or_default();
            stats.secondaries += 1;

            let attached_node_az = nodes
                .get(&attached_node)
                .unwrap()
                .get_availability_zone_id();
            let secondary_node_az = nodes
                .get(&secondary_node)
                .unwrap()
                .get_availability_zone_id();
            let preferred_az = shard.preferred_az().unwrap();

            if attached_node_az != preferred_az {
                eprintln!(
                    "{} attachment was scheduled in AZ {} but preferred AZ {}",
                    shard.tenant_shard_id, attached_node_az, preferred_az
                );
                attachments_in_wrong_az += 1;
            }

            if secondary_node_az == preferred_az {
                // Report the AZ of the secondary itself, not the one of the attachment
                eprintln!(
                    "{} secondary was scheduled in AZ {} which matches preference",
                    shard.tenant_shard_id, secondary_node_az
                );
                secondaries_in_wrong_az += 1;
            }
        }

        // Collect all problems first, so that a failure reports every one of them
        let mut violations = Vec::default();

        if attachments_in_wrong_az > 0 {
            violations.push(format!(
                "{} attachments scheduled to the incorrect AZ",
                attachments_in_wrong_az
            ));
        }

        if secondaries_in_wrong_az > 0 {
            violations.push(format!(
                "{} secondaries scheduled to the incorrect AZ",
                secondaries_in_wrong_az
            ));
        }

        eprintln!(
            "attachments_in_wrong_az={} secondaries_in_wrong_az={}",
            attachments_in_wrong_az, secondaries_in_wrong_az
        );

        for (node_id, stats) in &node_stats {
            let node_az = nodes.get(node_id).unwrap().get_availability_zone_id();
            // The shards preferring an AZ are split in two here; this presumably
            // relies on make_test_nodes placing two of the four nodes in each AZ
            // (TODO confirm).
            let ideal_attachment_load = shards_per_az.get(node_az).unwrap() / 2;
            // Accept ideal-1 ..= ideal+1. The lower bound saturates so that a tiny
            // ideal load cannot underflow the unsigned subtraction.
            let allowed_attachment_load =
                ideal_attachment_load.saturating_sub(1)..(ideal_attachment_load + 2);

            if !allowed_attachment_load.contains(&stats.attachments) {
                violations.push(format!(
                    "Found {} attachments on node {}, but expected {}",
                    stats.attachments, node_id, ideal_attachment_load
                ));
            }

            eprintln!(
                "{}: attachments={} secondaries={} ideal_attachment_load={}",
                node_id, stats.attachments, stats.secondaries, ideal_attachment_load
            );
        }

        assert!(violations.is_empty(), "{violations:?}");

        for (mut shard, _ctx) in shards {
            shard.intent.clear(&mut scheduler);
        }
    }
    Ok(())
}
2145 : }
|