1 : use std::{
2 : collections::{HashMap, HashSet},
3 : sync::Arc,
4 : time::Duration,
5 : };
6 :
7 : use crate::{
8 : metrics::{
9 : self, ReconcileCompleteLabelGroup, ReconcileLongRunningLabelGroup, ReconcileOutcome,
10 : },
11 : persistence::TenantShardPersistence,
12 : reconciler::{ReconcileUnits, ReconcilerConfig},
13 : scheduler::{
14 : AffinityScore, AttachedShardTag, MaySchedule, RefCountUpdate, ScheduleContext,
15 : SecondaryShardTag,
16 : },
17 : service::ReconcileResultRequest,
18 : };
19 : use futures::future::{self, Either};
20 : use itertools::Itertools;
21 : use pageserver_api::controller_api::{
22 : AvailabilityZone, NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
23 : };
24 : use pageserver_api::{
25 : models::{LocationConfig, LocationConfigMode, TenantConfig},
26 : shard::{ShardIdentity, TenantShardId},
27 : };
28 : use serde::{Deserialize, Serialize};
29 : use tokio::task::JoinHandle;
30 : use tokio_util::sync::CancellationToken;
31 : use tracing::{instrument, Instrument};
32 : use utils::{
33 : generation::Generation,
34 : id::NodeId,
35 : seqwait::{SeqWait, SeqWaitError},
36 : sync::gate::GateGuard,
37 : };
38 :
39 : use crate::{
40 : compute_hook::ComputeHook,
41 : node::Node,
42 : persistence::{split_state::SplitState, Persistence},
43 : reconciler::{
44 : attached_location_conf, secondary_location_conf, ReconcileError, Reconciler, TargetState,
45 : },
46 : scheduler::{ScheduleError, Scheduler},
47 : service, Sequence,
48 : };
49 :
50 : /// Serialization helper
51 0 : fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
52 0 : where
53 0 : S: serde::ser::Serializer,
54 0 : T: std::fmt::Display,
55 0 : {
56 0 : serializer.collect_str(
57 0 : &v.lock()
58 0 : .unwrap()
59 0 : .as_ref()
60 0 : .map(|e| format!("{e}"))
61 0 : .unwrap_or("".to_string()),
62 0 : )
63 0 : }
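// A minimal, self-contained sketch of how a `serialize_with` helper like
// `read_last_error` is wired up. The `Status` struct and helper name below are
// hypothetical, purely for illustration:
//
//     use std::sync::Mutex;
//     use serde::Serialize;
//
//     fn last_error_as_str<S>(v: &Mutex<Option<String>>, serializer: S) -> Result<S::Ok, S::Error>
//     where
//         S: serde::ser::Serializer,
//     {
//         // Render the most recent error as a string, or "" if none is recorded.
//         serializer.collect_str(v.lock().unwrap().as_deref().unwrap_or(""))
//     }
//
//     #[derive(Serialize)]
//     struct Status {
//         #[serde(serialize_with = "last_error_as_str")]
//         last_error: Mutex<Option<String>>,
//     }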
64 :
65 : /// In-memory state for a particular tenant shard.
66 : ///
67 : /// This struct implements Serialize for debugging purposes, but is _not_ persisted
68 : /// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
69 0 : #[derive(Serialize)]
70 : pub(crate) struct TenantShard {
71 : pub(crate) tenant_shard_id: TenantShardId,
72 :
73 : pub(crate) shard: ShardIdentity,
74 :
75 : // Runtime only: sequence used to coordinate updates to this object while
76 : // background reconcilers may be running. A reconciler runs to a particular
77 : // sequence.
78 : pub(crate) sequence: Sequence,
79 :
80 : // Latest generation number: next time we attach, increment this
81 : // and use the incremented number when attaching.
82 : //
83 : // None represents an incompletely onboarded tenant via the [`Service::location_config`]
84 : // API, where this tenant may only run in PlacementPolicy::Secondary.
85 : pub(crate) generation: Option<Generation>,
86 :
87 : // High level description of how the tenant should be set up. Provided
88 : // externally.
89 : pub(crate) policy: PlacementPolicy,
90 :
91 : // Low level description of exactly which pageservers should fulfil
92 : // which role. Generated by `Self::schedule`.
93 : pub(crate) intent: IntentState,
94 :
95 : // Low level description of how the tenant is configured on pageservers:
96 : // if this does not match `Self::intent` then the tenant needs reconciliation
97 : // with `Self::reconcile`.
98 : pub(crate) observed: ObservedState,
99 :
100 : // Tenant configuration, passed through opaquely to the pageserver. Identical
101 : // for all shards in a tenant.
102 : pub(crate) config: TenantConfig,
103 :
104 : /// If a reconcile task is currently in flight, it may be joined here (it is
105 : /// only safe to join if either the result has been received or the reconciler's
106 : /// cancellation token has been fired)
107 : #[serde(skip)]
108 : pub(crate) reconciler: Option<ReconcilerHandle>,
109 :
110 : /// If a tenant is being split, then all shards with that TenantId will have a
111 : /// SplitState set; this acts as a guard against other operations, such as background
112 : /// reconciliation and timeline creation.
113 : pub(crate) splitting: SplitState,
114 :
115 : /// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag
116 : /// is set. This flag is cleared when the tenant is popped off the delay queue.
117 : pub(crate) delayed_reconcile: bool,
118 :
119 : /// Optionally wait for reconciliation to complete up to a particular
120 : /// sequence number.
121 : #[serde(skip)]
122 : pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
123 :
124 : /// Indicates sequence number for which we have encountered an error reconciling. If
125 : /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
126 : /// and callers should stop waiting for `waiter` and propagate the error.
127 : #[serde(skip)]
128 : pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
129 :
130 : /// The most recent error from a reconcile on this tenant. This is a nested Arc
131 : /// because:
132 : /// - ReconcileWaiters need to Arc-clone the overall object to read it later
133 : /// - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
134 : /// many waiters for one shard, and the underlying error types are not Clone.
135 : ///
136 : /// TODO: generalize to an array of recent events
137 : /// TODO: use an ArcSwap instead of a mutex for faster reads?
138 : #[serde(serialize_with = "read_last_error")]
139 : pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
140 :
141 : /// If we have a pending compute notification that for some reason we weren't able to send,
142 : /// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
143 : /// and trigger a Reconciler run. This is the mechanism by which compute notifications are included in the scope
144 : /// of state that we publish externally in an eventually consistent way.
145 : pub(crate) pending_compute_notification: bool,
146 :
147 : // Support/debug tool: if something is going wrong or flapping with scheduling, this may
148 : // be set to a non-active state to avoid making changes while the issue is fixed.
149 : scheduling_policy: ShardSchedulingPolicy,
150 :
151 : // We should attempt to schedule this shard in the provided AZ to
152 : // decrease chances of cross-AZ compute.
153 : preferred_az_id: Option<AvailabilityZone>,
154 : }
155 :
156 : #[derive(Default, Clone, Debug, Serialize)]
157 : pub(crate) struct IntentState {
158 : attached: Option<NodeId>,
159 : secondary: Vec<NodeId>,
160 : }
161 :
162 : impl IntentState {
163 19 : pub(crate) fn new() -> Self {
164 19 : Self {
165 19 : attached: None,
166 19 : secondary: vec![],
167 19 : }
168 19 : }
169 0 : pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option<NodeId>) -> Self {
170 0 : if let Some(node_id) = node_id {
171 0 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::Attach);
172 0 : }
173 0 : Self {
174 0 : attached: node_id,
175 0 : secondary: vec![],
176 0 : }
177 0 : }
178 :
179 12538 : pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
180 12538 : if self.attached != new_attached {
181 12538 : if let Some(old_attached) = self.attached.take() {
182 0 : scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
183 12538 : }
184 12538 : if let Some(new_attached) = &new_attached {
185 12538 : scheduler.update_node_ref_counts(*new_attached, RefCountUpdate::Attach);
186 12538 : }
187 12538 : self.attached = new_attached;
188 0 : }
189 12538 : }
190 :
191 : /// Like set_attached, but the node is from [`Self::secondary`]. This swaps the node from
192 : /// secondary to attached while maintaining the scheduler's reference counts.
193 5 : pub(crate) fn promote_attached(
194 5 : &mut self,
195 5 : scheduler: &mut Scheduler,
196 5 : promote_secondary: NodeId,
197 5 : ) {
198 5 : // If we call this with a node that isn't in secondary, it would cause incorrect
199 5 : // scheduler reference counting, since we assume the node is already referenced as a secondary.
200 5 : debug_assert!(self.secondary.contains(&promote_secondary));
201 :
202 10 : self.secondary.retain(|n| n != &promote_secondary);
203 5 :
204 5 : let demoted = self.attached;
205 5 : self.attached = Some(promote_secondary);
206 5 :
207 5 : scheduler.update_node_ref_counts(promote_secondary, RefCountUpdate::PromoteSecondary);
208 5 : if let Some(demoted) = demoted {
209 0 : scheduler.update_node_ref_counts(demoted, RefCountUpdate::DemoteAttached);
210 5 : }
211 5 : }
212 :
213 12525 : pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
214 12525 : debug_assert!(!self.secondary.contains(&new_secondary));
215 12525 : scheduler.update_node_ref_counts(new_secondary, RefCountUpdate::AddSecondary);
216 12525 : self.secondary.push(new_secondary);
217 12525 : }
218 :
219 : /// It is legal to call this with a node that is not currently a secondary: that is a no-op
220 5 : pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
221 5 : let index = self.secondary.iter().position(|n| *n == node_id);
222 5 : if let Some(index) = index {
223 5 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
224 5 : self.secondary.remove(index);
225 5 : }
226 5 : }
227 :
228 12537 : pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
229 12537 : for secondary in self.secondary.drain(..) {
230 12520 : scheduler.update_node_ref_counts(secondary, RefCountUpdate::RemoveSecondary);
231 12520 : }
232 12537 : }
233 :
234 : /// Remove the last secondary node from the list of secondaries
235 0 : pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
236 0 : if let Some(node_id) = self.secondary.pop() {
237 0 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
238 0 : }
239 0 : }
240 :
241 12537 : pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
242 12537 : if let Some(old_attached) = self.attached.take() {
243 12537 : scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
244 12537 : }
245 :
246 12537 : self.clear_secondary(scheduler);
247 12537 : }
248 :
249 12602 : pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
250 12602 : let mut result = Vec::new();
251 12602 : if let Some(p) = self.attached {
252 12600 : result.push(p)
253 2 : }
254 :
255 12602 : result.extend(self.secondary.iter().copied());
256 12602 :
257 12602 : result
258 12602 : }
259 :
260 25083 : pub(crate) fn get_attached(&self) -> &Option<NodeId> {
261 25083 : &self.attached
262 25083 : }
263 :
264 12524 : pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
265 12524 : &self.secondary
266 12524 : }
267 :
268 : /// If the node is in use as the attached location, demote it into
269 : /// the list of secondary locations. This is used when a node goes offline,
270 : /// and we want to use a different node for attachment, but not permanently
271 : /// forget the location on the offline node.
272 : ///
273 : /// Returns true if a change was made
274 5 : pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
275 5 : if self.attached == Some(node_id) {
276 5 : self.attached = None;
277 5 : self.secondary.push(node_id);
278 5 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::DemoteAttached);
279 5 : true
280 : } else {
281 0 : false
282 : }
283 5 : }
284 : }
285 :
286 : impl Drop for IntentState {
287 12538 : fn drop(&mut self) {
288 12538 : // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
289 12538 : // We do not check this while panicking, to avoid polluting unit test failures or
290 12538 : // other assertions with this assertion's output. It's still wrong to leak these,
291 12538 : // but if we already have a panic then we don't need to independently flag this case.
292 12538 : if !(std::thread::panicking()) {
293 12538 : debug_assert!(self.attached.is_none() && self.secondary.is_empty());
294 0 : }
295 12537 : }
296 : }
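// Illustrative sketch of the ref-count discipline above (hypothetical caller
// code: `scheduler` is assumed to be a `&mut Scheduler`, node IDs are made up):
//
//     let mut intent = IntentState::new();
//     intent.set_attached(scheduler, Some(NodeId(1))); // node 1 gains an attached ref
//     intent.push_secondary(scheduler, NodeId(2));     // node 2 gains a secondary ref
//     intent.demote_attached(scheduler, NodeId(1));    // node 1 becomes a secondary
//     intent.promote_attached(scheduler, NodeId(2));   // node 2 becomes attached
//     intent.clear(scheduler); // release all refs: required before drop, or the
//                              // Drop impl's debug_assert will fire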
297 :
298 0 : #[derive(Default, Clone, Serialize, Deserialize, Debug)]
299 : pub(crate) struct ObservedState {
300 : pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
301 : }
302 :
303 : /// Our latest knowledge of how this tenant is configured in the outside world.
304 : ///
305 : /// Meaning:
306 : /// * No instance of this type exists for a node: we are certain that we have nothing configured on that
307 : /// node for this shard.
308 : /// * Instance exists with conf==None: we *might* have some state on that node, but we don't know
309 : /// what it is (e.g. we failed partway through configuring it)
310 : /// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
311 : /// and that configuration will still be present unless something external interfered.
312 0 : #[derive(Clone, Serialize, Deserialize, Debug)]
313 : pub(crate) struct ObservedStateLocation {
314 : /// If None, it means we do not know the status of this shard's location on this node, but
315 : /// we know that we might have some state on this node.
316 : pub(crate) conf: Option<LocationConfig>,
317 : }
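// A hypothetical sketch of inspecting the three cases described above for a
// single node (`observed` is an `&ObservedState`, `node_id` a `NodeId`):
//
//     match observed.locations.get(&node_id) {
//         None => { /* certain: nothing is configured on this node */ }
//         Some(ObservedStateLocation { conf: None }) => {
//             // Unknown: we may have partially configured this node.
//         }
//         Some(ObservedStateLocation { conf: Some(conf) }) => {
//             // `conf` is what we last successfully configured here.
//         }
//     }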
318 : pub(crate) struct ReconcilerWaiter {
319 : // For observability purposes, remember the ID of the shard we're
320 : // waiting for.
321 : pub(crate) tenant_shard_id: TenantShardId,
322 :
323 : seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
324 : error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
325 : error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
326 : seq: Sequence,
327 : }
328 :
329 : pub(crate) enum ReconcilerStatus {
330 : Done,
331 : Failed,
332 : InProgress,
333 : }
334 :
335 0 : #[derive(thiserror::Error, Debug)]
336 : pub(crate) enum ReconcileWaitError {
337 : #[error("Timeout waiting for shard {0}")]
338 : Timeout(TenantShardId),
339 : #[error("shutting down")]
340 : Shutdown,
341 : #[error("Reconcile error on shard {0}: {1}")]
342 : Failed(TenantShardId, Arc<ReconcileError>),
343 : }
344 :
345 : #[derive(Eq, PartialEq, Debug, Clone)]
346 : pub(crate) struct ReplaceSecondary {
347 : old_node_id: NodeId,
348 : new_node_id: NodeId,
349 : }
350 :
351 : #[derive(Eq, PartialEq, Debug, Clone)]
352 : pub(crate) struct MigrateAttachment {
353 : pub(crate) old_attached_node_id: NodeId,
354 : pub(crate) new_attached_node_id: NodeId,
355 : }
356 :
357 : #[derive(Eq, PartialEq, Debug, Clone)]
358 : pub(crate) enum ScheduleOptimizationAction {
359 : // Replace one of our secondary locations with a different node
360 : ReplaceSecondary(ReplaceSecondary),
361 : // Migrate attachment to an existing secondary location
362 : MigrateAttachment(MigrateAttachment),
363 : }
364 :
365 : #[derive(Eq, PartialEq, Debug, Clone)]
366 : pub(crate) struct ScheduleOptimization {
367 : // What was the reconcile sequence when we generated this optimization? The optimization
368 : // should only be applied if the shard's sequence is still at this value, in case other changes
369 : // happened between planning the optimization and applying it.
370 : sequence: Sequence,
371 :
372 : pub(crate) action: ScheduleOptimizationAction,
373 : }
374 :
375 : impl ReconcilerWaiter {
376 0 : pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
377 0 : tokio::select! {
378 0 : result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
379 0 : result.map_err(|e| match e {
380 0 : SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
381 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
382 0 : })?;
383 : },
384 0 : result = self.error_seq_wait.wait_for(self.seq) => {
385 0 : result.map_err(|e| match e {
386 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
387 0 : SeqWaitError::Timeout => unreachable!()
388 0 : })?;
389 :
390 0 : return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
391 0 : self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
392 : }
393 : }
394 :
395 0 : Ok(())
396 0 : }
397 :
398 0 : pub(crate) fn get_status(&self) -> ReconcilerStatus {
399 0 : if self.seq_wait.would_wait_for(self.seq).is_ok() {
400 0 : ReconcilerStatus::Done
401 0 : } else if self.error_seq_wait.would_wait_for(self.seq).is_ok() {
402 0 : ReconcilerStatus::Failed
403 : } else {
404 0 : ReconcilerStatus::InProgress
405 : }
406 0 : }
407 : }
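// Hypothetical caller-side usage of the waiter above (`waiter` would come from
// e.g. [`TenantShard::spawn_reconciler`] or [`TenantShard::get_waiter`]):
//
//     match waiter.wait_timeout(Duration::from_secs(30)).await {
//         Ok(()) => { /* reconciliation reached our sequence */ }
//         Err(ReconcileWaitError::Timeout(shard_id)) => { /* still in flight */ }
//         Err(ReconcileWaitError::Failed(shard_id, e)) => { /* reconcile failed */ }
//         Err(ReconcileWaitError::Shutdown) => { /* service shutting down */ }
//     }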
408 :
409 : /// Having spawned a reconciler task, the tenant shard's state will carry enough
410 : /// information to optionally cancel & await it later.
411 : pub(crate) struct ReconcilerHandle {
412 : sequence: Sequence,
413 : handle: JoinHandle<()>,
414 : cancel: CancellationToken,
415 : }
416 :
417 : pub(crate) enum ReconcileNeeded {
418 : /// shard either doesn't need reconciliation, or is forbidden from spawning a reconciler
419 : /// in its current state (e.g. shard split in progress, or ShardSchedulingPolicy forbids it)
420 : No,
421 : /// shard has a reconciler running, and its intent hasn't changed since that one was
422 : /// spawned: wait for the existing reconciler rather than spawning a new one.
423 : WaitExisting(ReconcilerWaiter),
424 : /// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
425 : Yes,
426 : }
427 :
428 : /// Pending modification to the observed state of a tenant shard.
429 : /// Produced by [`Reconciler::observed_deltas`] and applied in [`crate::service::Service::process_result`].
430 : pub(crate) enum ObservedStateDelta {
431 : Upsert(Box<(NodeId, ObservedStateLocation)>),
432 : Delete(NodeId),
433 : }
434 :
435 : impl ObservedStateDelta {
436 0 : pub(crate) fn node_id(&self) -> &NodeId {
437 0 : match self {
438 0 : Self::Upsert(up) => &up.0,
439 0 : Self::Delete(nid) => nid,
440 : }
441 0 : }
442 : }
443 :
444 : /// When a reconcile task completes, it sends this result object
445 : /// to be applied to the primary TenantShard.
446 : pub(crate) struct ReconcileResult {
447 : pub(crate) sequence: Sequence,
448 : /// On errors, `observed` should be treated as an incomplete description
449 : /// of state (i.e. any nodes present in the result should override nodes
450 : /// present in the parent tenant state, but any unmentioned nodes should
451 : /// not be removed from parent tenant state)
452 : pub(crate) result: Result<(), ReconcileError>,
453 :
454 : pub(crate) tenant_shard_id: TenantShardId,
455 : pub(crate) generation: Option<Generation>,
456 : pub(crate) observed_deltas: Vec<ObservedStateDelta>,
457 :
458 : /// Set [`TenantShard::pending_compute_notification`] from this flag
459 : pub(crate) pending_compute_notification: bool,
460 : }
461 :
462 : impl ObservedState {
463 0 : pub(crate) fn new() -> Self {
464 0 : Self {
465 0 : locations: HashMap::new(),
466 0 : }
467 0 : }
468 : }
469 :
470 : impl TenantShard {
471 12519 : pub(crate) fn new(
472 12519 : tenant_shard_id: TenantShardId,
473 12519 : shard: ShardIdentity,
474 12519 : policy: PlacementPolicy,
475 12519 : ) -> Self {
476 12519 : Self {
477 12519 : tenant_shard_id,
478 12519 : policy,
479 12519 : intent: IntentState::default(),
480 12519 : generation: Some(Generation::new(0)),
481 12519 : shard,
482 12519 : observed: ObservedState::default(),
483 12519 : config: TenantConfig::default(),
484 12519 : reconciler: None,
485 12519 : splitting: SplitState::Idle,
486 12519 : sequence: Sequence(1),
487 12519 : delayed_reconcile: false,
488 12519 : waiter: Arc::new(SeqWait::new(Sequence(0))),
489 12519 : error_waiter: Arc::new(SeqWait::new(Sequence(0))),
490 12519 : last_error: Arc::default(),
491 12519 : pending_compute_notification: false,
492 12519 : scheduling_policy: ShardSchedulingPolicy::default(),
493 12519 : preferred_az_id: None,
494 12519 : }
495 12519 : }
496 :
497 : /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
498 : /// [`ObservedState`], even if it violates my [`PlacementPolicy`]. Call [`Self::schedule`] next,
499 : /// to get an intent state that complies with placement policy. The overall goal is to do scheduling
500 : /// in a way that makes use of any configured locations that already exist in the outside world.
501 1 : pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
502 1 : // Choose an attached location by filtering observed locations, and then sorting to get the highest
503 1 : // generation
504 1 : let mut attached_locs = self
505 1 : .observed
506 1 : .locations
507 1 : .iter()
508 2 : .filter_map(|(node_id, l)| {
509 2 : if let Some(conf) = &l.conf {
510 2 : if conf.mode == LocationConfigMode::AttachedMulti
511 1 : || conf.mode == LocationConfigMode::AttachedSingle
512 1 : || conf.mode == LocationConfigMode::AttachedStale
513 : {
514 2 : Some((node_id, conf.generation))
515 : } else {
516 0 : None
517 : }
518 : } else {
519 0 : None
520 : }
521 2 : })
522 1 : .collect::<Vec<_>>();
523 1 :
524 2 : attached_locs.sort_by_key(|i| i.1);
525 1 : if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
526 1 : self.intent.set_attached(scheduler, Some(*node_id));
527 1 : }
528 :
529 : // All remaining observed locations generate secondary intents. This includes None
530 : // observations, as these may well have some local content on disk that is usable (this
531 : // is an edge case that might occur if we restarted during a migration or other change)
532 : //
533 : // We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
534 : // will take care of promoting one of these secondaries to be attached.
535 2 : self.observed.locations.keys().for_each(|node_id| {
536 2 : if Some(*node_id) != self.intent.attached {
537 1 : self.intent.push_secondary(scheduler, *node_id);
538 1 : }
539 2 : });
540 1 : }
541 :
542 : /// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
543 : /// attached pageserver for a shard.
544 : ///
545 : /// Returns whether we modified it, and the NodeId selected.
546 12515 : fn schedule_attached(
547 12515 : &mut self,
548 12515 : scheduler: &mut Scheduler,
549 12515 : context: &ScheduleContext,
550 12515 : ) -> Result<(bool, NodeId), ScheduleError> {
551 : // No work to do if we already have an attached tenant
552 12515 : if let Some(node_id) = self.intent.attached {
553 0 : return Ok((false, node_id));
554 12515 : }
555 :
556 12515 : if let Some(promote_secondary) = scheduler.node_preferred(&self.intent.secondary) {
557 : // Promote a secondary
558 1 : tracing::debug!("Promoted secondary {} to attached", promote_secondary);
559 1 : self.intent.promote_attached(scheduler, promote_secondary);
560 1 : Ok((true, promote_secondary))
561 : } else {
562 : // Pick a fresh node: either we had no secondaries or none were schedulable
563 12514 : let node_id = scheduler.schedule_shard::<AttachedShardTag>(
564 12514 : &self.intent.secondary,
565 12514 : &self.preferred_az_id,
566 12514 : context,
567 12514 : )?;
568 12514 : tracing::debug!("Selected {} as attached", node_id);
569 12514 : self.intent.set_attached(scheduler, Some(node_id));
570 12514 : Ok((true, node_id))
571 : }
572 12515 : }
573 :
574 12516 : #[instrument(skip_all, fields(
575 : tenant_id=%self.tenant_shard_id.tenant_id,
576 : shard_id=%self.tenant_shard_id.shard_slug(),
577 : sequence=%self.sequence
578 12516 : ))]
579 : pub(crate) fn schedule(
580 : &mut self,
581 : scheduler: &mut Scheduler,
582 : context: &mut ScheduleContext,
583 : ) -> Result<(), ScheduleError> {
584 : let r = self.do_schedule(scheduler, context);
585 :
586 : context.avoid(&self.intent.all_pageservers());
587 : if let Some(attached) = self.intent.get_attached() {
588 : context.push_attached(*attached);
589 : }
590 :
591 : r
592 : }
593 :
594 12516 : pub(crate) fn do_schedule(
595 12516 : &mut self,
596 12516 : scheduler: &mut Scheduler,
597 12516 : context: &ScheduleContext,
598 12516 : ) -> Result<(), ScheduleError> {
599 12516 : // TODO: before scheduling new nodes, check if any existing content in
600 12516 : // self.intent refers to pageservers that are offline, and pick other
601 12516 : // pageservers if so.
602 12516 :
603 12516 : // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
604 12516 : // change their attach location.
605 12516 :
606 12516 : match self.scheduling_policy {
607 12515 : ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
608 : ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
609 : // Warn to make it obvious why other things aren't happening/working, if we skip scheduling
610 1 : tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
611 0 : "Scheduling is disabled by policy {:?}", self.scheduling_policy);
612 1 : return Ok(());
613 : }
614 : }
615 :
616 : // Build the set of pageservers already in use by this tenant, to avoid scheduling
617 : // more work on the same pageservers we're already using.
618 12515 : let mut modified = false;
619 :
620 : // Add/remove nodes to fulfil policy
621 : use PlacementPolicy::*;
622 12515 : match self.policy {
623 12515 : Attached(secondary_count) => {
624 12515 : let retain_secondaries = if self.intent.attached.is_none()
625 12515 : && scheduler.node_preferred(&self.intent.secondary).is_some()
626 : {
627 : // If we have no attached location, and one of the secondaries is eligible to be promoted, retain
628 : // one more secondary than we usually would, as one of them will become attached further down this function.
629 1 : secondary_count + 1
630 : } else {
631 12514 : secondary_count
632 : };
633 :
634 12515 : while self.intent.secondary.len() > retain_secondaries {
635 0 : // We have no particular preference for one secondary location over another: just
636 0 : // arbitrarily drop from the end
637 0 : self.intent.pop_secondary(scheduler);
638 0 : modified = true;
639 0 : }
640 :
641 : // Should have exactly one attached, and N secondaries
642 12515 : let (modified_attached, attached_node_id) =
643 12515 : self.schedule_attached(scheduler, context)?;
644 12515 : modified |= modified_attached;
645 12515 :
646 12515 : let mut used_pageservers = vec![attached_node_id];
647 25029 : while self.intent.secondary.len() < secondary_count {
648 12514 : let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
649 12514 : &used_pageservers,
650 12514 : &self.preferred_az_id,
651 12514 : context,
652 12514 : )?;
653 12514 : self.intent.push_secondary(scheduler, node_id);
654 12514 : used_pageservers.push(node_id);
655 12514 : modified = true;
656 : }
657 : }
658 : Secondary => {
659 0 : if let Some(node_id) = self.intent.get_attached() {
660 0 : // Populate secondary by demoting the attached node
661 0 : self.intent.demote_attached(scheduler, *node_id);
662 0 : modified = true;
663 0 : } else if self.intent.secondary.is_empty() {
664 : // Populate secondary by scheduling a fresh node
665 0 : let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
666 0 : &[],
667 0 : &self.preferred_az_id,
668 0 : context,
669 0 : )?;
670 0 : self.intent.push_secondary(scheduler, node_id);
671 0 : modified = true;
672 0 : }
673 0 : while self.intent.secondary.len() > 1 {
674 0 : // We have no particular preference for one secondary location over another: just
675 0 : // arbitrarily drop from the end
676 0 : self.intent.pop_secondary(scheduler);
677 0 : modified = true;
678 0 : }
679 : }
680 : Detached => {
681 : // Never add locations in this mode
682 0 : if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
683 0 : self.intent.clear(scheduler);
684 0 : modified = true;
685 0 : }
686 : }
687 : }
688 :
689 12515 : if modified {
690 12515 : self.sequence.0 += 1;
691 12515 : }
692 :
693 12515 : Ok(())
694 12516 : }
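// The match above converges the intent state on these invariants (a summary,
// with a hypothetical assertion sketch for the Attached case):
//
//   PlacementPolicy::Attached(n) => one attached location plus n secondaries
//   PlacementPolicy::Secondary   => no attached location, exactly one secondary
//   PlacementPolicy::Detached    => no locations at all
//
//     shard.schedule(scheduler, &mut context)?; // policy = Attached(1)
//     assert!(shard.intent.get_attached().is_some());
//     assert_eq!(shard.intent.get_secondary().len(), 1);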
695 :
696 : /// Reschedule this tenant shard to one of its secondary locations. Returns a scheduling error
697 : /// if the swap is not possible and leaves the intent state in its original state.
698 : ///
699 : /// Arguments:
700 : /// `attached_to`: the currently attached location matching the intent state (may be None if the
701 : /// shard is not attached)
702 : /// `promote_to`: an optional secondary location of this tenant shard. If set to None, we ask
703 : /// the scheduler to recommend a node
704 0 : pub(crate) fn reschedule_to_secondary(
705 0 : &mut self,
706 0 : promote_to: Option<NodeId>,
707 0 : scheduler: &mut Scheduler,
708 0 : ) -> Result<(), ScheduleError> {
709 0 : let promote_to = match promote_to {
710 0 : Some(node) => node,
711 0 : None => match scheduler.node_preferred(self.intent.get_secondary()) {
712 0 : Some(node) => node,
713 : None => {
714 0 : return Err(ScheduleError::ImpossibleConstraint);
715 : }
716 : },
717 : };
718 :
719 0 : assert!(self.intent.get_secondary().contains(&promote_to));
720 :
721 0 : if let Some(node) = self.intent.get_attached() {
722 0 : let demoted = self.intent.demote_attached(scheduler, *node);
723 0 : if !demoted {
724 0 : return Err(ScheduleError::ImpossibleConstraint);
725 0 : }
726 0 : }
727 :
728 0 : self.intent.promote_attached(scheduler, promote_to);
729 0 :
730 0 : // Increment the sequence number for the edge case where a
731 0 : // reconciler is already running to avoid waiting on the
732 0 : // current reconcile instead of spawning a new one.
733 0 : self.sequence = self.sequence.next();
734 0 :
735 0 : Ok(())
736 0 : }
737 :
738 : /// Optimize attachments: if a shard has a secondary location that is preferable to
739 : /// its primary location based on soft constraints, switch that secondary location
740 : /// to be attached.
741 23 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
742 : pub(crate) fn optimize_attachment(
743 : &self,
744 : nodes: &HashMap<NodeId, Node>,
745 : schedule_context: &ScheduleContext,
746 : ) -> Option<ScheduleOptimization> {
747 : let attached = (*self.intent.get_attached())?;
748 : if self.intent.secondary.is_empty() {
749 : // We can only do useful work if we have both attached and secondary locations: this
750 : // function doesn't schedule new locations, only swaps between attached and secondaries.
751 : return None;
752 : }
753 :
754 : let current_affinity_score = schedule_context.get_node_affinity(attached);
755 : let current_attachment_count = schedule_context.get_node_attachments(attached);
756 :
757 : // Generate score for each node, dropping any un-schedulable nodes.
758 : let all_pageservers = self.intent.all_pageservers();
759 : let mut scores = all_pageservers
760 : .iter()
761 46 : .flat_map(|node_id| {
762 46 : let node = nodes.get(node_id);
763 46 : if node.is_none() {
764 0 : None
765 46 : } else if matches!(
766 46 : node.unwrap().get_scheduling(),
767 : NodeSchedulingPolicy::Filling
768 : ) {
769 : // If the node is currently filling, don't count it as a candidate to avoid,
770 : // racing with the background fill.
771 0 : None
772 46 : } else if matches!(node.unwrap().may_schedule(), MaySchedule::No) {
773 0 : None
774 : } else {
775 46 : let affinity_score = schedule_context.get_node_affinity(*node_id);
776 46 : let attachment_count = schedule_context.get_node_attachments(*node_id);
777 46 : Some((*node_id, affinity_score, attachment_count))
778 : }
779 46 : })
780 : .collect::<Vec<_>>();
781 :
782 : // Sort precedence:
783 : // 1st - prefer nodes with the lowest total affinity score
784 : // 2nd - prefer nodes with the lowest number of attachments in this context
785 : // 3rd - if all else is equal, sort by node ID for determinism in tests.
786 46 : scores.sort_by_key(|i| (i.1, i.2, i.0));
787 :
788 : if let Some((preferred_node, preferred_affinity_score, preferred_attachment_count)) =
789 : scores.first()
790 : {
791 : if attached != *preferred_node {
792 : // The best alternative must be more than 1 better than us, otherwise we could end
793 : // up flapping back next time we're called (e.g. there's no point migrating from
794 : // a location with score 1 to a location with score 0, because after migrating the
795 : // situation would be the same, but in reverse).
796 : if current_affinity_score > *preferred_affinity_score + AffinityScore(1)
797 : || current_attachment_count > *preferred_attachment_count + 1
798 : {
799 : tracing::info!(
800 : "Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})",
801 : self.intent.get_secondary()
802 : );
803 : return Some(ScheduleOptimization {
804 : sequence: self.sequence,
805 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
806 : old_attached_node_id: attached,
807 : new_attached_node_id: *preferred_node,
808 : }),
809 : });
810 : }
811 : } else {
812 : tracing::debug!(
813 : "Node {} is already preferred (score {:?})",
814 : preferred_node,
815 : preferred_affinity_score
816 : );
817 : }
818 : }
819 :
820 : // Fall-through: we didn't find an optimization
821 : None
822 : }
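// A made-up numeric example of the "better by more than 1" hysteresis above:
// with affinity scores current=2 and candidate=0, migration is proposed
// (2 > 0 + 1); with current=1 and candidate=0 it is not (1 <= 0 + 1), because
// migrating would just recreate the mirror image of the situation and flap.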
823 :
824 20 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
825 : pub(crate) fn optimize_secondary(
826 : &self,
827 : scheduler: &mut Scheduler,
828 : schedule_context: &ScheduleContext,
829 : ) -> Option<ScheduleOptimization> {
830 : if self.intent.secondary.is_empty() {
831 : // We can only do useful work if we have both attached and secondary locations: this
832 : // function doesn't schedule new locations, only swaps between attached and secondaries.
833 : return None;
834 : }
835 :
836 : for secondary in self.intent.get_secondary() {
837 : let Some(affinity_score) = schedule_context.nodes.get(secondary) else {
838 : // We're already on a node unaffected by any affinity constraints,
839 : // so we won't change it.
840 : continue;
841 : };
842 :
843 : // Let the scheduler suggest a node, where it would put us if we were scheduling afresh
844 : // This implicitly limits the choice to nodes that are available, and prefers nodes
845 : // with lower utilization.
846 : let Ok(candidate_node) = scheduler.schedule_shard::<SecondaryShardTag>(
847 : &self.intent.all_pageservers(),
848 : &self.preferred_az_id,
849 : schedule_context,
850 : ) else {
851 : // A scheduling error means we have no possible candidate replacements
852 : continue;
853 : };
854 :
855 : let candidate_affinity_score = schedule_context
856 : .nodes
857 : .get(&candidate_node)
858 : .unwrap_or(&AffinityScore::FREE);
859 :
860 : // The best alternative must be more than 1 better than us, otherwise we could end
861 : // up flapping back next time we're called.
862 : if *candidate_affinity_score + AffinityScore(1) < *affinity_score {
863 : // If some other node is available and has a lower score than this node, then
864 : // that other node is a good place to migrate to.
865 : tracing::info!(
866 : "Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})",
867 : self.intent.get_secondary()
868 : );
869 : return Some(ScheduleOptimization {
870 : sequence: self.sequence,
871 : action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
872 : old_node_id: *secondary,
873 : new_node_id: candidate_node,
874 : }),
875 : });
876 : }
877 : }
878 :
879 : None
880 : }
881 :
882 : /// Return true if the optimization was really applied: it will not be applied if the optimization's
883 : /// sequence is behind this tenant shard's current sequence.
884 9 : pub(crate) fn apply_optimization(
885 9 : &mut self,
886 9 : scheduler: &mut Scheduler,
887 9 : optimization: ScheduleOptimization,
888 9 : ) -> bool {
889 9 : if optimization.sequence != self.sequence {
890 0 : return false;
891 9 : }
892 9 :
893 9 : metrics::METRICS_REGISTRY
894 9 : .metrics_group
895 9 : .storage_controller_schedule_optimization
896 9 : .inc();
897 9 :
898 9 : match optimization.action {
899 : ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
900 4 : old_attached_node_id,
901 4 : new_attached_node_id,
902 4 : }) => {
903 4 : self.intent.demote_attached(scheduler, old_attached_node_id);
904 4 : self.intent
905 4 : .promote_attached(scheduler, new_attached_node_id);
906 4 : }
907 : ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
908 5 : old_node_id,
909 5 : new_node_id,
910 5 : }) => {
911 5 : self.intent.remove_secondary(scheduler, old_node_id);
912 5 : self.intent.push_secondary(scheduler, new_node_id);
913 5 : }
914 : }
915 :
916 9 : true
917 9 : }
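// Hypothetical end-to-end flow for the optimization APIs (`shard`, `nodes`,
// `scheduler` and `schedule_context` are assumed to exist):
//
//     if let Some(optimization) = shard.optimize_attachment(&nodes, &schedule_context) {
//         // Returns false (a no-op) if shard.sequence moved on since the
//         // optimization was planned.
//         let applied = shard.apply_optimization(&mut scheduler, optimization);
//     }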
918 :
919 : /// Query whether the tenant's observed state for attached node matches its intent state, and if so,
920 : /// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that
921 : /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
922 : ///
923 : /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
924 : /// function should not be used to decide whether to reconcile.
925 0 : pub(crate) fn stably_attached(&self) -> Option<NodeId> {
926 0 : if let Some(attach_intent) = self.intent.attached {
927 0 : match self.observed.locations.get(&attach_intent) {
928 0 : Some(loc) => match &loc.conf {
929 0 : Some(conf) => match conf.mode {
930 : LocationConfigMode::AttachedMulti
931 : | LocationConfigMode::AttachedSingle
932 : | LocationConfigMode::AttachedStale => {
933 : // Our intent and observed state agree that this node is in an attached state.
934 0 : Some(attach_intent)
935 : }
936 : // Our observed config is not an attached state
937 0 : _ => None,
938 : },
939 : // Our observed state is None, i.e. in flux
940 0 : None => None,
941 : },
942 : // We have no observed state for this node
943 0 : None => None,
944 : }
945 : } else {
946 : // Our intent is not to attach
947 0 : None
948 : }
949 0 : }
950 :
951 0 : fn dirty(&self, nodes: &Arc<HashMap<NodeId, Node>>) -> bool {
952 0 : let mut dirty_nodes = HashSet::new();
953 :
954 0 : if let Some(node_id) = self.intent.attached {
955 : // Maybe panic: it is a severe bug if we try to attach while generation is null.
956 0 : let generation = self
957 0 : .generation
958 0 : .expect("Attempted to enter attached state without a generation");
959 0 :
960 0 : let wanted_conf =
961 0 : attached_location_conf(generation, &self.shard, &self.config, &self.policy);
962 0 : match self.observed.locations.get(&node_id) {
963 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
964 0 : Some(_) | None => {
965 0 : dirty_nodes.insert(node_id);
966 0 : }
967 : }
968 0 : }
969 :
970 0 : for node_id in &self.intent.secondary {
971 0 : let wanted_conf = secondary_location_conf(&self.shard, &self.config);
972 0 : match self.observed.locations.get(node_id) {
973 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
974 0 : Some(_) | None => {
975 0 : dirty_nodes.insert(*node_id);
976 0 : }
977 : }
978 : }
979 :
980 0 : for node_id in self.observed.locations.keys() {
981 0 : if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
982 0 : // We have observed state that isn't part of our intent: need to clean it up.
983 0 : dirty_nodes.insert(*node_id);
984 0 : }
985 : }
986 :
987 0 : dirty_nodes.retain(|node_id| {
988 0 : nodes
989 0 : .get(node_id)
990 0 : .map(|n| n.is_available())
991 0 : .unwrap_or(false)
992 0 : });
993 0 :
994 0 : !dirty_nodes.is_empty()
995 0 : }
996 :
997 : #[allow(clippy::too_many_arguments)]
998 0 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
999 : pub(crate) fn get_reconcile_needed(
1000 : &mut self,
1001 : pageservers: &Arc<HashMap<NodeId, Node>>,
1002 : ) -> ReconcileNeeded {
1003 : // If there are any ambiguous observed states, and the nodes they refer to are available,
1004 : // we should reconcile to clean them up.
1005 : let mut dirty_observed = false;
1006 : for (node_id, observed_loc) in &self.observed.locations {
1007 : let node = pageservers
1008 : .get(node_id)
1009 : .expect("Nodes may not be removed while referenced");
1010 : if observed_loc.conf.is_none() && node.is_available() {
1011 : dirty_observed = true;
1012 : break;
1013 : }
1014 : }
1015 :
1016 : let active_nodes_dirty = self.dirty(pageservers);
1017 :
1018 : // Even if there is no pageserver work to be done, if we have a pending notification to computes,
1019 : // wake up a reconciler to send it.
1020 : let do_reconcile =
1021 : active_nodes_dirty || dirty_observed || self.pending_compute_notification;
1022 :
1023 : if !do_reconcile {
1024 : tracing::debug!("Not dirty, no reconciliation needed.");
1025 : return ReconcileNeeded::No;
1026 : }
1027 :
1028 : // If we are currently splitting, then never start a reconciler task: the splitting logic
1029 : // requires that shards are not interfered with while it runs. Do this check here rather than
1030 : // up top, so that we only log this message if we would otherwise have done a reconciliation.
1031 : if !matches!(self.splitting, SplitState::Idle) {
1032 : tracing::info!("Refusing to reconcile, splitting in progress");
1033 : return ReconcileNeeded::No;
1034 : }
1035 :
1036 : // Reconcile already in flight for the current sequence?
1037 : if let Some(handle) = &self.reconciler {
1038 : if handle.sequence == self.sequence {
1039 : tracing::info!(
1040 : "Reconciliation already in progress for sequence {:?}",
1041 : self.sequence,
1042 : );
1043 : return ReconcileNeeded::WaitExisting(ReconcilerWaiter {
1044 : tenant_shard_id: self.tenant_shard_id,
1045 : seq_wait: self.waiter.clone(),
1046 : error_seq_wait: self.error_waiter.clone(),
1047 : error: self.last_error.clone(),
1048 : seq: self.sequence,
1049 : });
1050 : }
1051 : }
1052 :
1053 : // Pre-checks done: finally check whether we may actually do the work
1054 : match self.scheduling_policy {
1055 : ShardSchedulingPolicy::Active
1056 : | ShardSchedulingPolicy::Essential
1057 : | ShardSchedulingPolicy::Pause => {}
1058 : ShardSchedulingPolicy::Stop => {
1059 : // We only reach this point if there is work to do and we're going to skip
1060 : // doing it: warn so it is obvious why this tenant isn't doing what it ought to.
1061 : tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
1062 : return ReconcileNeeded::No;
1063 : }
1064 : }
1065 :
1066 : ReconcileNeeded::Yes
1067 : }
1068 :
1069 : /// Ensure the sequence number is set to a value where waiting for this value will make us wait
1070 : /// for the next reconcile: i.e. it is ahead of all completed or running reconcilers.
1071 : ///
1072 : /// Constructing a ReconcilerWaiter with the resulting sequence number gives the property
1073 : /// that the waiter will not complete until some future Reconciler is constructed and run.
1074 0 : fn ensure_sequence_ahead(&mut self) {
1075 0 : // Find the highest sequence for which a Reconciler has previously run or is currently
1076 0 : // running
1077 0 : let max_seen = std::cmp::max(
1078 0 : self.reconciler
1079 0 : .as_ref()
1080 0 : .map(|r| r.sequence)
1081 0 : .unwrap_or(Sequence(0)),
1082 0 : std::cmp::max(self.waiter.load(), self.error_waiter.load()),
1083 0 : );
1084 0 :
1085 0 : if self.sequence <= max_seen {
1086 0 : self.sequence = max_seen.next();
1087 0 : }
1088 0 : }
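// A made-up example of the invariant above: if a reconciler previously ran to
// sequence 5 (so waiter/error_waiter have seen 5), then after this call
// self.sequence is at least 6, and a waiter constructed at self.sequence can
// only be released by a future reconcile.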
1089 :
1090 : /// Create a waiter that will wait for some future Reconciler that hasn't been spawned yet.
1091 : ///
1092 : /// This is appropriate when you can't spawn a reconciler (e.g. due to resource limits), but
1093 : /// you would like to wait on the next reconciler that gets spawned in the background.
1094 0 : pub(crate) fn future_reconcile_waiter(&mut self) -> ReconcilerWaiter {
1095 0 : self.ensure_sequence_ahead();
1096 0 :
1097 0 : ReconcilerWaiter {
1098 0 : tenant_shard_id: self.tenant_shard_id,
1099 0 : seq_wait: self.waiter.clone(),
1100 0 : error_seq_wait: self.error_waiter.clone(),
1101 0 : error: self.last_error.clone(),
1102 0 : seq: self.sequence,
1103 0 : }
1104 0 : }
1105 :
1106 0 : async fn reconcile(
1107 0 : sequence: Sequence,
1108 0 : mut reconciler: Reconciler,
1109 0 : must_notify: bool,
1110 0 : ) -> ReconcileResult {
1111 : // Attempt to make observed state match intent state
1112 0 : let result = reconciler.reconcile().await;
1113 :
1114 : // If we know we had a pending compute notification from some previous action, send a notification irrespective
1115 : // of whether the above reconcile() did any work
1116 0 : if result.is_ok() && must_notify {
1117 : // If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
1118 0 : reconciler.compute_notify().await.ok();
1119 0 : }
1120 :
1121 : // Update result counter
1122 0 : let outcome_label = match &result {
1123 0 : Ok(_) => ReconcileOutcome::Success,
1124 0 : Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
1125 0 : Err(_) => ReconcileOutcome::Error,
1126 : };
1127 :
1128 0 : metrics::METRICS_REGISTRY
1129 0 : .metrics_group
1130 0 : .storage_controller_reconcile_complete
1131 0 : .inc(ReconcileCompleteLabelGroup {
1132 0 : status: outcome_label,
1133 0 : });
1134 0 :
1135 0 : // Constructing result implicitly drops Reconciler, freeing any ReconcileUnits before the Service might
1136 0 : // try and schedule more work in response to our result.
1137 0 : ReconcileResult {
1138 0 : sequence,
1139 0 : result,
1140 0 : tenant_shard_id: reconciler.tenant_shard_id,
1141 0 : generation: reconciler.generation,
1142 0 : observed_deltas: reconciler.observed_deltas(),
1143 0 : pending_compute_notification: reconciler.compute_notify_failure,
1144 0 : }
1145 0 : }
1146 :
1147 : #[allow(clippy::too_many_arguments)]
1148 0 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1149 : pub(crate) fn spawn_reconciler(
1150 : &mut self,
1151 : result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
1152 : pageservers: &Arc<HashMap<NodeId, Node>>,
1153 : compute_hook: &Arc<ComputeHook>,
1154 : reconciler_config: ReconcilerConfig,
1155 : service_config: &service::Config,
1156 : persistence: &Arc<Persistence>,
1157 : units: ReconcileUnits,
1158 : gate_guard: GateGuard,
1159 : cancel: &CancellationToken,
1160 : ) -> Option<ReconcilerWaiter> {
1161 : // Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
1162 : // doing our sequence's work.
1163 : let old_handle = self.reconciler.take();
1164 :
1165 : // Build list of nodes from which the reconciler should detach
1166 : let mut detach = Vec::new();
1167 : for node_id in self.observed.locations.keys() {
1168 : if self.intent.get_attached() != &Some(*node_id)
1169 : && !self.intent.secondary.contains(node_id)
1170 : {
1171 : detach.push(
1172 : pageservers
1173 : .get(node_id)
1174 : .expect("Intent references non-existent pageserver")
1175 : .clone(),
1176 : )
1177 : }
1178 : }
1179 :
1180 : // Advance the sequence before spawning a reconciler, so that sequence waiters
1181 : // can distinguish between before+after the reconcile completes.
1182 : self.ensure_sequence_ahead();
1183 :
1184 : let reconciler_cancel = cancel.child_token();
1185 : let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
1186 : let reconciler = Reconciler {
1187 : tenant_shard_id: self.tenant_shard_id,
1188 : shard: self.shard,
1189 : placement_policy: self.policy.clone(),
1190 : generation: self.generation,
1191 : intent: reconciler_intent,
1192 : detach,
1193 : reconciler_config,
1194 : config: self.config.clone(),
1195 : observed: self.observed.clone(),
1196 : original_observed: self.observed.clone(),
1197 : compute_hook: compute_hook.clone(),
1198 : service_config: service_config.clone(),
1199 : _gate_guard: gate_guard,
1200 : _resource_units: units,
1201 : cancel: reconciler_cancel.clone(),
1202 : persistence: persistence.clone(),
1203 : compute_notify_failure: false,
1204 : };
1205 :
1206 : let reconcile_seq = self.sequence;
1207 : let long_reconcile_threshold = service_config.long_reconcile_threshold;
1208 :
1209 : tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
1210 : let must_notify = self.pending_compute_notification;
1211 : let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
1212 : tenant_id=%reconciler.tenant_shard_id.tenant_id,
1213 : shard_id=%reconciler.tenant_shard_id.shard_slug());
1214 : metrics::METRICS_REGISTRY
1215 : .metrics_group
1216 : .storage_controller_reconcile_spawn
1217 : .inc();
1218 : let result_tx = result_tx.clone();
1219 : let join_handle = tokio::task::spawn(
1220 0 : async move {
1221 : // Wait for any previous reconcile task to complete before we start
1222 0 : if let Some(old_handle) = old_handle {
1223 0 : old_handle.cancel.cancel();
1224 0 : if let Err(e) = old_handle.handle.await {
1225 : // We can't do much with this other than log it: the task is done, so
1226 : // we may proceed with our work.
1227 0 : tracing::error!("Unexpected join error waiting for reconcile task: {e}");
1228 0 : }
1229 0 : }
1230 :
1231 : // Early check for cancellation before doing any work
1232 : // TODO: wrap all remote API operations in cancellation check
1233 : // as well.
1234 0 : if reconciler.cancel.is_cancelled() {
1235 0 : metrics::METRICS_REGISTRY
1236 0 : .metrics_group
1237 0 : .storage_controller_reconcile_complete
1238 0 : .inc(ReconcileCompleteLabelGroup {
1239 0 : status: ReconcileOutcome::Cancel,
1240 0 : });
1241 0 : return;
1242 0 : }
1243 0 :
1244 0 : let (tenant_id_label, shard_number_label, sequence_label) = {
1245 0 : (
1246 0 : reconciler.tenant_shard_id.tenant_id.to_string(),
1247 0 : reconciler.tenant_shard_id.shard_number.0.to_string(),
1248 0 : reconcile_seq.to_string(),
1249 0 : )
1250 0 : };
1251 0 :
1252 0 : let label_group = ReconcileLongRunningLabelGroup {
1253 0 : tenant_id: &tenant_id_label,
1254 0 : shard_number: &shard_number_label,
1255 0 : sequence: &sequence_label,
1256 0 : };
1257 0 :
1258 0 : let reconcile_fut = Self::reconcile(reconcile_seq, reconciler, must_notify);
1259 0 : let long_reconcile_fut = {
1260 0 : let label_group = label_group.clone();
1261 0 : async move {
1262 0 : tokio::time::sleep(long_reconcile_threshold).await;
1263 :
1264 0 : tracing::warn!("Reconcile passed the long running threshold of {long_reconcile_threshold:?}");
1265 :
1266 0 : metrics::METRICS_REGISTRY
1267 0 : .metrics_group
1268 0 : .storage_controller_reconcile_long_running
1269 0 : .inc(label_group);
1270 0 : }
1271 : };
1272 :
1273 0 : let reconcile_fut = std::pin::pin!(reconcile_fut);
1274 0 : let long_reconcile_fut = std::pin::pin!(long_reconcile_fut);
1275 :
1276 0 : let (was_long, result) =
1277 0 : match future::select(reconcile_fut, long_reconcile_fut).await {
1278 0 : Either::Left((reconcile_result, _)) => (false, reconcile_result),
1279 0 : Either::Right((_, reconcile_fut)) => (true, reconcile_fut.await),
1280 : };
1281 :
1282 0 : if was_long {
1283 0 : let id = metrics::METRICS_REGISTRY
1284 0 : .metrics_group
1285 0 : .storage_controller_reconcile_long_running
1286 0 : .with_labels(label_group);
1287 0 : metrics::METRICS_REGISTRY
1288 0 : .metrics_group
1289 0 : .storage_controller_reconcile_long_running
1290 0 : .remove_metric(id);
1291 0 : }
1292 :
1293 0 : result_tx
1294 0 : .send(ReconcileResultRequest::ReconcileResult(result))
1295 0 : .ok();
1296 0 : }
1297 : .instrument(reconciler_span),
1298 : );
1299 :
1300 : self.reconciler = Some(ReconcilerHandle {
1301 : sequence: self.sequence,
1302 : handle: join_handle,
1303 : cancel: reconciler_cancel,
1304 : });
1305 :
1306 : Some(ReconcilerWaiter {
1307 : tenant_shard_id: self.tenant_shard_id,
1308 : seq_wait: self.waiter.clone(),
1309 : error_seq_wait: self.error_waiter.clone(),
1310 : error: self.last_error.clone(),
1311 : seq: self.sequence,
1312 : })
1313 : }
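// A self-contained sketch (assumed names, not this crate's types) of the
// "cancel the previous task, await it, then proceed" pattern used above:
//
//     use tokio::task::JoinHandle;
//     use tokio_util::sync::CancellationToken;
//
//     struct Handle {
//         handle: JoinHandle<()>,
//         cancel: CancellationToken,
//     }
//
//     async fn replace_task(old: Option<Handle>) {
//         if let Some(old) = old {
//             old.cancel.cancel();
//             if let Err(e) = old.handle.await {
//                 // A join error (e.g. a panic) is logged; we still proceed.
//                 eprintln!("unexpected join error: {e}");
//             }
//         }
//         // ...spawn the replacement task here...
//     }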
1314 :
1315 : /// Get a waiter for any reconciliation in flight, but do not start reconciliation
1316 : /// if it is not already running
1317 0 : pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
1318 0 : if self.reconciler.is_some() {
1319 0 : Some(ReconcilerWaiter {
1320 0 : tenant_shard_id: self.tenant_shard_id,
1321 0 : seq_wait: self.waiter.clone(),
1322 0 : error_seq_wait: self.error_waiter.clone(),
1323 0 : error: self.last_error.clone(),
1324 0 : seq: self.sequence,
1325 0 : })
1326 : } else {
1327 0 : None
1328 : }
1329 0 : }
1330 :
1331 : /// Called when a ReconcileResult has been emitted and the service is updating
1332 : /// our state: if the result is from a sequence >= my ReconcileHandle, then drop
1333 : /// the handle to indicate there is no longer a reconciliation in progress.
1334 0 : pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
1335 0 : if let Some(reconcile_handle) = &self.reconciler {
1336 0 : if reconcile_handle.sequence <= sequence {
1337 0 : self.reconciler = None;
1338 0 : }
1339 0 : }
1340 0 : }
1341 :
1342 : /// If we had any state at all referring to this node ID, drop it. Does not
1343 : /// attempt to reschedule.
1344 : ///
1345 : /// Returns true if we modified the node's intent state.
1346 0 : pub(crate) fn deref_node(&mut self, node_id: NodeId) -> bool {
1347 0 : let mut intent_modified = false;
1348 0 :
1349 0 : // Drop if this node was our attached intent
1350 0 : if self.intent.attached == Some(node_id) {
1351 0 : self.intent.attached = None;
1352 0 : intent_modified = true;
1353 0 : }
1354 :
1355 : // Drop from the list of secondaries, and check if we modified it
1356 0 : let had_secondaries = self.intent.secondary.len();
1357 0 : self.intent.secondary.retain(|n| n != &node_id);
1358 0 : intent_modified |= self.intent.secondary.len() != had_secondaries;
1359 0 :
1360 0 : debug_assert!(!self.intent.all_pageservers().contains(&node_id));
1361 :
1362 0 : intent_modified
1363 0 : }
1364 :
1365 0 : pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
1366 0 : self.scheduling_policy = p;
1367 0 : }
1368 :
1369 0 : pub(crate) fn get_scheduling_policy(&self) -> &ShardSchedulingPolicy {
1370 0 : &self.scheduling_policy
1371 0 : }
1372 :
1373 0 : pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
1374 0 : // Ordering: always set last_error before advancing sequence, so that sequence
1375 0 : // waiters are guaranteed to see a Some value when they see an error.
1376 0 : *(self.last_error.lock().unwrap()) = Some(Arc::new(error));
1377 0 : self.error_waiter.advance(sequence);
1378 0 : }
1379 :
1380 0 : pub(crate) fn from_persistent(
1381 0 : tsp: TenantShardPersistence,
1382 0 : intent: IntentState,
1383 0 : ) -> anyhow::Result<Self> {
1384 0 : let tenant_shard_id = tsp.get_tenant_shard_id()?;
1385 0 : let shard_identity = tsp.get_shard_identity()?;
1386 :
1387 0 : Ok(Self {
1388 0 : tenant_shard_id,
1389 0 : shard: shard_identity,
1390 0 : sequence: Sequence::initial(),
1391 0 : generation: tsp.generation.map(|g| Generation::new(g as u32)),
1392 0 : policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
1393 0 : intent,
1394 0 : observed: ObservedState::new(),
1395 0 : config: serde_json::from_str(&tsp.config).unwrap(),
1396 0 : reconciler: None,
1397 0 : splitting: tsp.splitting,
1398 0 : waiter: Arc::new(SeqWait::new(Sequence::initial())),
1399 0 : error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
1400 0 : last_error: Arc::default(),
1401 0 : pending_compute_notification: false,
1402 0 : delayed_reconcile: false,
1403 0 : scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
1404 0 : preferred_az_id: tsp.preferred_az_id.map(AvailabilityZone),
1405 0 : })
1406 0 : }
1407 :
1408 0 : pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
1409 0 : TenantShardPersistence {
1410 0 : tenant_id: self.tenant_shard_id.tenant_id.to_string(),
1411 0 : shard_number: self.tenant_shard_id.shard_number.0 as i32,
1412 0 : shard_count: self.tenant_shard_id.shard_count.literal() as i32,
1413 0 : shard_stripe_size: self.shard.stripe_size.0 as i32,
1414 0 : generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
1415 0 : generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
1416 0 : placement_policy: serde_json::to_string(&self.policy).unwrap(),
1417 0 : config: serde_json::to_string(&self.config).unwrap(),
1418 0 : splitting: SplitState::default(),
1419 0 : scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
1420 0 : preferred_az_id: self.preferred_az_id.as_ref().map(|az| az.0.clone()),
1421 0 : }
1422 0 : }
1423 :
1424 12500 : pub(crate) fn preferred_az(&self) -> Option<&AvailabilityZone> {
1425 12500 : self.preferred_az_id.as_ref()
1426 12500 : }
1427 :
1428 12500 : pub(crate) fn set_preferred_az(&mut self, preferred_az_id: AvailabilityZone) {
1429 12500 : self.preferred_az_id = Some(preferred_az_id);
1430 12500 : }
1431 :
1432 : /// Returns all the nodes to which this tenant shard is attached according to the
1433 : /// observed state and the generations. The returned vector is sorted from latest
1434 : /// generation to earliest.
1435 0 : pub(crate) fn attached_locations(&self) -> Vec<(NodeId, Generation)> {
1436 0 : self.observed
1437 0 : .locations
1438 0 : .iter()
1439 0 : .filter_map(|(node_id, observed)| {
1440 : use LocationConfigMode::{AttachedMulti, AttachedSingle, AttachedStale};
1441 :
1442 0 : let conf = observed.conf.as_ref()?;
1443 :
1444 0 : match (conf.generation, conf.mode) {
1445 0 : (Some(gen), AttachedMulti | AttachedSingle | AttachedStale) => {
1446 0 : Some((*node_id, gen))
1447 : }
1448 0 : _ => None,
1449 : }
1450 0 : })
1451 0 : .sorted_by(|(_lhs_node_id, lhs_gen), (_rhs_node_id, rhs_gen)| {
1452 0 : lhs_gen.cmp(rhs_gen).reverse()
1453 0 : })
1454 0 : .map(|(node_id, gen)| (node_id, Generation::new(gen)))
1455 0 : .collect()
1456 0 : }
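     : // For example (editor's illustration): if the observed state reports node 1
     : // attached at generation 3 and node 2 attached at generation 5, this returns
     : // vec![(NodeId(2), Generation::new(5)), (NodeId(1), Generation::new(3))].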
1457 :
1458 : /// Update the observed state of the tenant by applying incremental deltas
1459 : ///
1460 : /// Deltas are generated by reconcilers via [`Reconciler::observed_deltas`].
1461 : /// They are then filtered in [`crate::service::Service::process_result`].
1462 0 : pub(crate) fn apply_observed_deltas(
1463 0 : &mut self,
1464 0 : deltas: impl Iterator<Item = ObservedStateDelta>,
1465 0 : ) {
1466 0 : for delta in deltas {
1467 0 : match delta {
1468 0 : ObservedStateDelta::Upsert(ups) => {
1469 0 : let (node_id, loc) = *ups;
1470 0 :
1471 0 : // If the generation of the observed location in the delta is lagging
1472 0 : // behind the current one, then we have a race condition and cannot
1473 0 : // be certain about the true observed state. Set the observed state
1474 0 : // to None in order to reflect this.
1475 0 : let crnt_gen = self
1476 0 : .observed
1477 0 : .locations
1478 0 : .get(&node_id)
1479 0 : .and_then(|loc| loc.conf.as_ref())
1480 0 : .and_then(|conf| conf.generation);
1481 0 : let new_gen = loc.conf.as_ref().and_then(|conf| conf.generation);
1482 0 : match (crnt_gen, new_gen) {
1483 0 : (Some(crnt), Some(new)) if crnt > new => {
1484 0 : tracing::warn!(
1485 0 : "Skipping observed state update {}: {:?} and using None due to stale generation ({} > {})",
1486 : node_id, loc, crnt, new
1487 : );
1488 :
1489 0 : self.observed
1490 0 : .locations
1491 0 : .insert(node_id, ObservedStateLocation { conf: None });
1492 0 :
1493 0 : continue;
1494 : }
1495 0 : _ => {}
1496 : }
1497 :
1498 0 : if let Some(conf) = &loc.conf {
1499 0 : tracing::info!("Updating observed location {}: {:?}", node_id, conf);
1500 : } else {
1501 0 : tracing::info!("Setting observed location {} to None", node_id,)
1502 : }
1503 :
1504 0 : self.observed.locations.insert(node_id, loc);
1505 : }
1506 0 : ObservedStateDelta::Delete(node_id) => {
1507 0 : tracing::info!("Deleting observed location {}", node_id);
1508 0 : self.observed.locations.remove(&node_id);
1509 : }
1510 : }
1511 : }
1512 0 : }
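     : // Editor's sketch, not part of the original source: constructing a delta, per
     : // the boxed tuple shape destructured above. An upsert whose generation lags
     : // the currently observed one is degraded to `conf: None` rather than trusted:
     : //
     : //     let delta = ObservedStateDelta::Upsert(Box::new((
     : //         NodeId(1),
     : //         ObservedStateLocation { conf: None },
     : //     )));
     : //     shard.apply_observed_deltas(std::iter::once(delta));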
1513 : }
1514 :
1515 : #[cfg(test)]
1516 : pub(crate) mod tests {
1517 : use std::{cell::RefCell, rc::Rc};
1518 :
1519 : use pageserver_api::{
1520 : controller_api::NodeAvailability,
1521 : shard::{ShardCount, ShardNumber},
1522 : };
1523 : use rand::{rngs::StdRng, SeedableRng};
1524 : use utils::id::TenantId;
1525 :
1526 : use crate::scheduler::test_utils::make_test_nodes;
1527 :
1528 : use super::*;
1529 :
1530 7 : fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard {
1531 7 : let tenant_id = TenantId::generate();
1532 7 : let shard_number = ShardNumber(0);
1533 7 : let shard_count = ShardCount::new(1);
1534 7 :
1535 7 : let tenant_shard_id = TenantShardId {
1536 7 : tenant_id,
1537 7 : shard_number,
1538 7 : shard_count,
1539 7 : };
1540 7 : TenantShard::new(
1541 7 : tenant_shard_id,
1542 7 : ShardIdentity::new(
1543 7 : shard_number,
1544 7 : shard_count,
1545 7 : pageserver_api::shard::ShardStripeSize(32768),
1546 7 : )
1547 7 : .unwrap(),
1548 7 : policy,
1549 7 : )
1550 7 : }
1551 :
1552 5003 : fn make_test_tenant(
1553 5003 : policy: PlacementPolicy,
1554 5003 : shard_count: ShardCount,
1555 5003 : preferred_az: Option<AvailabilityZone>,
1556 5003 : ) -> Vec<TenantShard> {
1557 5003 : let tenant_id = TenantId::generate();
1558 5003 :
1559 5003 : (0..shard_count.count())
1560 12512 : .map(|i| {
1561 12512 : let shard_number = ShardNumber(i);
1562 12512 :
1563 12512 : let tenant_shard_id = TenantShardId {
1564 12512 : tenant_id,
1565 12512 : shard_number,
1566 12512 : shard_count,
1567 12512 : };
1568 12512 : let mut ts = TenantShard::new(
1569 12512 : tenant_shard_id,
1570 12512 : ShardIdentity::new(
1571 12512 : shard_number,
1572 12512 : shard_count,
1573 12512 : pageserver_api::shard::ShardStripeSize(32768),
1574 12512 : )
1575 12512 : .unwrap(),
1576 12512 : policy.clone(),
1577 12512 : );
1578 :
1579 12512 : if let Some(az) = &preferred_az {
1580 12500 : ts.set_preferred_az(az.clone());
1581 12500 : }
1582 :
1583 12512 : ts
1584 12512 : })
1585 5003 : .collect()
1586 5003 : }
1587 :
1588 : /// Test the scheduling behaviors used when a tenant configured for HA is subject
1589 : /// to nodes being marked offline.
1590 : #[test]
1591 1 : fn tenant_ha_scheduling() -> anyhow::Result<()> {
1592 1 : // Start with three nodes. Our tenant will only use two. The third one is
1593 1 : // expected to remain unused.
1594 1 : let mut nodes = make_test_nodes(3, &[]);
1595 1 :
1596 1 : let mut scheduler = Scheduler::new(nodes.values());
1597 1 : let mut context = ScheduleContext::default();
1598 1 :
1599 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
1600 1 : tenant_shard
1601 1 : .schedule(&mut scheduler, &mut context)
1602 1 : .expect("we have enough nodes, scheduling should work");
1603 1 :
1604 1 : // Expect the attached and secondary locations to be scheduled on different nodes initially
1605 1 : assert_eq!(tenant_shard.intent.secondary.len(), 1);
1606 1 : assert!(tenant_shard.intent.attached.is_some());
1607 :
1608 1 : let attached_node_id = tenant_shard.intent.attached.unwrap();
1609 1 : let secondary_node_id = *tenant_shard.intent.secondary.iter().last().unwrap();
1610 1 : assert_ne!(attached_node_id, secondary_node_id);
1611 :
1612 : // Marking the attached node offline should demote it to a secondary
1613 1 : let changed = tenant_shard
1614 1 : .intent
1615 1 : .demote_attached(&mut scheduler, attached_node_id);
1616 1 : assert!(changed);
1617 1 : assert!(tenant_shard.intent.attached.is_none());
1618 1 : assert_eq!(tenant_shard.intent.secondary.len(), 2);
1619 :
1620 : // Update the scheduler state to indicate the node is offline
1621 1 : nodes
1622 1 : .get_mut(&attached_node_id)
1623 1 : .unwrap()
1624 1 : .set_availability(NodeAvailability::Offline);
1625 1 : scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
1626 1 :
1627 1 : // Re-scheduling the shard should promote the still-available secondary node to attached
1628 1 : tenant_shard
1629 1 : .schedule(&mut scheduler, &mut context)
1630 1 : .expect("active nodes are available");
1631 1 : assert_eq!(tenant_shard.intent.attached.unwrap(), secondary_node_id);
1632 :
1633 : // The original attached node should have been retained as a secondary
1634 1 : assert_eq!(
1635 1 : *tenant_shard.intent.secondary.iter().last().unwrap(),
1636 1 : attached_node_id
1637 1 : );
1638 :
1639 1 : tenant_shard.intent.clear(&mut scheduler);
1640 1 :
1641 1 : Ok(())
1642 1 : }
1643 :
1644 : #[test]
1645 1 : fn intent_from_observed() -> anyhow::Result<()> {
1646 1 : let nodes = make_test_nodes(3, &[]);
1647 1 : let mut scheduler = Scheduler::new(nodes.values());
1648 1 :
1649 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
1650 1 :
1651 1 : tenant_shard.observed.locations.insert(
1652 1 : NodeId(3),
1653 1 : ObservedStateLocation {
1654 1 : conf: Some(LocationConfig {
1655 1 : mode: LocationConfigMode::AttachedMulti,
1656 1 : generation: Some(2),
1657 1 : secondary_conf: None,
1658 1 : shard_number: tenant_shard.shard.number.0,
1659 1 : shard_count: tenant_shard.shard.count.literal(),
1660 1 : shard_stripe_size: tenant_shard.shard.stripe_size.0,
1661 1 : tenant_conf: TenantConfig::default(),
1662 1 : }),
1663 1 : },
1664 1 : );
1665 1 :
1666 1 : tenant_shard.observed.locations.insert(
1667 1 : NodeId(2),
1668 1 : ObservedStateLocation {
1669 1 : conf: Some(LocationConfig {
1670 1 : mode: LocationConfigMode::AttachedStale,
1671 1 : generation: Some(1),
1672 1 : secondary_conf: None,
1673 1 : shard_number: tenant_shard.shard.number.0,
1674 1 : shard_count: tenant_shard.shard.count.literal(),
1675 1 : shard_stripe_size: tenant_shard.shard.stripe_size.0,
1676 1 : tenant_conf: TenantConfig::default(),
1677 1 : }),
1678 1 : },
1679 1 : );
1680 1 :
1681 1 : tenant_shard.intent_from_observed(&mut scheduler);
1682 1 :
1683 1 : // The attached location with the highest generation is used as the attached intent
1684 1 : assert_eq!(tenant_shard.intent.attached, Some(NodeId(3)));
1685 : // Other attached locations are used as secondaries
1686 1 : assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]);
1687 :
1688 1 : scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?;
1689 :
1690 1 : tenant_shard.intent.clear(&mut scheduler);
1691 1 : Ok(())
1692 1 : }
1693 :
1694 : #[test]
1695 1 : fn scheduling_mode() -> anyhow::Result<()> {
1696 1 : let nodes = make_test_nodes(3, &[]);
1697 1 : let mut scheduler = Scheduler::new(nodes.values());
1698 1 :
1699 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
1700 1 :
1701 1 : // In pause mode, schedule() shouldn't do anything
1702 1 : tenant_shard.scheduling_policy = ShardSchedulingPolicy::Pause;
1703 1 : assert!(tenant_shard
1704 1 : .schedule(&mut scheduler, &mut ScheduleContext::default())
1705 1 : .is_ok());
1706 1 : assert!(tenant_shard.intent.all_pageservers().is_empty());
1707 :
1708 : // In active mode, schedule() works
1709 1 : tenant_shard.scheduling_policy = ShardSchedulingPolicy::Active;
1710 1 : assert!(tenant_shard
1711 1 : .schedule(&mut scheduler, &mut ScheduleContext::default())
1712 1 : .is_ok());
1713 1 : assert!(!tenant_shard.intent.all_pageservers().is_empty());
1714 :
1715 1 : tenant_shard.intent.clear(&mut scheduler);
1716 1 : Ok(())
1717 1 : }
1718 :
1719 : #[test]
1720 1 : fn optimize_attachment() -> anyhow::Result<()> {
1721 1 : let nodes = make_test_nodes(3, &[]);
1722 1 : let mut scheduler = Scheduler::new(nodes.values());
1723 1 :
1724 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
1725 1 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
1726 1 :
1727 1 : // Initially: both shards are attached on node 1, and each has a secondary location
1728 1 : // on a different node.
1729 1 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
1730 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(2));
1731 1 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
1732 1 : shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
1733 1 :
1734 1 : let mut schedule_context = ScheduleContext::default();
1735 1 : schedule_context.avoid(&shard_a.intent.all_pageservers());
1736 1 : schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
1737 1 : schedule_context.avoid(&shard_b.intent.all_pageservers());
1738 1 : schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
1739 1 :
1740 1 : let optimization_a = shard_a.optimize_attachment(&nodes, &schedule_context);
1741 1 :
1742 1 : // Either shard should recognize that it has the option to switch to a secondary location where there
1743 1 : // would be no other shards from the same tenant, and request to do so.
1744 1 : assert_eq!(
1745 1 : optimization_a,
1746 1 : Some(ScheduleOptimization {
1747 1 : sequence: shard_a.sequence,
1748 1 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
1749 1 : old_attached_node_id: NodeId(1),
1750 1 : new_attached_node_id: NodeId(2)
1751 1 : })
1752 1 : })
1753 1 : );
1754 :
1755 : // Note that optimizing two shards in the same tenant against the same ScheduleContext
1756 : // is not valid in general: applying one optimization invalidates the context's stats.
1757 : // It is the responsibility of [`Service::optimize_all`] to avoid optimizing multiple
1758 : // shards in the same tenant at the same time. Generating both optimizations here is
1759 : // done purely for test purposes.
1760 1 : let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context);
1761 1 : assert_eq!(
1762 1 : optimization_b,
1763 1 : Some(ScheduleOptimization {
1764 1 : sequence: shard_b.sequence,
1765 1 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
1766 1 : old_attached_node_id: NodeId(1),
1767 1 : new_attached_node_id: NodeId(3)
1768 1 : })
1769 1 : })
1770 1 : );
1771 :
1772 : // Applying these optimizations should result in the end state proposed
1773 1 : shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
1774 1 : assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(2)));
1775 1 : assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
1776 1 : shard_b.apply_optimization(&mut scheduler, optimization_b.unwrap());
1777 1 : assert_eq!(shard_b.intent.get_attached(), &Some(NodeId(3)));
1778 1 : assert_eq!(shard_b.intent.get_secondary(), &vec![NodeId(1)]);
1779 :
1780 1 : shard_a.intent.clear(&mut scheduler);
1781 1 : shard_b.intent.clear(&mut scheduler);
1782 1 :
1783 1 : Ok(())
1784 1 : }
1785 :
1786 : #[test]
1787 1 : fn optimize_secondary() -> anyhow::Result<()> {
1788 1 : let nodes = make_test_nodes(4, &[]);
1789 1 : let mut scheduler = Scheduler::new(nodes.values());
1790 1 :
1791 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
1792 1 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
1793 1 :
1794 1 : // Initially: the shards are attached on different nodes, and both have their
1795 1 : // secondary location on node 3.
1796 1 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
1797 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
1798 1 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
1799 1 : shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
1800 1 :
1801 1 : let mut schedule_context = ScheduleContext::default();
1802 1 : schedule_context.avoid(&shard_a.intent.all_pageservers());
1803 1 : schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
1804 1 : schedule_context.avoid(&shard_b.intent.all_pageservers());
1805 1 : schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
1806 1 :
1807 1 : let optimization_a = shard_a.optimize_secondary(&mut scheduler, &schedule_context);
1808 1 :
1809 1 : // Since there is a node with no locations, the shard whose secondary sits on the
1810 1 : // doubly-loaded node should generate an optimization to move it to the free node
1811 1 : assert_eq!(
1812 1 : optimization_a,
1813 1 : Some(ScheduleOptimization {
1814 1 : sequence: shard_a.sequence,
1815 1 : action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
1816 1 : old_node_id: NodeId(3),
1817 1 : new_node_id: NodeId(4)
1818 1 : })
1819 1 : })
1820 1 : );
1821 :
1822 1 : shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
1823 1 : assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
1824 1 : assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);
1825 :
1826 1 : shard_a.intent.clear(&mut scheduler);
1827 1 : shard_b.intent.clear(&mut scheduler);
1828 1 :
1829 1 : Ok(())
1830 1 : }
1831 :
1832 : // Optimize until quiescent: this emulates what Service::optimize_all does when
1833 : // called repeatedly in the background.
1834 : // Returns the applied optimizations
1835 3 : fn optimize_til_idle(
1836 3 : nodes: &HashMap<NodeId, Node>,
1837 3 : scheduler: &mut Scheduler,
1838 3 : shards: &mut [TenantShard],
1839 3 : ) -> Vec<ScheduleOptimization> {
1840 3 : let mut loop_n = 0;
1841 3 : let mut optimizations = Vec::default();
1842 : loop {
1843 9 : let mut schedule_context = ScheduleContext::default();
1844 9 : let mut any_changed = false;
1845 :
1846 36 : for shard in shards.iter() {
1847 36 : schedule_context.avoid(&shard.intent.all_pageservers());
1848 36 : if let Some(attached) = shard.intent.get_attached() {
1849 36 : schedule_context.push_attached(*attached);
1850 36 : }
1851 : }
1852 :
1853 21 : for shard in shards.iter_mut() {
1854 21 : let optimization = shard.optimize_attachment(nodes, &schedule_context);
1855 21 : if let Some(optimization) = optimization {
1856 2 : optimizations.push(optimization.clone());
1857 2 : shard.apply_optimization(scheduler, optimization);
1858 2 : any_changed = true;
1859 2 : break;
1860 19 : }
1861 19 :
1862 19 : let optimization = shard.optimize_secondary(scheduler, &schedule_context);
1863 19 : if let Some(optimization) = optimization {
1864 4 : optimizations.push(optimization.clone());
1865 4 : shard.apply_optimization(scheduler, optimization);
1866 4 : any_changed = true;
1867 4 : break;
1868 15 : }
1869 : }
1870 :
1871 9 : if !any_changed {
1872 3 : break;
1873 6 : }
1874 6 :
1875 6 : // Assert no infinite loop
1876 6 : loop_n += 1;
1877 6 : assert!(loop_n < 1000);
1878 : }
1879 :
1880 3 : optimizations
1881 3 : }
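     : // Usage note (editor's addition): the tests below treat an empty return value
     : // as evidence of convergence, e.g.
     : //     assert_eq!(optimize_til_idle(&nodes, &mut scheduler, &mut shards), vec![]);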
1882 :
1883 : /// Test the balancing behavior of shard scheduling: that it achieves a balance, and
1884 : /// that it converges.
1885 : #[test]
1886 1 : fn optimize_add_nodes() -> anyhow::Result<()> {
1887 1 : let nodes = make_test_nodes(4, &[]);
1888 1 :
1889 1 : // Only show the scheduler a couple of nodes
1890 1 : let mut scheduler = Scheduler::new([].iter());
1891 1 : scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
1892 1 : scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
1893 1 :
1894 1 : let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
1895 1 : let mut schedule_context = ScheduleContext::default();
1896 5 : for shard in &mut shards {
1897 4 : assert!(shard
1898 4 : .schedule(&mut scheduler, &mut schedule_context)
1899 4 : .is_ok());
1900 : }
1901 :
1902 : // We should see an equal number of locations on the two nodes.
1903 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4);
1904 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2);
1905 :
1906 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4);
1907 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
1908 :
1909 : // Add another two nodes: we should see the shards spread out when their optimize
1910 : // methods are called
1911 1 : scheduler.node_upsert(nodes.get(&NodeId(3)).unwrap());
1912 1 : scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
1913 1 : optimize_til_idle(&nodes, &mut scheduler, &mut shards);
1914 1 :
1915 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
1916 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
1917 :
1918 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
1919 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
1920 :
1921 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2);
1922 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 1);
1923 :
1924 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2);
1925 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 1);
1926 :
1927 4 : for shard in shards.iter_mut() {
1928 4 : shard.intent.clear(&mut scheduler);
1929 4 : }
1930 :
1931 1 : Ok(())
1932 1 : }
1933 :
1934 : /// Test that initial shard scheduling is optimal. By optimal we mean
1935 : /// that the optimizer cannot find a way to improve it.
1936 : ///
1937 : /// This test is an example of the scheduling issue described in
1938 : /// https://github.com/neondatabase/neon/issues/8969
1939 : #[test]
1940 1 : fn initial_scheduling_is_optimal() -> anyhow::Result<()> {
1941 : use itertools::Itertools;
1942 :
1943 1 : let nodes = make_test_nodes(2, &[]);
1944 1 :
1945 1 : let mut scheduler = Scheduler::new([].iter());
1946 1 : scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
1947 1 : scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
1948 1 :
1949 1 : let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
1950 1 : let a_context = Rc::new(RefCell::new(ScheduleContext::default()));
1951 1 :
1952 1 : let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
1953 1 : let b_context = Rc::new(RefCell::new(ScheduleContext::default()));
1954 1 :
1955 4 : let a_shards_with_context = a.iter_mut().map(|shard| (shard, a_context.clone()));
1956 4 : let b_shards_with_context = b.iter_mut().map(|shard| (shard, b_context.clone()));
1957 1 :
1958 1 : let schedule_order = a_shards_with_context.interleave(b_shards_with_context);
1959 :
1960 9 : for (shard, context) in schedule_order {
1961 8 : let context = &mut *context.borrow_mut();
1962 8 : shard.schedule(&mut scheduler, context).unwrap();
1963 8 : }
1964 :
1965 1 : let applied_to_a = optimize_til_idle(&nodes, &mut scheduler, &mut a);
1966 1 : assert_eq!(applied_to_a, vec![]);
1967 :
1968 1 : let applied_to_b = optimize_til_idle(&nodes, &mut scheduler, &mut b);
1969 1 : assert_eq!(applied_to_b, vec![]);
1970 :
1971 8 : for shard in a.iter_mut().chain(b.iter_mut()) {
1972 8 : shard.intent.clear(&mut scheduler);
1973 8 : }
1974 :
1975 1 : Ok(())
1976 1 : }
1977 :
1978 : #[test]
1979 1 : fn random_az_shard_scheduling() -> anyhow::Result<()> {
1980 : use rand::seq::SliceRandom;
1981 :
1982 51 : for seed in 0..50 {
1983 50 : eprintln!("Running test with seed {seed}");
1984 50 : let mut rng = StdRng::seed_from_u64(seed);
1985 50 :
1986 50 : let az_a_tag = AvailabilityZone("az-a".to_string());
1987 50 : let az_b_tag = AvailabilityZone("az-b".to_string());
1988 50 : let azs = [az_a_tag, az_b_tag];
1989 50 : let nodes = make_test_nodes(4, &azs);
1990 50 : let mut shards_per_az: HashMap<AvailabilityZone, u32> = HashMap::new();
1991 50 :
1992 50 : let mut scheduler = Scheduler::new([].iter());
1993 200 : for node in nodes.values() {
1994 200 : scheduler.node_upsert(node);
1995 200 : }
1996 :
1997 50 : let mut shards = Vec::default();
1998 50 : let mut contexts = Vec::default();
1999 50 : let mut az_picker = azs.iter().cycle().cloned();
2000 5050 : for i in 0..100 {
2001 5000 : let az = az_picker.next().unwrap();
2002 5000 : let shard_count = i % 4 + 1;
2003 5000 : *shards_per_az.entry(az.clone()).or_default() += shard_count;
2004 5000 :
2005 5000 : let tenant_shards = make_test_tenant(
2006 5000 : PlacementPolicy::Attached(1),
2007 5000 : ShardCount::new(shard_count.try_into().unwrap()),
2008 5000 : Some(az),
2009 5000 : );
2010 5000 : let context = Rc::new(RefCell::new(ScheduleContext::default()));
2011 5000 :
2012 5000 : contexts.push(context.clone());
2013 5000 : let with_ctx = tenant_shards
2014 5000 : .into_iter()
2015 12500 : .map(|shard| (shard, context.clone()));
2016 17500 : for shard_with_ctx in with_ctx {
2017 12500 : shards.push(shard_with_ctx);
2018 12500 : }
2019 : }
2020 :
2021 50 : shards.shuffle(&mut rng);
2022 :
2023 : #[derive(Default, Debug)]
2024 : struct NodeStats {
2025 : attachments: u32,
2026 : secondaries: u32,
2027 : }
2028 :
2029 50 : let mut node_stats: HashMap<NodeId, NodeStats> = HashMap::default();
2030 50 : let mut attachments_in_wrong_az = 0;
2031 50 : let mut secondaries_in_wrong_az = 0;
2032 :
2033 12550 : for (shard, context) in &mut shards {
2034 12500 : let context = &mut *context.borrow_mut();
2035 12500 : shard.schedule(&mut scheduler, context).unwrap();
2036 12500 :
2037 12500 : let attached_node = shard.intent.get_attached().unwrap();
2038 12500 : let stats = node_stats.entry(attached_node).or_default();
2039 12500 : stats.attachments += 1;
2040 12500 :
2041 12500 : let secondary_node = *shard.intent.get_secondary().first().unwrap();
2042 12500 : let stats = node_stats.entry(secondary_node).or_default();
2043 12500 : stats.secondaries += 1;
2044 12500 :
2045 12500 : let attached_node_az = nodes
2046 12500 : .get(&attached_node)
2047 12500 : .unwrap()
2048 12500 : .get_availability_zone_id();
2049 12500 : let secondary_node_az = nodes
2050 12500 : .get(&secondary_node)
2051 12500 : .unwrap()
2052 12500 : .get_availability_zone_id();
2053 12500 : let preferred_az = shard.preferred_az().unwrap();
2054 12500 :
2055 12500 : if attached_node_az != preferred_az {
2056 0 : eprintln!(
2057 0 : "{} attachment was scheduled in AZ {} but preferred AZ {}",
2058 0 : shard.tenant_shard_id, attached_node_az, preferred_az
2059 0 : );
2060 0 : attachments_in_wrong_az += 1;
2061 12500 : }
2062 :
2063 12500 : if secondary_node_az == preferred_az {
2064 0 : eprintln!(
2065 0 : "{} secondary was scheduled in AZ {} which matches preference",
2066 0 : shard.tenant_shard_id, secondary_node_az
2067 0 : );
2068 0 : secondaries_in_wrong_az += 1;
2069 12500 : }
2070 : }
2071 :
2072 50 : let mut violations = Vec::default();
2073 50 :
2074 50 : if attachments_in_wrong_az > 0 {
2075 0 : violations.push(format!(
2076 0 : "{} attachments scheduled to the incorrect AZ",
2077 0 : attachments_in_wrong_az
2078 0 : ));
2079 50 : }
2080 :
2081 50 : if secondaries_in_wrong_az > 0 {
2082 0 : violations.push(format!(
2083 0 : "{} secondaries scheduled to the incorrect AZ",
2084 0 : secondaries_in_wrong_az
2085 0 : ));
2086 50 : }
2087 :
2088 50 : eprintln!(
2089 50 : "attachments_in_wrong_az={} secondaries_in_wrong_az={}",
2090 50 : attachments_in_wrong_az, secondaries_in_wrong_az
2091 50 : );
2092 :
2093 250 : for (node_id, stats) in &node_stats {
2094 200 : let node_az = nodes.get(node_id).unwrap().get_availability_zone_id();
2095 200 : let ideal_attachment_load = shards_per_az.get(node_az).unwrap() / 2;
2096 200 : let allowed_attachment_load =
2097 200 : (ideal_attachment_load - 1)..(ideal_attachment_load + 2);
2098 200 :
2099 200 : if !allowed_attachment_load.contains(&stats.attachments) {
2100 0 : violations.push(format!(
2101 0 : "Found {} attachments on node {}, but expected {}",
2102 0 : stats.attachments, node_id, ideal_attachment_load
2103 0 : ));
2104 200 : }
2105 :
2106 200 : eprintln!(
2107 200 : "{}: attachments={} secondaries={} ideal_attachment_load={}",
2108 200 : node_id, stats.attachments, stats.secondaries, ideal_attachment_load
2109 200 : );
2110 : }
2111 :
2112 50 : assert!(violations.is_empty(), "{violations:?}");
2113 :
2114 12550 : for (mut shard, _ctx) in shards {
2115 12500 : shard.intent.clear(&mut scheduler);
2116 12500 : }
2117 : }
2118 1 : Ok(())
2119 1 : }
2120 : }