Line data Source code
1 : use std::{
2 : collections::{HashMap, HashSet},
3 : sync::Arc,
4 : time::Duration,
5 : };
6 :
7 : use crate::{
8 : metrics::{
9 : self, ReconcileCompleteLabelGroup, ReconcileLongRunningLabelGroup, ReconcileOutcome,
10 : },
11 : persistence::TenantShardPersistence,
12 : reconciler::{ReconcileUnits, ReconcilerConfig},
13 : scheduler::{
14 : AffinityScore, AttachedShardTag, MaySchedule, RefCountUpdate, ScheduleContext,
15 : SecondaryShardTag,
16 : },
17 : service::ReconcileResultRequest,
18 : };
19 : use futures::future::{self, Either};
20 : use itertools::Itertools;
21 : use pageserver_api::controller_api::{
22 : AvailabilityZone, NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
23 : };
24 : use pageserver_api::{
25 : models::{LocationConfig, LocationConfigMode, TenantConfig},
26 : shard::{ShardIdentity, TenantShardId},
27 : };
28 : use serde::{Deserialize, Serialize};
29 : use tokio::task::JoinHandle;
30 : use tokio_util::sync::CancellationToken;
31 : use tracing::{instrument, Instrument};
32 : use utils::{
33 : generation::Generation,
34 : id::NodeId,
35 : seqwait::{SeqWait, SeqWaitError},
36 : sync::gate::GateGuard,
37 : };
38 :
39 : use crate::{
40 : compute_hook::ComputeHook,
41 : node::Node,
42 : persistence::{split_state::SplitState, Persistence},
43 : reconciler::{
44 : attached_location_conf, secondary_location_conf, ReconcileError, Reconciler, TargetState,
45 : },
46 : scheduler::{ScheduleError, Scheduler},
47 : service, Sequence,
48 : };
49 :
50 : /// Serialization helper
51 0 : fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
52 0 : where
53 0 : S: serde::ser::Serializer,
54 0 : T: std::fmt::Display,
55 0 : {
56 0 : serializer.collect_str(
57 0 : &v.lock()
58 0 : .unwrap()
59 0 : .as_ref()
60 0 : .map(|e| format!("{e}"))
61 0 : .unwrap_or("".to_string()),
62 0 : )
63 0 : }
64 :
/// In-memory state for a particular tenant shard.
///
/// This struct implements Serialize for debugging purposes, but is _not_ persisted
/// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
#[derive(Serialize)]
pub(crate) struct TenantShard {
    pub(crate) tenant_shard_id: TenantShardId,

    pub(crate) shard: ShardIdentity,

    // Runtime only: sequence used to coordinate updates to this object while
    // background reconcilers may be running. A reconciler runs to a particular
    // sequence.
    pub(crate) sequence: Sequence,

    // Latest generation number: next time we attach, increment this
    // and use the incremented number when attaching.
    //
    // None represents a tenant that was only partially onboarded via the
    // `Service::location_config` API; such a tenant may only run in PlacementPolicy::Secondary.
    pub(crate) generation: Option<Generation>,

    // High level description of how the tenant should be set up. Provided
    // externally.
    pub(crate) policy: PlacementPolicy,

    // Low level description of exactly which pageservers should fulfil
    // which role. Generated by `Self::schedule`.
    pub(crate) intent: IntentState,

    // Low level description of how the tenant is configured on pageservers:
    // if this does not match `Self::intent` then the tenant needs reconciliation
    // with `Self::reconcile`.
    pub(crate) observed: ObservedState,

    // Tenant configuration, passed through opaquely to the pageserver. Identical
    // for all shards in a tenant.
    pub(crate) config: TenantConfig,

    /// If a reconcile task is currently in flight, it may be joined here (it is
    /// only safe to join if either the result has been received or the reconciler's
    /// cancellation token has been fired)
    #[serde(skip)]
    pub(crate) reconciler: Option<ReconcilerHandle>,

    /// If a tenant is being split, then all shards with that TenantId will have a
    /// SplitState set, this acts as a guard against other operations such as background
    /// reconciliation, and timeline creation.
    pub(crate) splitting: SplitState,

    /// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag
    /// is set. This flag is cleared when the tenant is popped off the delay queue.
    pub(crate) delayed_reconcile: bool,

    /// Optionally wait for reconciliation to complete up to a particular
    /// sequence number.
    #[serde(skip)]
    pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,

    /// Indicates the sequence number for which we have encountered an error reconciling. If
    /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
    /// and callers should stop waiting for `waiter` and propagate the error.
    #[serde(skip)]
    pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,

    /// The most recent error from a reconcile on this tenant. This is a nested Arc
    /// because:
    /// - ReconcileWaiters need to Arc-clone the overall object to read it later
    /// - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
    ///   many waiters for one shard, and the underlying error types are not Clone.
    ///
    /// Serialized (for debugging) as the error's `Display` text, or "" if there is none.
    ///
    /// TODO: generalize to an array of recent events
    /// TODO: use an ArcSwap instead of a mutex for faster reads?
    #[serde(serialize_with = "read_last_error")]
    pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,

    /// If we have a pending compute notification that for some reason we weren't able to send,
    /// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
    /// and trigger a Reconciler run. This is the mechanism by which compute notifications are included in the scope
    /// of state that we publish externally in an eventually consistent way.
    pub(crate) pending_compute_notification: bool,

    // Support/debug tool: if something is going wrong or flapping with scheduling, this may
    // be set to a non-active state to avoid making changes while the issue is fixed.
    // (`do_schedule` skips scheduling entirely under `Pause` and `Stop`.)
    scheduling_policy: ShardSchedulingPolicy,

    // We should attempt to schedule this shard in the provided AZ to
    // decrease chances of cross-AZ compute. Passed to the scheduler when choosing nodes.
    preferred_az_id: Option<AvailabilityZone>,
}
155 :
/// Which pageservers this shard *should* be located on: at most one attached location
/// plus zero or more secondary locations.
///
/// The mutating methods on this type keep the [`Scheduler`]'s per-node reference counts
/// in step with the contents, so fields are private and should only be changed through
/// those methods.
#[derive(Default, Clone, Debug, Serialize)]
pub(crate) struct IntentState {
    // The node that should hold the attached location, if any.
    attached: Option<NodeId>,
    // Nodes that should hold secondary locations.
    secondary: Vec<NodeId>,
}
161 :
162 : impl IntentState {
163 19 : pub(crate) fn new() -> Self {
164 19 : Self {
165 19 : attached: None,
166 19 : secondary: vec![],
167 19 : }
168 19 : }
169 0 : pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option<NodeId>) -> Self {
170 0 : if let Some(node_id) = node_id {
171 0 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::Attach);
172 0 : }
173 0 : Self {
174 0 : attached: node_id,
175 0 : secondary: vec![],
176 0 : }
177 0 : }
178 :
179 12544 : pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
180 12544 : if self.attached != new_attached {
181 12544 : if let Some(old_attached) = self.attached.take() {
182 0 : scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
183 12544 : }
184 12544 : if let Some(new_attached) = &new_attached {
185 12544 : scheduler.update_node_ref_counts(*new_attached, RefCountUpdate::Attach);
186 12544 : }
187 12544 : self.attached = new_attached;
188 0 : }
189 12544 : }
190 :
191 : /// Like set_attached, but the node is from [`Self::secondary`]. This swaps the node from
192 : /// secondary to attached while maintaining the scheduler's reference counts.
193 5 : pub(crate) fn promote_attached(
194 5 : &mut self,
195 5 : scheduler: &mut Scheduler,
196 5 : promote_secondary: NodeId,
197 5 : ) {
198 5 : // If we call this with a node that isn't in secondary, it would cause incorrect
199 5 : // scheduler reference counting, since we assume the node is already referenced as a secondary.
200 5 : debug_assert!(self.secondary.contains(&promote_secondary));
201 :
202 10 : self.secondary.retain(|n| n != &promote_secondary);
203 5 :
204 5 : let demoted = self.attached;
205 5 : self.attached = Some(promote_secondary);
206 5 :
207 5 : scheduler.update_node_ref_counts(promote_secondary, RefCountUpdate::PromoteSecondary);
208 5 : if let Some(demoted) = demoted {
209 0 : scheduler.update_node_ref_counts(demoted, RefCountUpdate::DemoteAttached);
210 5 : }
211 5 : }
212 :
213 12531 : pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
214 12531 : debug_assert!(!self.secondary.contains(&new_secondary));
215 12531 : scheduler.update_node_ref_counts(new_secondary, RefCountUpdate::AddSecondary);
216 12531 : self.secondary.push(new_secondary);
217 12531 : }
218 :
219 : /// It is legal to call this with a node that is not currently a secondary: that is a no-op
220 5 : pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
221 5 : let index = self.secondary.iter().position(|n| *n == node_id);
222 5 : if let Some(index) = index {
223 5 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
224 5 : self.secondary.remove(index);
225 5 : }
226 5 : }
227 :
228 12543 : pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
229 12543 : for secondary in self.secondary.drain(..) {
230 12526 : scheduler.update_node_ref_counts(secondary, RefCountUpdate::RemoveSecondary);
231 12526 : }
232 12543 : }
233 :
234 : /// Remove the last secondary node from the list of secondaries
235 0 : pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
236 0 : if let Some(node_id) = self.secondary.pop() {
237 0 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
238 0 : }
239 0 : }
240 :
241 12543 : pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
242 12543 : if let Some(old_attached) = self.attached.take() {
243 12543 : scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
244 12543 : }
245 :
246 12543 : self.clear_secondary(scheduler);
247 12543 : }
248 :
249 12614 : pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
250 12614 : let mut result = Vec::new();
251 12614 : if let Some(p) = self.attached {
252 12612 : result.push(p)
253 2 : }
254 :
255 12614 : result.extend(self.secondary.iter().copied());
256 12614 :
257 12614 : result
258 12614 : }
259 :
260 25095 : pub(crate) fn get_attached(&self) -> &Option<NodeId> {
261 25095 : &self.attached
262 25095 : }
263 :
264 12524 : pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
265 12524 : &self.secondary
266 12524 : }
267 :
268 : /// If the node is in use as the attached location, demote it into
269 : /// the list of secondary locations. This is used when a node goes offline,
270 : /// and we want to use a different node for attachment, but not permanently
271 : /// forget the location on the offline node.
272 : ///
273 : /// Returns true if a change was made
274 5 : pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
275 5 : if self.attached == Some(node_id) {
276 5 : self.attached = None;
277 5 : self.secondary.push(node_id);
278 5 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::DemoteAttached);
279 5 : true
280 : } else {
281 0 : false
282 : }
283 5 : }
284 : }
285 :
286 : impl Drop for IntentState {
287 12544 : fn drop(&mut self) {
288 12544 : // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
289 12544 : // We do not check this while panicking, to avoid polluting unit test failures or
290 12544 : // other assertions with this assertion's output. It's still wrong to leak these,
291 12544 : // but if we already have a panic then we don't need to independently flag this case.
292 12544 : if !(std::thread::panicking()) {
293 12544 : debug_assert!(self.attached.is_none() && self.secondary.is_empty());
294 0 : }
295 12543 : }
296 : }
297 :
/// Everything we currently know about where this shard is configured, keyed by pageserver.
/// See [`ObservedStateLocation`] for what the presence or absence of an entry means.
#[derive(Default, Clone, Serialize, Deserialize, Debug)]
pub(crate) struct ObservedState {
    pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
}

/// Our latest knowledge of how this tenant is configured in the outside world.
///
/// Meaning:
/// * No instance of this type exists for a node: we are certain that we have nothing configured on that
///   node for this shard.
/// * Instance exists with conf==None: we *might* have some state on that node, but we don't know
///   what it is (e.g. we failed partway through configuring it)
/// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
///   and that configuration will still be present unless something external interfered.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub(crate) struct ObservedStateLocation {
    /// If None, it means we do not know the status of this shard's location on this node, but
    /// we know that we might have some state on this node.
    pub(crate) conf: Option<LocationConfig>,
}
/// Handle for waiting on the outcome of reconciling one tenant shard up to sequence `seq`.
/// See `ReconcilerWaiter::wait_timeout` and `ReconcilerWaiter::get_status`.
pub(crate) struct ReconcilerWaiter {
    // For observability purposes, remember the ID of the shard we're
    // waiting for.
    pub(crate) tenant_shard_id: TenantShardId,

    // Success sequence: once it reaches `seq`, the reconcile we are waiting for is done.
    // NOTE(review): presumably a clone of the shard's `TenantShard::waiter` — confirm at the
    // construction site.
    seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
    // Error sequence: once it reaches `seq`, a reconcile failed and `error` holds the cause.
    // NOTE(review): presumably clones of `TenantShard::error_waiter` / `last_error`.
    error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
    error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
    // The sequence number being waited for.
    seq: Sequence,
}
328 :
/// Non-blocking snapshot of a [`ReconcilerWaiter`]'s progress, as returned by
/// `ReconcilerWaiter::get_status`.
pub(crate) enum ReconcilerStatus {
    /// The success sequence has reached the awaited sequence.
    Done,
    /// Not `Done`, but the error sequence has reached the awaited sequence.
    Failed,
    /// Neither sequence has reached the awaited sequence yet.
    InProgress,
}

/// Ways in which waiting on a [`ReconcilerWaiter`] can fail.
#[derive(thiserror::Error, Debug)]
pub(crate) enum ReconcileWaitError {
    /// The awaited sequence was not reached within the caller's timeout.
    #[error("Timeout waiting for shard {0}")]
    Timeout(TenantShardId),
    /// A sequence waiter was shut down while we were waiting.
    #[error("shutting down")]
    Shutdown,
    /// A reconcile failed at or before the awaited sequence; carries the recorded error.
    #[error("Reconcile error on shard {0}: {1}")]
    Failed(TenantShardId, Arc<ReconcileError>),
}
344 :
/// Move one secondary location from `old_node_id` to `new_node_id`.
#[derive(Eq, PartialEq, Debug, Clone)]
pub(crate) struct ReplaceSecondary {
    old_node_id: NodeId,
    new_node_id: NodeId,
}

/// Move the attached location from `old_attached_node_id` to `new_attached_node_id`,
/// which is one of the shard's existing secondary locations.
#[derive(Eq, PartialEq, Debug, Clone)]
pub(crate) struct MigrateAttachment {
    pub(crate) old_attached_node_id: NodeId,
    pub(crate) new_attached_node_id: NodeId,
}

/// A single placement change proposed by the optimizer.
#[derive(Eq, PartialEq, Debug, Clone)]
pub(crate) enum ScheduleOptimizationAction {
    // Replace one of our secondary locations with a different node
    ReplaceSecondary(ReplaceSecondary),
    // Migrate attachment to an existing secondary location
    MigrateAttachment(MigrateAttachment),
}

/// A proposed optimization together with the shard sequence it was planned against.
#[derive(Eq, PartialEq, Debug, Clone)]
pub(crate) struct ScheduleOptimization {
    // What was the reconcile sequence when we generated this optimization? The optimization
    // should only be applied if the shard's sequence is still at this value, in case other changes
    // happened between planning the optimization and applying it.
    sequence: Sequence,

    pub(crate) action: ScheduleOptimizationAction,
}
374 :
375 : impl ReconcilerWaiter {
376 0 : pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
377 0 : tokio::select! {
378 0 : result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
379 0 : result.map_err(|e| match e {
380 0 : SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
381 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
382 0 : })?;
383 : },
384 0 : result = self.error_seq_wait.wait_for(self.seq) => {
385 0 : result.map_err(|e| match e {
386 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
387 0 : SeqWaitError::Timeout => unreachable!()
388 0 : })?;
389 :
390 0 : return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
391 0 : self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
392 : }
393 : }
394 :
395 0 : Ok(())
396 0 : }
397 :
398 0 : pub(crate) fn get_status(&self) -> ReconcilerStatus {
399 0 : if self.seq_wait.would_wait_for(self.seq).is_ok() {
400 0 : ReconcilerStatus::Done
401 0 : } else if self.error_seq_wait.would_wait_for(self.seq).is_ok() {
402 0 : ReconcilerStatus::Failed
403 : } else {
404 0 : ReconcilerStatus::InProgress
405 : }
406 0 : }
407 : }
408 :
/// Having spawned a reconciler task, the tenant shard's state will carry enough
/// information to optionally cancel & await it later.
pub(crate) struct ReconcilerHandle {
    // The shard sequence the reconciler was spawned for.
    sequence: Sequence,
    // Join handle of the spawned reconciler task.
    handle: JoinHandle<()>,
    // Token used to ask the reconciler to stop.
    cancel: CancellationToken,
}

/// Outcome of deciding whether a shard needs a reconciler to be spawned.
pub(crate) enum ReconcileNeeded {
    /// shard either doesn't need reconciliation, or is forbidden from spawning a reconciler
    /// in its current state (e.g. shard split in progress, or ShardSchedulingPolicy forbids it)
    No,
    /// shard has a reconciler running, and its intent hasn't changed since that one was
    /// spawned: wait for the existing reconciler rather than spawning a new one.
    WaitExisting(ReconcilerWaiter),
    /// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
    Yes,
}
427 :
428 : /// Pending modification to the observed state of a tenant shard.
429 : /// Produced by [`Reconciler::observed_deltas`] and applied in [`crate::service::Service::process_result`].
430 : pub(crate) enum ObservedStateDelta {
431 : Upsert(Box<(NodeId, ObservedStateLocation)>),
432 : Delete(NodeId),
433 : }
434 :
435 : impl ObservedStateDelta {
436 0 : pub(crate) fn node_id(&self) -> &NodeId {
437 0 : match self {
438 0 : Self::Upsert(up) => &up.0,
439 0 : Self::Delete(nid) => nid,
440 : }
441 0 : }
442 : }
443 :
/// When a reconcile task completes, it sends this result object
/// to be applied to the primary TenantShard.
pub(crate) struct ReconcileResult {
    // The shard sequence the reconcile ran to.
    pub(crate) sequence: Sequence,
    /// On errors, `observed_deltas` should be treated as an incomplete description
    /// of state (i.e. any nodes present in the result should override nodes
    /// present in the parent tenant state, but any unmentioned nodes should
    /// not be removed from parent tenant state)
    pub(crate) result: Result<(), ReconcileError>,

    pub(crate) tenant_shard_id: TenantShardId,
    pub(crate) generation: Option<Generation>,
    pub(crate) observed_deltas: Vec<ObservedStateDelta>,

    /// Set [`TenantShard::pending_compute_notification`] from this flag
    pub(crate) pending_compute_notification: bool,
}
461 :
462 : impl ObservedState {
463 0 : pub(crate) fn new() -> Self {
464 0 : Self {
465 0 : locations: HashMap::new(),
466 0 : }
467 0 : }
468 :
469 0 : pub(crate) fn is_empty(&self) -> bool {
470 0 : self.locations.is_empty()
471 0 : }
472 : }
473 :
474 : impl TenantShard {
475 12525 : pub(crate) fn new(
476 12525 : tenant_shard_id: TenantShardId,
477 12525 : shard: ShardIdentity,
478 12525 : policy: PlacementPolicy,
479 12525 : preferred_az_id: Option<AvailabilityZone>,
480 12525 : ) -> Self {
481 12525 : metrics::METRICS_REGISTRY
482 12525 : .metrics_group
483 12525 : .storage_controller_tenant_shards
484 12525 : .inc();
485 12525 :
486 12525 : Self {
487 12525 : tenant_shard_id,
488 12525 : policy,
489 12525 : intent: IntentState::default(),
490 12525 : generation: Some(Generation::new(0)),
491 12525 : shard,
492 12525 : observed: ObservedState::default(),
493 12525 : config: TenantConfig::default(),
494 12525 : reconciler: None,
495 12525 : splitting: SplitState::Idle,
496 12525 : sequence: Sequence(1),
497 12525 : delayed_reconcile: false,
498 12525 : waiter: Arc::new(SeqWait::new(Sequence(0))),
499 12525 : error_waiter: Arc::new(SeqWait::new(Sequence(0))),
500 12525 : last_error: Arc::default(),
501 12525 : pending_compute_notification: false,
502 12525 : scheduling_policy: ShardSchedulingPolicy::default(),
503 12525 : preferred_az_id,
504 12525 : }
505 12525 : }
506 :
507 : /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
508 : /// [`ObservedState`], even if it violates my [`PlacementPolicy`]. Call [`Self::schedule`] next,
509 : /// to get an intent state that complies with placement policy. The overall goal is to do scheduling
510 : /// in a way that makes use of any configured locations that already exist in the outside world.
511 1 : pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
512 1 : // Choose an attached location by filtering observed locations, and then sorting to get the highest
513 1 : // generation
514 1 : let mut attached_locs = self
515 1 : .observed
516 1 : .locations
517 1 : .iter()
518 2 : .filter_map(|(node_id, l)| {
519 2 : if let Some(conf) = &l.conf {
520 2 : if conf.mode == LocationConfigMode::AttachedMulti
521 1 : || conf.mode == LocationConfigMode::AttachedSingle
522 1 : || conf.mode == LocationConfigMode::AttachedStale
523 : {
524 2 : Some((node_id, conf.generation))
525 : } else {
526 0 : None
527 : }
528 : } else {
529 0 : None
530 : }
531 2 : })
532 1 : .collect::<Vec<_>>();
533 1 :
534 2 : attached_locs.sort_by_key(|i| i.1);
535 1 : if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
536 1 : self.intent.set_attached(scheduler, Some(*node_id));
537 1 : }
538 :
539 : // All remaining observed locations generate secondary intents. This includes None
540 : // observations, as these may well have some local content on disk that is usable (this
541 : // is an edge case that might occur if we restarted during a migration or other change)
542 : //
543 : // We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
544 : // will take care of promoting one of these secondaries to be attached.
545 2 : self.observed.locations.keys().for_each(|node_id| {
546 2 : if Some(*node_id) != self.intent.attached {
547 1 : self.intent.push_secondary(scheduler, *node_id);
548 1 : }
549 2 : });
550 1 : }
551 :
552 : /// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
553 : /// attached pageserver for a shard.
554 : ///
555 : /// Returns whether we modified it, and the NodeId selected.
556 12521 : fn schedule_attached(
557 12521 : &mut self,
558 12521 : scheduler: &mut Scheduler,
559 12521 : context: &ScheduleContext,
560 12521 : ) -> Result<(bool, NodeId), ScheduleError> {
561 : // No work to do if we already have an attached tenant
562 12521 : if let Some(node_id) = self.intent.attached {
563 0 : return Ok((false, node_id));
564 12521 : }
565 :
566 12521 : if let Some(promote_secondary) = scheduler.node_preferred(&self.intent.secondary) {
567 : // Promote a secondary
568 1 : tracing::debug!("Promoted secondary {} to attached", promote_secondary);
569 1 : self.intent.promote_attached(scheduler, promote_secondary);
570 1 : Ok((true, promote_secondary))
571 : } else {
572 : // Pick a fresh node: either we had no secondaries or none were schedulable
573 12520 : let node_id = scheduler.schedule_shard::<AttachedShardTag>(
574 12520 : &self.intent.secondary,
575 12520 : &self.preferred_az_id,
576 12520 : context,
577 12520 : )?;
578 12520 : tracing::debug!("Selected {} as attached", node_id);
579 12520 : self.intent.set_attached(scheduler, Some(node_id));
580 12520 : Ok((true, node_id))
581 : }
582 12521 : }
583 :
584 : #[instrument(skip_all, fields(
585 : tenant_id=%self.tenant_shard_id.tenant_id,
586 : shard_id=%self.tenant_shard_id.shard_slug(),
587 : sequence=%self.sequence
588 : ))]
589 : pub(crate) fn schedule(
590 : &mut self,
591 : scheduler: &mut Scheduler,
592 : context: &mut ScheduleContext,
593 : ) -> Result<(), ScheduleError> {
594 : let r = self.do_schedule(scheduler, context);
595 :
596 : context.avoid(&self.intent.all_pageservers());
597 : if let Some(attached) = self.intent.get_attached() {
598 : context.push_attached(*attached);
599 : }
600 :
601 : r
602 : }
603 :
604 12522 : pub(crate) fn do_schedule(
605 12522 : &mut self,
606 12522 : scheduler: &mut Scheduler,
607 12522 : context: &ScheduleContext,
608 12522 : ) -> Result<(), ScheduleError> {
609 12522 : // TODO: before scheduling new nodes, check if any existing content in
610 12522 : // self.intent refers to pageservers that are offline, and pick other
611 12522 : // pageservers if so.
612 12522 :
613 12522 : // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
614 12522 : // change their attach location.
615 12522 :
616 12522 : match self.scheduling_policy {
617 12521 : ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
618 : ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
619 : // Warn to make it obvious why other things aren't happening/working, if we skip scheduling
620 1 : tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
621 0 : "Scheduling is disabled by policy {:?}", self.scheduling_policy);
622 1 : return Ok(());
623 : }
624 : }
625 :
626 : // Build the set of pageservers already in use by this tenant, to avoid scheduling
627 : // more work on the same pageservers we're already using.
628 12521 : let mut modified = false;
629 :
630 : // Add/remove nodes to fulfil policy
631 : use PlacementPolicy::*;
632 12521 : match self.policy {
633 12521 : Attached(secondary_count) => {
634 12521 : let retain_secondaries = if self.intent.attached.is_none()
635 12521 : && scheduler.node_preferred(&self.intent.secondary).is_some()
636 : {
637 : // If we have no attached, and one of the secondaries is elegible to be promoted, retain
638 : // one more secondary than we usually would, as one of them will become attached futher down this function.
639 1 : secondary_count + 1
640 : } else {
641 12520 : secondary_count
642 : };
643 :
644 12521 : while self.intent.secondary.len() > retain_secondaries {
645 0 : // We have no particular preference for one secondary location over another: just
646 0 : // arbitrarily drop from the end
647 0 : self.intent.pop_secondary(scheduler);
648 0 : modified = true;
649 0 : }
650 :
651 : // Should have exactly one attached, and N secondaries
652 12521 : let (modified_attached, attached_node_id) =
653 12521 : self.schedule_attached(scheduler, context)?;
654 12521 : modified |= modified_attached;
655 12521 :
656 12521 : let mut used_pageservers = vec![attached_node_id];
657 25041 : while self.intent.secondary.len() < secondary_count {
658 12520 : let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
659 12520 : &used_pageservers,
660 12520 : &self.preferred_az_id,
661 12520 : context,
662 12520 : )?;
663 12520 : self.intent.push_secondary(scheduler, node_id);
664 12520 : used_pageservers.push(node_id);
665 12520 : modified = true;
666 : }
667 : }
668 : Secondary => {
669 0 : if let Some(node_id) = self.intent.get_attached() {
670 0 : // Populate secondary by demoting the attached node
671 0 : self.intent.demote_attached(scheduler, *node_id);
672 0 : modified = true;
673 0 : } else if self.intent.secondary.is_empty() {
674 : // Populate secondary by scheduling a fresh node
675 0 : let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
676 0 : &[],
677 0 : &self.preferred_az_id,
678 0 : context,
679 0 : )?;
680 0 : self.intent.push_secondary(scheduler, node_id);
681 0 : modified = true;
682 0 : }
683 0 : while self.intent.secondary.len() > 1 {
684 0 : // We have no particular preference for one secondary location over another: just
685 0 : // arbitrarily drop from the end
686 0 : self.intent.pop_secondary(scheduler);
687 0 : modified = true;
688 0 : }
689 : }
690 : Detached => {
691 : // Never add locations in this mode
692 0 : if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
693 0 : self.intent.clear(scheduler);
694 0 : modified = true;
695 0 : }
696 : }
697 : }
698 :
699 12521 : if modified {
700 12521 : self.sequence.0 += 1;
701 12521 : }
702 :
703 12521 : Ok(())
704 12522 : }
705 :
706 : /// Reschedule this tenant shard to one of its secondary locations. Returns a scheduling error
707 : /// if the swap is not possible and leaves the intent state in its original state.
708 : ///
709 : /// Arguments:
710 : /// `attached_to`: the currently attached location matching the intent state (may be None if the
711 : /// shard is not attached)
712 : /// `promote_to`: an optional secondary location of this tenant shard. If set to None, we ask
713 : /// the scheduler to recommend a node
714 0 : pub(crate) fn reschedule_to_secondary(
715 0 : &mut self,
716 0 : promote_to: Option<NodeId>,
717 0 : scheduler: &mut Scheduler,
718 0 : ) -> Result<(), ScheduleError> {
719 0 : let promote_to = match promote_to {
720 0 : Some(node) => node,
721 0 : None => match scheduler.node_preferred(self.intent.get_secondary()) {
722 0 : Some(node) => node,
723 : None => {
724 0 : return Err(ScheduleError::ImpossibleConstraint);
725 : }
726 : },
727 : };
728 :
729 0 : assert!(self.intent.get_secondary().contains(&promote_to));
730 :
731 0 : if let Some(node) = self.intent.get_attached() {
732 0 : let demoted = self.intent.demote_attached(scheduler, *node);
733 0 : if !demoted {
734 0 : return Err(ScheduleError::ImpossibleConstraint);
735 0 : }
736 0 : }
737 :
738 0 : self.intent.promote_attached(scheduler, promote_to);
739 0 :
740 0 : // Increment the sequence number for the edge case where a
741 0 : // reconciler is already running to avoid waiting on the
742 0 : // current reconcile instead of spawning a new one.
743 0 : self.sequence = self.sequence.next();
744 0 :
745 0 : Ok(())
746 0 : }
747 :
748 : /// Optimize attachments: if a shard has a secondary location that is preferable to
749 : /// its primary location based on soft constraints, switch that secondary location
750 : /// to be attached.
751 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
752 : pub(crate) fn optimize_attachment(
753 : &self,
754 : nodes: &HashMap<NodeId, Node>,
755 : schedule_context: &ScheduleContext,
756 : ) -> Option<ScheduleOptimization> {
757 : let attached = (*self.intent.get_attached())?;
758 : if self.intent.secondary.is_empty() {
759 : // We can only do useful work if we have both attached and secondary locations: this
760 : // function doesn't schedule new locations, only swaps between attached and secondaries.
761 : return None;
762 : }
763 :
764 : let current_affinity_score = schedule_context.get_node_affinity(attached);
765 : let current_attachment_count = schedule_context.get_node_attachments(attached);
766 :
767 : // Generate score for each node, dropping any un-schedulable nodes.
768 : let all_pageservers = self.intent.all_pageservers();
769 : let mut scores = all_pageservers
770 : .iter()
771 46 : .flat_map(|node_id| {
772 46 : let node = nodes.get(node_id);
773 46 : if node.is_none() {
774 0 : None
775 46 : } else if matches!(
776 46 : node.unwrap().get_scheduling(),
777 : NodeSchedulingPolicy::Filling
778 : ) {
779 : // If the node is currently filling, don't count it as a candidate to avoid,
780 : // racing with the background fill.
781 0 : None
782 46 : } else if matches!(node.unwrap().may_schedule(), MaySchedule::No) {
783 0 : None
784 : } else {
785 46 : let affinity_score = schedule_context.get_node_affinity(*node_id);
786 46 : let attachment_count = schedule_context.get_node_attachments(*node_id);
787 46 : Some((*node_id, affinity_score, attachment_count))
788 : }
789 46 : })
790 : .collect::<Vec<_>>();
791 :
792 : // Sort precedence:
793 : // 1st - prefer nodes with the lowest total affinity score
794 : // 2nd - prefer nodes with the lowest number of attachments in this context
795 : // 3rd - if all else is equal, sort by node ID for determinism in tests.
796 46 : scores.sort_by_key(|i| (i.1, i.2, i.0));
797 :
798 : if let Some((preferred_node, preferred_affinity_score, preferred_attachment_count)) =
799 : scores.first()
800 : {
801 : if attached != *preferred_node {
802 : // The best alternative must be more than 1 better than us, otherwise we could end
803 : // up flapping back next time we're called (e.g. there's no point migrating from
804 : // a location with score 1 to a score zero, because on next location the situation
805 : // would be the same, but in reverse).
806 : if current_affinity_score > *preferred_affinity_score + AffinityScore(1)
807 : || current_attachment_count > *preferred_attachment_count + 1
808 : {
809 : tracing::info!(
810 : "Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})",
811 : self.intent.get_secondary()
812 : );
813 : return Some(ScheduleOptimization {
814 : sequence: self.sequence,
815 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
816 : old_attached_node_id: attached,
817 : new_attached_node_id: *preferred_node,
818 : }),
819 : });
820 : }
821 : } else {
822 : tracing::debug!(
823 : "Node {} is already preferred (score {:?})",
824 : preferred_node,
825 : preferred_affinity_score
826 : );
827 : }
828 : }
829 :
830 : // Fall-through: we didn't find an optimization
831 : None
832 : }
833 :
834 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
835 : pub(crate) fn optimize_secondary(
836 : &self,
837 : scheduler: &mut Scheduler,
838 : schedule_context: &ScheduleContext,
839 : ) -> Option<ScheduleOptimization> {
840 : if self.intent.secondary.is_empty() {
841 : // We can only do useful work if we have both attached and secondary locations: this
842 : // function doesn't schedule new locations, only swaps between attached and secondaries.
843 : return None;
844 : }
845 :
846 : for secondary in self.intent.get_secondary() {
847 : let Some(affinity_score) = schedule_context.nodes.get(secondary) else {
848 : // We're already on a node unaffected any affinity constraints,
849 : // so we won't change it.
850 : continue;
851 : };
852 :
853 : // Let the scheduler suggest a node, where it would put us if we were scheduling afresh
854 : // This implicitly limits the choice to nodes that are available, and prefers nodes
855 : // with lower utilization.
856 : let Ok(candidate_node) = scheduler.schedule_shard::<SecondaryShardTag>(
857 : &self.intent.all_pageservers(),
858 : &self.preferred_az_id,
859 : schedule_context,
860 : ) else {
861 : // A scheduling error means we have no possible candidate replacements
862 : continue;
863 : };
864 :
865 : let candidate_affinity_score = schedule_context
866 : .nodes
867 : .get(&candidate_node)
868 : .unwrap_or(&AffinityScore::FREE);
869 :
870 : // The best alternative must be more than 1 better than us, otherwise we could end
871 : // up flapping back next time we're called.
872 : if *candidate_affinity_score + AffinityScore(1) < *affinity_score {
873 : // If some other node is available and has a lower score than this node, then
874 : // that other node is a good place to migrate to.
875 : tracing::info!(
876 : "Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})",
877 : self.intent.get_secondary()
878 : );
879 : return Some(ScheduleOptimization {
880 : sequence: self.sequence,
881 : action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
882 : old_node_id: *secondary,
883 : new_node_id: candidate_node,
884 : }),
885 : });
886 : }
887 : }
888 :
889 : None
890 : }
891 :
892 : /// Return true if the optimization was really applied: it will not be applied if the optimization's
893 : /// sequence is behind this tenant shard's
894 9 : pub(crate) fn apply_optimization(
895 9 : &mut self,
896 9 : scheduler: &mut Scheduler,
897 9 : optimization: ScheduleOptimization,
898 9 : ) -> bool {
899 9 : if optimization.sequence != self.sequence {
900 0 : return false;
901 9 : }
902 9 :
903 9 : metrics::METRICS_REGISTRY
904 9 : .metrics_group
905 9 : .storage_controller_schedule_optimization
906 9 : .inc();
907 9 :
908 9 : match optimization.action {
909 : ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
910 4 : old_attached_node_id,
911 4 : new_attached_node_id,
912 4 : }) => {
913 4 : self.intent.demote_attached(scheduler, old_attached_node_id);
914 4 : self.intent
915 4 : .promote_attached(scheduler, new_attached_node_id);
916 4 : }
917 : ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
918 5 : old_node_id,
919 5 : new_node_id,
920 5 : }) => {
921 5 : self.intent.remove_secondary(scheduler, old_node_id);
922 5 : self.intent.push_secondary(scheduler, new_node_id);
923 5 : }
924 : }
925 :
926 9 : true
927 9 : }
928 :
929 : /// Query whether the tenant's observed state for attached node matches its intent state, and if so,
930 : /// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that
931 : /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
932 : ///
933 : /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
934 : /// funciton should not be used to decide whether to reconcile.
935 0 : pub(crate) fn stably_attached(&self) -> Option<NodeId> {
936 0 : if let Some(attach_intent) = self.intent.attached {
937 0 : match self.observed.locations.get(&attach_intent) {
938 0 : Some(loc) => match &loc.conf {
939 0 : Some(conf) => match conf.mode {
940 : LocationConfigMode::AttachedMulti
941 : | LocationConfigMode::AttachedSingle
942 : | LocationConfigMode::AttachedStale => {
943 : // Our intent and observed state agree that this node is in an attached state.
944 0 : Some(attach_intent)
945 : }
946 : // Our observed config is not an attached state
947 0 : _ => None,
948 : },
949 : // Our observed state is None, i.e. in flux
950 0 : None => None,
951 : },
952 : // We have no observed state for this node
953 0 : None => None,
954 : }
955 : } else {
956 : // Our intent is not to attach
957 0 : None
958 : }
959 0 : }
960 :
961 0 : fn dirty(&self, nodes: &Arc<HashMap<NodeId, Node>>) -> bool {
962 0 : let mut dirty_nodes = HashSet::new();
963 :
964 0 : if let Some(node_id) = self.intent.attached {
965 : // Maybe panic: it is a severe bug if we try to attach while generation is null.
966 0 : let generation = self
967 0 : .generation
968 0 : .expect("Attempted to enter attached state without a generation");
969 0 :
970 0 : let wanted_conf =
971 0 : attached_location_conf(generation, &self.shard, &self.config, &self.policy);
972 0 : match self.observed.locations.get(&node_id) {
973 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
974 0 : Some(_) | None => {
975 0 : dirty_nodes.insert(node_id);
976 0 : }
977 : }
978 0 : }
979 :
980 0 : for node_id in &self.intent.secondary {
981 0 : let wanted_conf = secondary_location_conf(&self.shard, &self.config);
982 0 : match self.observed.locations.get(node_id) {
983 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
984 0 : Some(_) | None => {
985 0 : dirty_nodes.insert(*node_id);
986 0 : }
987 : }
988 : }
989 :
990 0 : for node_id in self.observed.locations.keys() {
991 0 : if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
992 0 : // We have observed state that isn't part of our intent: need to clean it up.
993 0 : dirty_nodes.insert(*node_id);
994 0 : }
995 : }
996 :
997 0 : dirty_nodes.retain(|node_id| {
998 0 : nodes
999 0 : .get(node_id)
1000 0 : .map(|n| n.is_available())
1001 0 : .unwrap_or(false)
1002 0 : });
1003 0 :
1004 0 : !dirty_nodes.is_empty()
1005 0 : }
1006 :
1007 : #[allow(clippy::too_many_arguments)]
1008 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1009 : pub(crate) fn get_reconcile_needed(
1010 : &mut self,
1011 : pageservers: &Arc<HashMap<NodeId, Node>>,
1012 : ) -> ReconcileNeeded {
1013 : // If there are any ambiguous observed states, and the nodes they refer to are available,
1014 : // we should reconcile to clean them up.
1015 : let mut dirty_observed = false;
1016 : for (node_id, observed_loc) in &self.observed.locations {
1017 : let node = pageservers
1018 : .get(node_id)
1019 : .expect("Nodes may not be removed while referenced");
1020 : if observed_loc.conf.is_none() && node.is_available() {
1021 : dirty_observed = true;
1022 : break;
1023 : }
1024 : }
1025 :
1026 : let active_nodes_dirty = self.dirty(pageservers);
1027 :
1028 : // Even if there is no pageserver work to be done, if we have a pending notification to computes,
1029 : // wake up a reconciler to send it.
1030 : let do_reconcile =
1031 : active_nodes_dirty || dirty_observed || self.pending_compute_notification;
1032 :
1033 : if !do_reconcile {
1034 : tracing::debug!("Not dirty, no reconciliation needed.");
1035 : return ReconcileNeeded::No;
1036 : }
1037 :
1038 : // If we are currently splitting, then never start a reconciler task: the splitting logic
1039 : // requires that shards are not interfered with while it runs. Do this check here rather than
1040 : // up top, so that we only log this message if we would otherwise have done a reconciliation.
1041 : if !matches!(self.splitting, SplitState::Idle) {
1042 : tracing::info!("Refusing to reconcile, splitting in progress");
1043 : return ReconcileNeeded::No;
1044 : }
1045 :
1046 : // Reconcile already in flight for the current sequence?
1047 : if let Some(handle) = &self.reconciler {
1048 : if handle.sequence == self.sequence {
1049 : tracing::info!(
1050 : "Reconciliation already in progress for sequence {:?}",
1051 : self.sequence,
1052 : );
1053 : return ReconcileNeeded::WaitExisting(ReconcilerWaiter {
1054 : tenant_shard_id: self.tenant_shard_id,
1055 : seq_wait: self.waiter.clone(),
1056 : error_seq_wait: self.error_waiter.clone(),
1057 : error: self.last_error.clone(),
1058 : seq: self.sequence,
1059 : });
1060 : }
1061 : }
1062 :
1063 : // Pre-checks done: finally check whether we may actually do the work
1064 : match self.scheduling_policy {
1065 : ShardSchedulingPolicy::Active
1066 : | ShardSchedulingPolicy::Essential
1067 : | ShardSchedulingPolicy::Pause => {}
1068 : ShardSchedulingPolicy::Stop => {
1069 : // We only reach this point if there is work to do and we're going to skip
1070 : // doing it: warn it obvious why this tenant isn't doing what it ought to.
1071 : tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
1072 : return ReconcileNeeded::No;
1073 : }
1074 : }
1075 :
1076 : ReconcileNeeded::Yes
1077 : }
1078 :
1079 : /// Ensure the sequence number is set to a value where waiting for this value will make us wait
1080 : /// for the next reconcile: i.e. it is ahead of all completed or running reconcilers.
1081 : ///
1082 : /// Constructing a ReconcilerWaiter with the resulting sequence number gives the property
1083 : /// that the waiter will not complete until some future Reconciler is constructed and run.
1084 0 : fn ensure_sequence_ahead(&mut self) {
1085 0 : // Find the highest sequence for which a Reconciler has previously run or is currently
1086 0 : // running
1087 0 : let max_seen = std::cmp::max(
1088 0 : self.reconciler
1089 0 : .as_ref()
1090 0 : .map(|r| r.sequence)
1091 0 : .unwrap_or(Sequence(0)),
1092 0 : std::cmp::max(self.waiter.load(), self.error_waiter.load()),
1093 0 : );
1094 0 :
1095 0 : if self.sequence <= max_seen {
1096 0 : self.sequence = max_seen.next();
1097 0 : }
1098 0 : }
1099 :
1100 : /// Create a waiter that will wait for some future Reconciler that hasn't been spawned yet.
1101 : ///
1102 : /// This is appropriate when you can't spawn a reconciler (e.g. due to resource limits), but
1103 : /// you would like to wait on the next reconciler that gets spawned in the background.
1104 0 : pub(crate) fn future_reconcile_waiter(&mut self) -> ReconcilerWaiter {
1105 0 : self.ensure_sequence_ahead();
1106 0 :
1107 0 : ReconcilerWaiter {
1108 0 : tenant_shard_id: self.tenant_shard_id,
1109 0 : seq_wait: self.waiter.clone(),
1110 0 : error_seq_wait: self.error_waiter.clone(),
1111 0 : error: self.last_error.clone(),
1112 0 : seq: self.sequence,
1113 0 : }
1114 0 : }
1115 :
1116 0 : async fn reconcile(
1117 0 : sequence: Sequence,
1118 0 : mut reconciler: Reconciler,
1119 0 : must_notify: bool,
1120 0 : ) -> ReconcileResult {
1121 : // Attempt to make observed state match intent state
1122 0 : let result = reconciler.reconcile().await;
1123 :
1124 : // If we know we had a pending compute notification from some previous action, send a notification irrespective
1125 : // of whether the above reconcile() did any work
1126 0 : if result.is_ok() && must_notify {
1127 : // If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
1128 0 : reconciler.compute_notify().await.ok();
1129 0 : }
1130 :
1131 : // Update result counter
1132 0 : let outcome_label = match &result {
1133 0 : Ok(_) => ReconcileOutcome::Success,
1134 0 : Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
1135 0 : Err(_) => ReconcileOutcome::Error,
1136 : };
1137 :
1138 0 : metrics::METRICS_REGISTRY
1139 0 : .metrics_group
1140 0 : .storage_controller_reconcile_complete
1141 0 : .inc(ReconcileCompleteLabelGroup {
1142 0 : status: outcome_label,
1143 0 : });
1144 0 :
1145 0 : // Constructing result implicitly drops Reconciler, freeing any ReconcileUnits before the Service might
1146 0 : // try and schedule more work in response to our result.
1147 0 : ReconcileResult {
1148 0 : sequence,
1149 0 : result,
1150 0 : tenant_shard_id: reconciler.tenant_shard_id,
1151 0 : generation: reconciler.generation,
1152 0 : observed_deltas: reconciler.observed_deltas(),
1153 0 : pending_compute_notification: reconciler.compute_notify_failure,
1154 0 : }
1155 0 : }
1156 :
1157 : #[allow(clippy::too_many_arguments)]
1158 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1159 : pub(crate) fn spawn_reconciler(
1160 : &mut self,
1161 : result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
1162 : pageservers: &Arc<HashMap<NodeId, Node>>,
1163 : compute_hook: &Arc<ComputeHook>,
1164 : reconciler_config: ReconcilerConfig,
1165 : service_config: &service::Config,
1166 : persistence: &Arc<Persistence>,
1167 : units: ReconcileUnits,
1168 : gate_guard: GateGuard,
1169 : cancel: &CancellationToken,
1170 : ) -> Option<ReconcilerWaiter> {
1171 : // Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
1172 : // doing our sequence's work.
1173 : let old_handle = self.reconciler.take();
1174 :
1175 : // Build list of nodes from which the reconciler should detach
1176 : let mut detach = Vec::new();
1177 : for node_id in self.observed.locations.keys() {
1178 : if self.intent.get_attached() != &Some(*node_id)
1179 : && !self.intent.secondary.contains(node_id)
1180 : {
1181 : detach.push(
1182 : pageservers
1183 : .get(node_id)
1184 : .expect("Intent references non-existent pageserver")
1185 : .clone(),
1186 : )
1187 : }
1188 : }
1189 :
1190 : // Advance the sequence before spawning a reconciler, so that sequence waiters
1191 : // can distinguish between before+after the reconcile completes.
1192 : self.ensure_sequence_ahead();
1193 :
1194 : let reconciler_cancel = cancel.child_token();
1195 : let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
1196 : let reconciler = Reconciler {
1197 : tenant_shard_id: self.tenant_shard_id,
1198 : shard: self.shard,
1199 : placement_policy: self.policy.clone(),
1200 : generation: self.generation,
1201 : intent: reconciler_intent,
1202 : detach,
1203 : reconciler_config,
1204 : config: self.config.clone(),
1205 : preferred_az: self.preferred_az_id.clone(),
1206 : observed: self.observed.clone(),
1207 : original_observed: self.observed.clone(),
1208 : compute_hook: compute_hook.clone(),
1209 : service_config: service_config.clone(),
1210 : _gate_guard: gate_guard,
1211 : _resource_units: units,
1212 : cancel: reconciler_cancel.clone(),
1213 : persistence: persistence.clone(),
1214 : compute_notify_failure: false,
1215 : };
1216 :
1217 : let reconcile_seq = self.sequence;
1218 : let long_reconcile_threshold = service_config.long_reconcile_threshold;
1219 :
1220 : tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
1221 : let must_notify = self.pending_compute_notification;
1222 : let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
1223 : tenant_id=%reconciler.tenant_shard_id.tenant_id,
1224 : shard_id=%reconciler.tenant_shard_id.shard_slug());
1225 : metrics::METRICS_REGISTRY
1226 : .metrics_group
1227 : .storage_controller_reconcile_spawn
1228 : .inc();
1229 : let result_tx = result_tx.clone();
1230 : let join_handle = tokio::task::spawn(
1231 0 : async move {
1232 : // Wait for any previous reconcile task to complete before we start
1233 0 : if let Some(old_handle) = old_handle {
1234 0 : old_handle.cancel.cancel();
1235 0 : if let Err(e) = old_handle.handle.await {
1236 : // We can't do much with this other than log it: the task is done, so
1237 : // we may proceed with our work.
1238 0 : tracing::error!("Unexpected join error waiting for reconcile task: {e}");
1239 0 : }
1240 0 : }
1241 :
1242 : // Early check for cancellation before doing any work
1243 : // TODO: wrap all remote API operations in cancellation check
1244 : // as well.
1245 0 : if reconciler.cancel.is_cancelled() {
1246 0 : metrics::METRICS_REGISTRY
1247 0 : .metrics_group
1248 0 : .storage_controller_reconcile_complete
1249 0 : .inc(ReconcileCompleteLabelGroup {
1250 0 : status: ReconcileOutcome::Cancel,
1251 0 : });
1252 0 : return;
1253 0 : }
1254 0 :
1255 0 : let (tenant_id_label, shard_number_label, sequence_label) = {
1256 0 : (
1257 0 : reconciler.tenant_shard_id.tenant_id.to_string(),
1258 0 : reconciler.tenant_shard_id.shard_number.0.to_string(),
1259 0 : reconcile_seq.to_string(),
1260 0 : )
1261 0 : };
1262 0 :
1263 0 : let label_group = ReconcileLongRunningLabelGroup {
1264 0 : tenant_id: &tenant_id_label,
1265 0 : shard_number: &shard_number_label,
1266 0 : sequence: &sequence_label,
1267 0 : };
1268 0 :
1269 0 : let reconcile_fut = Self::reconcile(reconcile_seq, reconciler, must_notify);
1270 0 : let long_reconcile_fut = {
1271 0 : let label_group = label_group.clone();
1272 0 : async move {
1273 0 : tokio::time::sleep(long_reconcile_threshold).await;
1274 :
1275 0 : tracing::warn!("Reconcile passed the long running threshold of {long_reconcile_threshold:?}");
1276 :
1277 0 : metrics::METRICS_REGISTRY
1278 0 : .metrics_group
1279 0 : .storage_controller_reconcile_long_running
1280 0 : .inc(label_group);
1281 0 : }
1282 : };
1283 :
1284 0 : let reconcile_fut = std::pin::pin!(reconcile_fut);
1285 0 : let long_reconcile_fut = std::pin::pin!(long_reconcile_fut);
1286 :
1287 0 : let (was_long, result) =
1288 0 : match future::select(reconcile_fut, long_reconcile_fut).await {
1289 0 : Either::Left((reconcile_result, _)) => (false, reconcile_result),
1290 0 : Either::Right((_, reconcile_fut)) => (true, reconcile_fut.await),
1291 : };
1292 :
1293 0 : if was_long {
1294 0 : let id = metrics::METRICS_REGISTRY
1295 0 : .metrics_group
1296 0 : .storage_controller_reconcile_long_running
1297 0 : .with_labels(label_group);
1298 0 : metrics::METRICS_REGISTRY
1299 0 : .metrics_group
1300 0 : .storage_controller_reconcile_long_running
1301 0 : .remove_metric(id);
1302 0 : }
1303 :
1304 0 : result_tx
1305 0 : .send(ReconcileResultRequest::ReconcileResult(result))
1306 0 : .ok();
1307 0 : }
1308 : .instrument(reconciler_span),
1309 : );
1310 :
1311 : self.reconciler = Some(ReconcilerHandle {
1312 : sequence: self.sequence,
1313 : handle: join_handle,
1314 : cancel: reconciler_cancel,
1315 : });
1316 :
1317 : Some(ReconcilerWaiter {
1318 : tenant_shard_id: self.tenant_shard_id,
1319 : seq_wait: self.waiter.clone(),
1320 : error_seq_wait: self.error_waiter.clone(),
1321 : error: self.last_error.clone(),
1322 : seq: self.sequence,
1323 : })
1324 : }
1325 :
1326 0 : pub(crate) fn cancel_reconciler(&self) {
1327 0 : if let Some(handle) = self.reconciler.as_ref() {
1328 0 : handle.cancel.cancel()
1329 0 : }
1330 0 : }
1331 :
1332 : /// Get a waiter for any reconciliation in flight, but do not start reconciliation
1333 : /// if it is not already running
1334 0 : pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
1335 0 : if self.reconciler.is_some() {
1336 0 : Some(ReconcilerWaiter {
1337 0 : tenant_shard_id: self.tenant_shard_id,
1338 0 : seq_wait: self.waiter.clone(),
1339 0 : error_seq_wait: self.error_waiter.clone(),
1340 0 : error: self.last_error.clone(),
1341 0 : seq: self.sequence,
1342 0 : })
1343 : } else {
1344 0 : None
1345 : }
1346 0 : }
1347 :
1348 : /// Called when a ReconcileResult has been emitted and the service is updating
1349 : /// our state: if the result is from a sequence >= my ReconcileHandle, then drop
1350 : /// the handle to indicate there is no longer a reconciliation in progress.
1351 0 : pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
1352 0 : if let Some(reconcile_handle) = &self.reconciler {
1353 0 : if reconcile_handle.sequence <= sequence {
1354 0 : self.reconciler = None;
1355 0 : }
1356 0 : }
1357 0 : }
1358 :
1359 : /// If we had any state at all referring to this node ID, drop it. Does not
1360 : /// attempt to reschedule.
1361 : ///
1362 : /// Returns true if we modified the node's intent state.
1363 0 : pub(crate) fn deref_node(&mut self, node_id: NodeId) -> bool {
1364 0 : let mut intent_modified = false;
1365 0 :
1366 0 : // Drop if this node was our attached intent
1367 0 : if self.intent.attached == Some(node_id) {
1368 0 : self.intent.attached = None;
1369 0 : intent_modified = true;
1370 0 : }
1371 :
1372 : // Drop from the list of secondaries, and check if we modified it
1373 0 : let had_secondaries = self.intent.secondary.len();
1374 0 : self.intent.secondary.retain(|n| n != &node_id);
1375 0 : intent_modified |= self.intent.secondary.len() != had_secondaries;
1376 0 :
1377 0 : debug_assert!(!self.intent.all_pageservers().contains(&node_id));
1378 :
1379 0 : intent_modified
1380 0 : }
1381 :
1382 0 : pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
1383 0 : self.scheduling_policy = p;
1384 0 : }
1385 :
1386 0 : pub(crate) fn get_scheduling_policy(&self) -> &ShardSchedulingPolicy {
1387 0 : &self.scheduling_policy
1388 0 : }
1389 :
1390 0 : pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
1391 0 : // Ordering: always set last_error before advancing sequence, so that sequence
1392 0 : // waiters are guaranteed to see a Some value when they see an error.
1393 0 : *(self.last_error.lock().unwrap()) = Some(Arc::new(error));
1394 0 : self.error_waiter.advance(sequence);
1395 0 : }
1396 :
1397 0 : pub(crate) fn from_persistent(
1398 0 : tsp: TenantShardPersistence,
1399 0 : intent: IntentState,
1400 0 : ) -> anyhow::Result<Self> {
1401 0 : let tenant_shard_id = tsp.get_tenant_shard_id()?;
1402 0 : let shard_identity = tsp.get_shard_identity()?;
1403 :
1404 0 : metrics::METRICS_REGISTRY
1405 0 : .metrics_group
1406 0 : .storage_controller_tenant_shards
1407 0 : .inc();
1408 0 :
1409 0 : Ok(Self {
1410 0 : tenant_shard_id,
1411 0 : shard: shard_identity,
1412 0 : sequence: Sequence::initial(),
1413 0 : generation: tsp.generation.map(|g| Generation::new(g as u32)),
1414 0 : policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
1415 0 : intent,
1416 0 : observed: ObservedState::new(),
1417 0 : config: serde_json::from_str(&tsp.config).unwrap(),
1418 0 : reconciler: None,
1419 0 : splitting: tsp.splitting,
1420 0 : waiter: Arc::new(SeqWait::new(Sequence::initial())),
1421 0 : error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
1422 0 : last_error: Arc::default(),
1423 0 : pending_compute_notification: false,
1424 0 : delayed_reconcile: false,
1425 0 : scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
1426 0 : preferred_az_id: tsp.preferred_az_id.map(AvailabilityZone),
1427 0 : })
1428 0 : }
1429 :
1430 0 : pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
1431 0 : TenantShardPersistence {
1432 0 : tenant_id: self.tenant_shard_id.tenant_id.to_string(),
1433 0 : shard_number: self.tenant_shard_id.shard_number.0 as i32,
1434 0 : shard_count: self.tenant_shard_id.shard_count.literal() as i32,
1435 0 : shard_stripe_size: self.shard.stripe_size.0 as i32,
1436 0 : generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
1437 0 : generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
1438 0 : placement_policy: serde_json::to_string(&self.policy).unwrap(),
1439 0 : config: serde_json::to_string(&self.config).unwrap(),
1440 0 : splitting: SplitState::default(),
1441 0 : scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
1442 0 : preferred_az_id: self.preferred_az_id.as_ref().map(|az| az.0.clone()),
1443 0 : }
1444 0 : }
1445 :
1446 12500 : pub(crate) fn preferred_az(&self) -> Option<&AvailabilityZone> {
1447 12500 : self.preferred_az_id.as_ref()
1448 12500 : }
1449 :
1450 0 : pub(crate) fn set_preferred_az(&mut self, preferred_az_id: AvailabilityZone) {
1451 0 : self.preferred_az_id = Some(preferred_az_id);
1452 0 : }
1453 :
1454 : /// Returns all the nodes to which this tenant shard is attached according to the
1455 : /// observed state and the generations. Return vector is sorted from latest generation
1456 : /// to earliest.
1457 0 : pub(crate) fn attached_locations(&self) -> Vec<(NodeId, Generation)> {
1458 0 : self.observed
1459 0 : .locations
1460 0 : .iter()
1461 0 : .filter_map(|(node_id, observed)| {
1462 : use LocationConfigMode::{AttachedMulti, AttachedSingle, AttachedStale};
1463 :
1464 0 : let conf = observed.conf.as_ref()?;
1465 :
1466 0 : match (conf.generation, conf.mode) {
1467 0 : (Some(gen), AttachedMulti | AttachedSingle | AttachedStale) => {
1468 0 : Some((*node_id, gen))
1469 : }
1470 0 : _ => None,
1471 : }
1472 0 : })
1473 0 : .sorted_by(|(_lhs_node_id, lhs_gen), (_rhs_node_id, rhs_gen)| {
1474 0 : lhs_gen.cmp(rhs_gen).reverse()
1475 0 : })
1476 0 : .map(|(node_id, gen)| (node_id, Generation::new(gen)))
1477 0 : .collect()
1478 0 : }
1479 :
1480 : /// Update the observed state of the tenant by applying incremental deltas
1481 : ///
1482 : /// Deltas are generated by reconcilers via [`Reconciler::observed_deltas`].
1483 : /// They are then filtered in [`crate::service::Service::process_result`].
1484 0 : pub(crate) fn apply_observed_deltas(
1485 0 : &mut self,
1486 0 : deltas: impl Iterator<Item = ObservedStateDelta>,
1487 0 : ) {
1488 0 : for delta in deltas {
1489 0 : match delta {
1490 0 : ObservedStateDelta::Upsert(ups) => {
1491 0 : let (node_id, loc) = *ups;
1492 0 :
1493 0 : // If the generation of the observed location in the delta is lagging
1494 0 : // behind the current one, then we have a race condition and cannot
1495 0 : // be certain about the true observed state. Set the observed state
1496 0 : // to None in order to reflect this.
1497 0 : let crnt_gen = self
1498 0 : .observed
1499 0 : .locations
1500 0 : .get(&node_id)
1501 0 : .and_then(|loc| loc.conf.as_ref())
1502 0 : .and_then(|conf| conf.generation);
1503 0 : let new_gen = loc.conf.as_ref().and_then(|conf| conf.generation);
1504 0 : match (crnt_gen, new_gen) {
1505 0 : (Some(crnt), Some(new)) if crnt_gen > new_gen => {
1506 0 : tracing::warn!(
1507 0 : "Skipping observed state update {}: {:?} and using None due to stale generation ({} > {})",
1508 : node_id, loc, crnt, new
1509 : );
1510 :
1511 0 : self.observed
1512 0 : .locations
1513 0 : .insert(node_id, ObservedStateLocation { conf: None });
1514 0 :
1515 0 : continue;
1516 : }
1517 0 : _ => {}
1518 : }
1519 :
1520 0 : if let Some(conf) = &loc.conf {
1521 0 : tracing::info!("Updating observed location {}: {:?}", node_id, conf);
1522 : } else {
1523 0 : tracing::info!("Setting observed location {} to None", node_id,)
1524 : }
1525 :
1526 0 : self.observed.locations.insert(node_id, loc);
1527 : }
1528 0 : ObservedStateDelta::Delete(node_id) => {
1529 0 : tracing::info!("Deleting observed location {}", node_id);
1530 0 : self.observed.locations.remove(&node_id);
1531 : }
1532 : }
1533 : }
1534 0 : }
1535 : }
1536 :
1537 : impl Drop for TenantShard {
1538 12525 : fn drop(&mut self) {
1539 12525 : metrics::METRICS_REGISTRY
1540 12525 : .metrics_group
1541 12525 : .storage_controller_tenant_shards
1542 12525 : .dec();
1543 12525 : }
1544 : }
1545 :
1546 : #[cfg(test)]
1547 : pub(crate) mod tests {
1548 : use std::{cell::RefCell, rc::Rc};
1549 :
1550 : use pageserver_api::{
1551 : controller_api::NodeAvailability,
1552 : shard::{ShardCount, ShardNumber},
1553 : };
1554 : use rand::{rngs::StdRng, SeedableRng};
1555 : use utils::id::TenantId;
1556 :
1557 : use crate::scheduler::test_utils::make_test_nodes;
1558 :
1559 : use super::*;
1560 :
1561 7 : fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard {
1562 7 : let tenant_id = TenantId::generate();
1563 7 : let shard_number = ShardNumber(0);
1564 7 : let shard_count = ShardCount::new(1);
1565 7 :
1566 7 : let tenant_shard_id = TenantShardId {
1567 7 : tenant_id,
1568 7 : shard_number,
1569 7 : shard_count,
1570 7 : };
1571 7 : TenantShard::new(
1572 7 : tenant_shard_id,
1573 7 : ShardIdentity::new(
1574 7 : shard_number,
1575 7 : shard_count,
1576 7 : pageserver_api::shard::ShardStripeSize(32768),
1577 7 : )
1578 7 : .unwrap(),
1579 7 : policy,
1580 7 : None,
1581 7 : )
1582 7 : }
1583 :
1584 5003 : pub(crate) fn make_test_tenant(
1585 5003 : policy: PlacementPolicy,
1586 5003 : shard_count: ShardCount,
1587 5003 : preferred_az: Option<AvailabilityZone>,
1588 5003 : ) -> Vec<TenantShard> {
1589 5003 : make_test_tenant_with_id(TenantId::generate(), policy, shard_count, preferred_az)
1590 5003 : }
1591 :
1592 5006 : pub(crate) fn make_test_tenant_with_id(
1593 5006 : tenant_id: TenantId,
1594 5006 : policy: PlacementPolicy,
1595 5006 : shard_count: ShardCount,
1596 5006 : preferred_az: Option<AvailabilityZone>,
1597 5006 : ) -> Vec<TenantShard> {
1598 5006 : (0..shard_count.count())
1599 12518 : .map(|i| {
1600 12518 : let shard_number = ShardNumber(i);
1601 12518 :
1602 12518 : let tenant_shard_id = TenantShardId {
1603 12518 : tenant_id,
1604 12518 : shard_number,
1605 12518 : shard_count,
1606 12518 : };
1607 12518 : TenantShard::new(
1608 12518 : tenant_shard_id,
1609 12518 : ShardIdentity::new(
1610 12518 : shard_number,
1611 12518 : shard_count,
1612 12518 : pageserver_api::shard::ShardStripeSize(32768),
1613 12518 : )
1614 12518 : .unwrap(),
1615 12518 : policy.clone(),
1616 12518 : preferred_az.clone(),
1617 12518 : )
1618 12518 : })
1619 5006 : .collect()
1620 5006 : }
1621 :
/// Test the HA scheduling path when the attached node goes offline.
///
/// A tenant with one secondary (`Attached(1)`) has its attached node demoted and then marked
/// offline. It should fail over to its secondary and keep the old attached node as a secondary.
#[test]
fn tenant_ha_scheduling() -> anyhow::Result<()> {
    // Three nodes exist, but the tenant should only use two; the third stays unused.
    let mut nodes = make_test_nodes(3, &[]);
    let mut scheduler = Scheduler::new(nodes.values());
    let mut context = ScheduleContext::default();

    let mut shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
    shard
        .schedule(&mut scheduler, &mut context)
        .expect("we have enough nodes, scheduling should work");

    // The attached and secondary locations start out on different nodes.
    assert_eq!(shard.intent.secondary.len(), 1);
    assert!(shard.intent.attached.is_some());
    let first_attached = shard.intent.attached.unwrap();
    let first_secondary = *shard.intent.secondary.last().unwrap();
    assert_ne!(first_attached, first_secondary);

    // Demoting the attached location turns it into a second secondary.
    let demoted = shard.intent.demote_attached(&mut scheduler, first_attached);
    assert!(demoted);
    assert!(shard.intent.attached.is_none());
    assert_eq!(shard.intent.secondary.len(), 2);

    // Tell the scheduler that the previously attached node is now offline.
    let offline_node = nodes.get_mut(&first_attached).unwrap();
    offline_node.set_availability(NodeAvailability::Offline);
    scheduler.node_upsert(offline_node);

    // Rescheduling should promote the surviving secondary to attached...
    shard
        .schedule(&mut scheduler, &mut context)
        .expect("active nodes are available");
    assert_eq!(shard.intent.attached.unwrap(), first_secondary);

    // ...while the previously attached node is retained as a secondary.
    assert_eq!(*shard.intent.secondary.last().unwrap(), first_attached);

    shard.intent.clear(&mut scheduler);

    Ok(())
}
1677 :
1678 : #[test]
1679 1 : fn intent_from_observed() -> anyhow::Result<()> {
1680 1 : let nodes = make_test_nodes(3, &[]);
1681 1 : let mut scheduler = Scheduler::new(nodes.values());
1682 1 :
1683 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
1684 1 :
1685 1 : tenant_shard.observed.locations.insert(
1686 1 : NodeId(3),
1687 1 : ObservedStateLocation {
1688 1 : conf: Some(LocationConfig {
1689 1 : mode: LocationConfigMode::AttachedMulti,
1690 1 : generation: Some(2),
1691 1 : secondary_conf: None,
1692 1 : shard_number: tenant_shard.shard.number.0,
1693 1 : shard_count: tenant_shard.shard.count.literal(),
1694 1 : shard_stripe_size: tenant_shard.shard.stripe_size.0,
1695 1 : tenant_conf: TenantConfig::default(),
1696 1 : }),
1697 1 : },
1698 1 : );
1699 1 :
1700 1 : tenant_shard.observed.locations.insert(
1701 1 : NodeId(2),
1702 1 : ObservedStateLocation {
1703 1 : conf: Some(LocationConfig {
1704 1 : mode: LocationConfigMode::AttachedStale,
1705 1 : generation: Some(1),
1706 1 : secondary_conf: None,
1707 1 : shard_number: tenant_shard.shard.number.0,
1708 1 : shard_count: tenant_shard.shard.count.literal(),
1709 1 : shard_stripe_size: tenant_shard.shard.stripe_size.0,
1710 1 : tenant_conf: TenantConfig::default(),
1711 1 : }),
1712 1 : },
1713 1 : );
1714 1 :
1715 1 : tenant_shard.intent_from_observed(&mut scheduler);
1716 1 :
1717 1 : // The highest generationed attached location gets used as attached
1718 1 : assert_eq!(tenant_shard.intent.attached, Some(NodeId(3)));
1719 : // Other locations get used as secondary
1720 1 : assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]);
1721 :
1722 1 : scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?;
1723 :
1724 1 : tenant_shard.intent.clear(&mut scheduler);
1725 1 : Ok(())
1726 1 : }
1727 :
1728 : #[test]
1729 1 : fn scheduling_mode() -> anyhow::Result<()> {
1730 1 : let nodes = make_test_nodes(3, &[]);
1731 1 : let mut scheduler = Scheduler::new(nodes.values());
1732 1 :
1733 1 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
1734 1 :
1735 1 : // In pause mode, schedule() shouldn't do anything
1736 1 : tenant_shard.scheduling_policy = ShardSchedulingPolicy::Pause;
1737 1 : assert!(tenant_shard
1738 1 : .schedule(&mut scheduler, &mut ScheduleContext::default())
1739 1 : .is_ok());
1740 1 : assert!(tenant_shard.intent.all_pageservers().is_empty());
1741 :
1742 : // In active mode, schedule() works
1743 1 : tenant_shard.scheduling_policy = ShardSchedulingPolicy::Active;
1744 1 : assert!(tenant_shard
1745 1 : .schedule(&mut scheduler, &mut ScheduleContext::default())
1746 1 : .is_ok());
1747 1 : assert!(!tenant_shard.intent.all_pageservers().is_empty());
1748 :
1749 1 : tenant_shard.intent.clear(&mut scheduler);
1750 1 : Ok(())
1751 1 : }
1752 :
1753 : #[test]
1754 1 : fn optimize_attachment() -> anyhow::Result<()> {
1755 1 : let nodes = make_test_nodes(3, &[]);
1756 1 : let mut scheduler = Scheduler::new(nodes.values());
1757 1 :
1758 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
1759 1 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
1760 1 :
1761 1 : // Initially: both nodes attached on shard 1, and both have secondary locations
1762 1 : // on different nodes.
1763 1 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
1764 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(2));
1765 1 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
1766 1 : shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
1767 1 :
1768 1 : let mut schedule_context = ScheduleContext::default();
1769 1 : schedule_context.avoid(&shard_a.intent.all_pageservers());
1770 1 : schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
1771 1 : schedule_context.avoid(&shard_b.intent.all_pageservers());
1772 1 : schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
1773 1 :
1774 1 : let optimization_a = shard_a.optimize_attachment(&nodes, &schedule_context);
1775 1 :
1776 1 : // Either shard should recognize that it has the option to switch to a secondary location where there
1777 1 : // would be no other shards from the same tenant, and request to do so.
1778 1 : assert_eq!(
1779 1 : optimization_a,
1780 1 : Some(ScheduleOptimization {
1781 1 : sequence: shard_a.sequence,
1782 1 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
1783 1 : old_attached_node_id: NodeId(1),
1784 1 : new_attached_node_id: NodeId(2)
1785 1 : })
1786 1 : })
1787 1 : );
1788 :
1789 : // Note that these optimizing two shards in the same tenant with the same ScheduleContext is
1790 : // mutually exclusive (the optimization of one invalidates the stats) -- it is the responsibility
1791 : // of [`Service::optimize_all`] to avoid trying
1792 : // to do optimizations for multiple shards in the same tenant at the same time. Generating
1793 : // both optimizations is just done for test purposes
1794 1 : let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context);
1795 1 : assert_eq!(
1796 1 : optimization_b,
1797 1 : Some(ScheduleOptimization {
1798 1 : sequence: shard_b.sequence,
1799 1 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
1800 1 : old_attached_node_id: NodeId(1),
1801 1 : new_attached_node_id: NodeId(3)
1802 1 : })
1803 1 : })
1804 1 : );
1805 :
1806 : // Applying these optimizations should result in the end state proposed
1807 1 : shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
1808 1 : assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(2)));
1809 1 : assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
1810 1 : shard_b.apply_optimization(&mut scheduler, optimization_b.unwrap());
1811 1 : assert_eq!(shard_b.intent.get_attached(), &Some(NodeId(3)));
1812 1 : assert_eq!(shard_b.intent.get_secondary(), &vec![NodeId(1)]);
1813 :
1814 1 : shard_a.intent.clear(&mut scheduler);
1815 1 : shard_b.intent.clear(&mut scheduler);
1816 1 :
1817 1 : Ok(())
1818 1 : }
1819 :
1820 : #[test]
1821 1 : fn optimize_secondary() -> anyhow::Result<()> {
1822 1 : let nodes = make_test_nodes(4, &[]);
1823 1 : let mut scheduler = Scheduler::new(nodes.values());
1824 1 :
1825 1 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
1826 1 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
1827 1 :
1828 1 : // Initially: both nodes attached on shard 1, and both have secondary locations
1829 1 : // on different nodes.
1830 1 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
1831 1 : shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
1832 1 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
1833 1 : shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
1834 1 :
1835 1 : let mut schedule_context = ScheduleContext::default();
1836 1 : schedule_context.avoid(&shard_a.intent.all_pageservers());
1837 1 : schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
1838 1 : schedule_context.avoid(&shard_b.intent.all_pageservers());
1839 1 : schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
1840 1 :
1841 1 : let optimization_a = shard_a.optimize_secondary(&mut scheduler, &schedule_context);
1842 1 :
1843 1 : // Since there is a node with no locations available, the node with two locations for the
1844 1 : // same tenant should generate an optimization to move one away
1845 1 : assert_eq!(
1846 1 : optimization_a,
1847 1 : Some(ScheduleOptimization {
1848 1 : sequence: shard_a.sequence,
1849 1 : action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
1850 1 : old_node_id: NodeId(3),
1851 1 : new_node_id: NodeId(4)
1852 1 : })
1853 1 : })
1854 1 : );
1855 :
1856 1 : shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
1857 1 : assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
1858 1 : assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);
1859 :
1860 1 : shard_a.intent.clear(&mut scheduler);
1861 1 : shard_b.intent.clear(&mut scheduler);
1862 1 :
1863 1 : Ok(())
1864 1 : }
1865 :
1866 : // Optimize til quiescent: this emulates what Service::optimize_all does, when
1867 : // called repeatedly in the background.
1868 : // Returns the applied optimizations
1869 3 : fn optimize_til_idle(
1870 3 : nodes: &HashMap<NodeId, Node>,
1871 3 : scheduler: &mut Scheduler,
1872 3 : shards: &mut [TenantShard],
1873 3 : ) -> Vec<ScheduleOptimization> {
1874 3 : let mut loop_n = 0;
1875 3 : let mut optimizations = Vec::default();
1876 : loop {
1877 9 : let mut schedule_context = ScheduleContext::default();
1878 9 : let mut any_changed = false;
1879 :
1880 36 : for shard in shards.iter() {
1881 36 : schedule_context.avoid(&shard.intent.all_pageservers());
1882 36 : if let Some(attached) = shard.intent.get_attached() {
1883 36 : schedule_context.push_attached(*attached);
1884 36 : }
1885 : }
1886 :
1887 21 : for shard in shards.iter_mut() {
1888 21 : let optimization = shard.optimize_attachment(nodes, &schedule_context);
1889 21 : if let Some(optimization) = optimization {
1890 2 : optimizations.push(optimization.clone());
1891 2 : shard.apply_optimization(scheduler, optimization);
1892 2 : any_changed = true;
1893 2 : break;
1894 19 : }
1895 19 :
1896 19 : let optimization = shard.optimize_secondary(scheduler, &schedule_context);
1897 19 : if let Some(optimization) = optimization {
1898 4 : optimizations.push(optimization.clone());
1899 4 : shard.apply_optimization(scheduler, optimization);
1900 4 : any_changed = true;
1901 4 : break;
1902 15 : }
1903 : }
1904 :
1905 9 : if !any_changed {
1906 3 : break;
1907 6 : }
1908 6 :
1909 6 : // Assert no infinite loop
1910 6 : loop_n += 1;
1911 6 : assert!(loop_n < 1000);
1912 : }
1913 :
1914 3 : optimizations
1915 3 : }
1916 :
1917 : /// Test the balancing behavior of shard scheduling: that it achieves a balance, and
1918 : /// that it converges.
1919 : #[test]
1920 1 : fn optimize_add_nodes() -> anyhow::Result<()> {
1921 1 : let nodes = make_test_nodes(4, &[]);
1922 1 :
1923 1 : // Only show the scheduler a couple of nodes
1924 1 : let mut scheduler = Scheduler::new([].iter());
1925 1 : scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
1926 1 : scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
1927 1 :
1928 1 : let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
1929 1 : let mut schedule_context = ScheduleContext::default();
1930 5 : for shard in &mut shards {
1931 4 : assert!(shard
1932 4 : .schedule(&mut scheduler, &mut schedule_context)
1933 4 : .is_ok());
1934 : }
1935 :
1936 : // We should see equal number of locations on the two nodes.
1937 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4);
1938 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2);
1939 :
1940 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4);
1941 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
1942 :
1943 : // Add another two nodes: we should see the shards spread out when their optimize
1944 : // methods are called
1945 1 : scheduler.node_upsert(nodes.get(&NodeId(3)).unwrap());
1946 1 : scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
1947 1 : optimize_til_idle(&nodes, &mut scheduler, &mut shards);
1948 1 :
1949 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
1950 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
1951 :
1952 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
1953 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
1954 :
1955 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2);
1956 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 1);
1957 :
1958 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2);
1959 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 1);
1960 :
1961 4 : for shard in shards.iter_mut() {
1962 4 : shard.intent.clear(&mut scheduler);
1963 4 : }
1964 :
1965 1 : Ok(())
1966 1 : }
1967 :
1968 : /// Test that initial shard scheduling is optimal. By optimal we mean
1969 : /// that the optimizer cannot find a way to improve it.
1970 : ///
1971 : /// This test is an example of the scheduling issue described in
1972 : /// https://github.com/neondatabase/neon/issues/8969
1973 : #[test]
1974 1 : fn initial_scheduling_is_optimal() -> anyhow::Result<()> {
1975 : use itertools::Itertools;
1976 :
1977 1 : let nodes = make_test_nodes(2, &[]);
1978 1 :
1979 1 : let mut scheduler = Scheduler::new([].iter());
1980 1 : scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
1981 1 : scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
1982 1 :
1983 1 : let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
1984 1 : let a_context = Rc::new(RefCell::new(ScheduleContext::default()));
1985 1 :
1986 1 : let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
1987 1 : let b_context = Rc::new(RefCell::new(ScheduleContext::default()));
1988 1 :
1989 4 : let a_shards_with_context = a.iter_mut().map(|shard| (shard, a_context.clone()));
1990 4 : let b_shards_with_context = b.iter_mut().map(|shard| (shard, b_context.clone()));
1991 1 :
1992 1 : let schedule_order = a_shards_with_context.interleave(b_shards_with_context);
1993 :
1994 9 : for (shard, context) in schedule_order {
1995 8 : let context = &mut *context.borrow_mut();
1996 8 : shard.schedule(&mut scheduler, context).unwrap();
1997 8 : }
1998 :
1999 1 : let applied_to_a = optimize_til_idle(&nodes, &mut scheduler, &mut a);
2000 1 : assert_eq!(applied_to_a, vec![]);
2001 :
2002 1 : let applied_to_b = optimize_til_idle(&nodes, &mut scheduler, &mut b);
2003 1 : assert_eq!(applied_to_b, vec![]);
2004 :
2005 8 : for shard in a.iter_mut().chain(b.iter_mut()) {
2006 8 : shard.intent.clear(&mut scheduler);
2007 8 : }
2008 :
2009 1 : Ok(())
2010 1 : }
2011 :
2012 : #[test]
2013 1 : fn random_az_shard_scheduling() -> anyhow::Result<()> {
2014 : use rand::seq::SliceRandom;
2015 :
2016 51 : for seed in 0..50 {
2017 50 : eprintln!("Running test with seed {seed}");
2018 50 : let mut rng = StdRng::seed_from_u64(seed);
2019 50 :
2020 50 : let az_a_tag = AvailabilityZone("az-a".to_string());
2021 50 : let az_b_tag = AvailabilityZone("az-b".to_string());
2022 50 : let azs = [az_a_tag, az_b_tag];
2023 50 : let nodes = make_test_nodes(4, &azs);
2024 50 : let mut shards_per_az: HashMap<AvailabilityZone, u32> = HashMap::new();
2025 50 :
2026 50 : let mut scheduler = Scheduler::new([].iter());
2027 200 : for node in nodes.values() {
2028 200 : scheduler.node_upsert(node);
2029 200 : }
2030 :
2031 50 : let mut shards = Vec::default();
2032 50 : let mut contexts = Vec::default();
2033 50 : let mut az_picker = azs.iter().cycle().cloned();
2034 5050 : for i in 0..100 {
2035 5000 : let az = az_picker.next().unwrap();
2036 5000 : let shard_count = i % 4 + 1;
2037 5000 : *shards_per_az.entry(az.clone()).or_default() += shard_count;
2038 5000 :
2039 5000 : let tenant_shards = make_test_tenant(
2040 5000 : PlacementPolicy::Attached(1),
2041 5000 : ShardCount::new(shard_count.try_into().unwrap()),
2042 5000 : Some(az),
2043 5000 : );
2044 5000 : let context = Rc::new(RefCell::new(ScheduleContext::default()));
2045 5000 :
2046 5000 : contexts.push(context.clone());
2047 5000 : let with_ctx = tenant_shards
2048 5000 : .into_iter()
2049 12500 : .map(|shard| (shard, context.clone()));
2050 17500 : for shard_with_ctx in with_ctx {
2051 12500 : shards.push(shard_with_ctx);
2052 12500 : }
2053 : }
2054 :
2055 50 : shards.shuffle(&mut rng);
2056 :
2057 : #[derive(Default, Debug)]
2058 : struct NodeStats {
2059 : attachments: u32,
2060 : secondaries: u32,
2061 : }
2062 :
2063 50 : let mut node_stats: HashMap<NodeId, NodeStats> = HashMap::default();
2064 50 : let mut attachments_in_wrong_az = 0;
2065 50 : let mut secondaries_in_wrong_az = 0;
2066 :
2067 12550 : for (shard, context) in &mut shards {
2068 12500 : let context = &mut *context.borrow_mut();
2069 12500 : shard.schedule(&mut scheduler, context).unwrap();
2070 12500 :
2071 12500 : let attached_node = shard.intent.get_attached().unwrap();
2072 12500 : let stats = node_stats.entry(attached_node).or_default();
2073 12500 : stats.attachments += 1;
2074 12500 :
2075 12500 : let secondary_node = *shard.intent.get_secondary().first().unwrap();
2076 12500 : let stats = node_stats.entry(secondary_node).or_default();
2077 12500 : stats.secondaries += 1;
2078 12500 :
2079 12500 : let attached_node_az = nodes
2080 12500 : .get(&attached_node)
2081 12500 : .unwrap()
2082 12500 : .get_availability_zone_id();
2083 12500 : let secondary_node_az = nodes
2084 12500 : .get(&secondary_node)
2085 12500 : .unwrap()
2086 12500 : .get_availability_zone_id();
2087 12500 : let preferred_az = shard.preferred_az().unwrap();
2088 12500 :
2089 12500 : if attached_node_az != preferred_az {
2090 0 : eprintln!(
2091 0 : "{} attachment was scheduled in AZ {} but preferred AZ {}",
2092 0 : shard.tenant_shard_id, attached_node_az, preferred_az
2093 0 : );
2094 0 : attachments_in_wrong_az += 1;
2095 12500 : }
2096 :
2097 12500 : if secondary_node_az == preferred_az {
2098 0 : eprintln!(
2099 0 : "{} secondary was scheduled in AZ {} which matches preference",
2100 0 : shard.tenant_shard_id, attached_node_az
2101 0 : );
2102 0 : secondaries_in_wrong_az += 1;
2103 12500 : }
2104 : }
2105 :
2106 50 : let mut violations = Vec::default();
2107 50 :
2108 50 : if attachments_in_wrong_az > 0 {
2109 0 : violations.push(format!(
2110 0 : "{} attachments scheduled to the incorrect AZ",
2111 0 : attachments_in_wrong_az
2112 0 : ));
2113 50 : }
2114 :
2115 50 : if secondaries_in_wrong_az > 0 {
2116 0 : violations.push(format!(
2117 0 : "{} secondaries scheduled to the incorrect AZ",
2118 0 : secondaries_in_wrong_az
2119 0 : ));
2120 50 : }
2121 :
2122 50 : eprintln!(
2123 50 : "attachments_in_wrong_az={} secondaries_in_wrong_az={}",
2124 50 : attachments_in_wrong_az, secondaries_in_wrong_az
2125 50 : );
2126 :
2127 250 : for (node_id, stats) in &node_stats {
2128 200 : let node_az = nodes.get(node_id).unwrap().get_availability_zone_id();
2129 200 : let ideal_attachment_load = shards_per_az.get(node_az).unwrap() / 2;
2130 200 : let allowed_attachment_load =
2131 200 : (ideal_attachment_load - 1)..(ideal_attachment_load + 2);
2132 200 :
2133 200 : if !allowed_attachment_load.contains(&stats.attachments) {
2134 0 : violations.push(format!(
2135 0 : "Found {} attachments on node {}, but expected {}",
2136 0 : stats.attachments, node_id, ideal_attachment_load
2137 0 : ));
2138 200 : }
2139 :
2140 200 : eprintln!(
2141 200 : "{}: attachments={} secondaries={} ideal_attachment_load={}",
2142 200 : node_id, stats.attachments, stats.secondaries, ideal_attachment_load
2143 200 : );
2144 : }
2145 :
2146 50 : assert!(violations.is_empty(), "{violations:?}");
2147 :
2148 12550 : for (mut shard, _ctx) in shards {
2149 12500 : shard.intent.clear(&mut scheduler);
2150 12500 : }
2151 : }
2152 1 : Ok(())
2153 1 : }
2154 : }
|