Line data Source code
1 : use std::{
2 : collections::{HashMap, HashSet},
3 : sync::Arc,
4 : time::Duration,
5 : };
6 :
7 : use crate::{
8 : metrics::{
9 : self, ReconcileCompleteLabelGroup, ReconcileLongRunningLabelGroup, ReconcileOutcome,
10 : },
11 : persistence::TenantShardPersistence,
12 : reconciler::{ReconcileUnits, ReconcilerConfig},
13 : scheduler::{
14 : AffinityScore, AttachedShardTag, MaySchedule, RefCountUpdate, ScheduleContext,
15 : SecondaryShardTag,
16 : },
17 : service::ReconcileResultRequest,
18 : };
19 : use futures::future::{self, Either};
20 : use itertools::Itertools;
21 : use pageserver_api::controller_api::{
22 : AvailabilityZone, NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
23 : };
24 : use pageserver_api::{
25 : models::{LocationConfig, LocationConfigMode, TenantConfig},
26 : shard::{ShardIdentity, TenantShardId},
27 : };
28 : use serde::{Deserialize, Serialize};
29 : use tokio::task::JoinHandle;
30 : use tokio_util::sync::CancellationToken;
31 : use tracing::{instrument, Instrument};
32 : use utils::{
33 : generation::Generation,
34 : id::NodeId,
35 : seqwait::{SeqWait, SeqWaitError},
36 : sync::gate::GateGuard,
37 : };
38 :
39 : use crate::{
40 : compute_hook::ComputeHook,
41 : node::Node,
42 : persistence::{split_state::SplitState, Persistence},
43 : reconciler::{
44 : attached_location_conf, secondary_location_conf, ReconcileError, Reconciler, TargetState,
45 : },
46 : scheduler::{ScheduleError, Scheduler},
47 : service, Sequence,
48 : };
49 :
50 : /// Serialization helper for the `last_error` field of [`TenantShard`].
///
/// Locks `v` and passes the `Display` rendering of the stored error to `serializer`; an empty
/// string is emitted when no error is stored. Panics if the mutex is poisoned.
51 0 : fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
52 0 : where
53 0 : S: serde::ser::Serializer,
54 0 : T: std::fmt::Display,
55 0 : {
56 0 : serializer.collect_str(
57 0 : &v.lock()
58 0 : .unwrap()
59 0 : .as_ref()
60 0 : .map(|e| format!("{e}"))
61 0 : .unwrap_or("".to_string()),
62 0 : )
63 0 : }
64 :
65 : /// In-memory state for a particular tenant shard.
66 : ///
67 : /// This struct implements Serialize for debugging purposes, but is _not_ persisted
68 : /// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
69 0 : #[derive(Serialize)]
70 : pub(crate) struct TenantShard {
71 : pub(crate) tenant_shard_id: TenantShardId,
72 :
73 : pub(crate) shard: ShardIdentity,
74 :
75 : // Runtime only: sequence used to coordinate when updating this object while
76 : // background reconcilers may be running. A reconciler runs to a particular
77 : // sequence.
78 : pub(crate) sequence: Sequence,
79 :
80 : // Latest generation number: next time we attach, increment this
81 : // and use the incremented number when attaching.
82 : //
83 : // None represents an incompletely onboarded tenant via the [`Service::location_config`]
84 : // API, where this tenant may only run in PlacementPolicy::Secondary.
85 : pub(crate) generation: Option<Generation>,
86 :
87 : // High level description of how the tenant should be set up. Provided
88 : // externally.
89 : pub(crate) policy: PlacementPolicy,
90 :
91 : // Low level description of exactly which pageservers should fulfil
92 : // which role. Generated by `Self::schedule`.
93 : pub(crate) intent: IntentState,
94 :
95 : // Low level description of how the tenant is configured on pageservers:
96 : // if this does not match `Self::intent` then the tenant needs reconciliation
97 : // with `Self::reconcile`.
98 : pub(crate) observed: ObservedState,
99 :
100 : // Tenant configuration, passed through opaquely to the pageserver. Identical
101 : // for all shards in a tenant.
102 : pub(crate) config: TenantConfig,
103 :
104 : /// If a reconcile task is currently in flight, it may be joined here (it is
105 : /// only safe to join if either the result has been received or the reconciler's
106 : /// cancellation token has been fired)
107 : #[serde(skip)]
108 : pub(crate) reconciler: Option<ReconcilerHandle>,
109 :
110 : /// If a tenant is being split, then all shards with that TenantId will have a
111 : /// SplitState set, this acts as a guard against other operations such as background
112 : /// reconciliation, and timeline creation.
113 : pub(crate) splitting: SplitState,
114 :
115 : /// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag
116 : /// is set. This flag is cleared when the tenant is popped off the delay queue.
117 : pub(crate) delayed_reconcile: bool,
118 :
119 : /// Optionally wait for reconciliation to complete up to a particular
120 : /// sequence number.
121 : #[serde(skip)]
122 : pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
123 :
124 : /// Indicates sequence number for which we have encountered an error reconciling. If
125 : /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
126 : /// and callers should stop waiting for `waiter` and propagate the error.
127 : #[serde(skip)]
128 : pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
129 :
130 : /// The most recent error from a reconcile on this tenant. This is a nested Arc
131 : /// because:
132 : /// - ReconcileWaiters need to Arc-clone the overall object to read it later
133 : /// - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
134 : /// many waiters for one shard, and the underlying error types are not Clone.
135 : ///
136 : /// TODO: generalize to an array of recent events
137 : /// TODO: use an ArcSwap instead of mutex for faster reads?
138 : #[serde(serialize_with = "read_last_error")]
139 : pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
140 :
141 : /// If we have a pending compute notification that for some reason we weren't able to send,
142 : /// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
143 : /// and trigger a Reconciler run. This is the mechanism by which compute notifications are included in the scope
144 : /// of state that we publish externally in an eventually consistent way.
145 : pub(crate) pending_compute_notification: bool,
146 :
147 : // Support/debug tool: if something is going wrong or flapping with scheduling, this may
148 : // be set to a non-active state to avoid making changes while the issue is fixed.
149 : scheduling_policy: ShardSchedulingPolicy,
150 :
151 : // We should attempt to schedule this shard in the provided AZ to
152 : // decrease chances of cross-AZ compute.
153 : preferred_az_id: Option<AvailabilityZone>,
154 : }
155 :
/// Where a shard is intended to live: at most one attached node plus a list of secondary
/// nodes. Mutations go through methods that also update the [`Scheduler`]'s per-node
/// reference counts, so the fields are kept private.
156 : #[derive(Default, Clone, Debug, Serialize)]
157 : pub(crate) struct IntentState {
// Node intended to hold the attached location, if any.
158 : attached: Option<NodeId>,
// Nodes intended to hold secondary locations.
159 : secondary: Vec<NodeId>,
160 : }
161 :
162 : impl IntentState {
/// Create an intent with no attached node and no secondaries.
163 19 : pub(crate) fn new() -> Self {
164 19 : Self {
165 19 : attached: None,
166 19 : secondary: vec![],
167 19 : }
168 19 : }
/// Create an intent with `node_id` as the attached node (if Some) and no secondaries.
/// When a node is given, an `Attach` reference is registered for it with `scheduler`.
169 0 : pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option<NodeId>) -> Self {
170 0 : if let Some(node_id) = node_id {
171 0 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::Attach);
172 0 : }
173 0 : Self {
174 0 : attached: node_id,
175 0 : secondary: vec![],
176 0 : }
177 0 : }
178 :
/// Replace the attached node. This is a no-op when `new_attached` equals the current
/// value; otherwise the old node (if any) receives a `Detach` update and the new node
/// (if any) an `Attach` update before the field is stored.
179 12544 : pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
180 12544 : if self.attached != new_attached {
181 12544 : if let Some(old_attached) = self.attached.take() {
182 0 : scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
183 12544 : }
184 12544 : if let Some(new_attached) = &new_attached {
185 12544 : scheduler.update_node_ref_counts(*new_attached, RefCountUpdate::Attach);
186 12544 : }
187 12544 : self.attached = new_attached;
188 0 : }
189 12544 : }
190 :
191 : /// Like set_attached, but the node is from [`Self::secondary`]. This swaps the node from
192 : /// secondary to attached while maintaining the scheduler's reference counts.
///
/// Any previously attached node is dropped from the intent (it is not re-added as a
/// secondary here) and receives a `DemoteAttached` update.
193 5 : pub(crate) fn promote_attached(
194 5 : &mut self,
195 5 : scheduler: &mut Scheduler,
196 5 : promote_secondary: NodeId,
197 5 : ) {
198 5 : // If we call this with a node that isn't in secondary, it would cause incorrect
199 5 : // scheduler reference counting, since we assume the node is already referenced as a secondary.
200 5 : debug_assert!(self.secondary.contains(&promote_secondary));
201 :
202 10 : self.secondary.retain(|n| n != &promote_secondary);
203 5 :
204 5 : let demoted = self.attached;
205 5 : self.attached = Some(promote_secondary);
206 5 :
207 5 : scheduler.update_node_ref_counts(promote_secondary, RefCountUpdate::PromoteSecondary);
208 5 : if let Some(demoted) = demoted {
209 0 : scheduler.update_node_ref_counts(demoted, RefCountUpdate::DemoteAttached);
210 5 : }
211 5 : }
212 :
/// Append `new_secondary` to the secondaries and register an `AddSecondary` update for it.
/// The node must not already be a secondary (checked with `debug_assert!` only).
213 12531 : pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
214 12531 : debug_assert!(!self.secondary.contains(&new_secondary));
215 12531 : scheduler.update_node_ref_counts(new_secondary, RefCountUpdate::AddSecondary);
216 12531 : self.secondary.push(new_secondary);
217 12531 : }
218 :
219 : /// It is legal to call this with a node that is not currently a secondary: that is a no-op
220 5 : pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
221 5 : let index = self.secondary.iter().position(|n| *n == node_id);
222 5 : if let Some(index) = index {
223 5 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
224 5 : self.secondary.remove(index);
225 5 : }
226 5 : }
227 :
/// Remove every secondary, registering a `RemoveSecondary` update for each one.
228 12543 : pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
229 12543 : for secondary in self.secondary.drain(..) {
230 12526 : scheduler.update_node_ref_counts(secondary, RefCountUpdate::RemoveSecondary);
231 12526 : }
232 12543 : }
233 :
234 : /// Remove the last secondary node from the list of secondaries
235 0 : pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
236 0 : if let Some(node_id) = self.secondary.pop() {
237 0 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
238 0 : }
239 0 : }
240 :
/// Empty the intent: detach the attached node (registering a `Detach` update) if there is
/// one, then remove all secondaries via [`Self::clear_secondary`].
241 12543 : pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
242 12543 : if let Some(old_attached) = self.attached.take() {
243 12543 : scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
244 12543 : }
245 :
246 12543 : self.clear_secondary(scheduler);
247 12543 : }
248 :
/// Return every node referenced by this intent: the attached node first (if any),
/// followed by the secondaries in their stored order.
249 12614 : pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
250 12614 : let mut result = Vec::new();
251 12614 : if let Some(p) = self.attached {
252 12612 : result.push(p)
253 2 : }
254 :
255 12614 : result.extend(self.secondary.iter().copied());
256 12614 :
257 12614 : result
258 12614 : }
259 :
/// Borrow the attached node, if any.
260 25095 : pub(crate) fn get_attached(&self) -> &Option<NodeId> {
261 25095 : &self.attached
262 25095 : }
263 :
/// Borrow the list of secondary nodes.
264 12524 : pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
265 12524 : &self.secondary
266 12524 : }
267 :
268 : /// If the node is in use as the attached location, demote it into
269 : /// the list of secondary locations. This is used when a node goes offline,
270 : /// and we want to use a different node for attachment, but not permanently
271 : /// forget the location on the offline node.
272 : ///
273 : /// Returns true if a change was made
274 5 : pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
275 5 : if self.attached == Some(node_id) {
276 5 : self.attached = None;
277 5 : self.secondary.push(node_id);
278 5 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::DemoteAttached);
279 5 : true
280 : } else {
281 0 : false
282 : }
283 5 : }
284 : }
285 :
// Drop guard: in builds with debug assertions enabled, dropping an IntentState that still
// references nodes trips an assertion, because nothing here releases the scheduler
// reference counts. Callers are expected to empty the intent first (e.g. via `clear`).
286 : impl Drop for IntentState {
287 12544 : fn drop(&mut self) {
288 12544 : // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
289 12544 : // We do not check this while panicking, to avoid polluting unit test failures or
290 12544 : // other assertions with this assertion's output. It's still wrong to leak these,
291 12544 : // but if we already have a panic then we don't need to independently flag this case.
292 12544 : if !(std::thread::panicking()) {
293 12544 : debug_assert!(self.attached.is_none() && self.secondary.is_empty());
294 0 : }
295 12543 : }
296 : }
297 :
/// Per-node record of what we last observed for this shard, keyed by node ID. See
/// [`ObservedStateLocation`] for the meaning of a missing entry versus an entry whose
/// `conf` is None.
298 0 : #[derive(Default, Clone, Serialize, Deserialize, Debug)]
299 : pub(crate) struct ObservedState {
300 : pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
301 : }
302 :
303 : /// Our latest knowledge of how this tenant is configured in the outside world.
304 : ///
305 : /// Meaning:
306 : /// * No instance of this type exists for a node: we are certain that we have nothing configured on that
307 : /// node for this shard.
308 : /// * Instance exists with conf==None: we *might* have some state on that node, but we don't know
309 : /// what it is (e.g. we failed partway through configuring it)
310 : /// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
311 : /// and that configuration will still be present unless something external interfered.
312 0 : #[derive(Clone, Serialize, Deserialize, Debug)]
313 : pub(crate) struct ObservedStateLocation {
314 : /// If None, it means we do not know the status of this shard's location on this node, but
315 : /// we know that we might have some state on this node.
/// If Some, this is the configuration we last successfully applied on this node.
316 : pub(crate) conf: Option<LocationConfig>,
317 : }
/// Handle for waiting until a shard's reconciliation reaches sequence `seq`, or fails.
/// See `wait_timeout` and `get_status` for how the fields are used.
318 : pub(crate) struct ReconcilerWaiter {
319 : // For observability purposes, remember the ID of the shard we're
320 : // waiting for.
321 : pub(crate) tenant_shard_id: TenantShardId,
322 :
// Waited on for `seq` to detect successful completion.
323 : seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
// Waited on for `seq` to detect a reconcile failure.
324 : error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
// Read when `error_seq_wait` fires, to build `ReconcileWaitError::Failed`.
325 : error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
// The sequence number this waiter is waiting for.
326 : seq: Sequence,
327 : }
328 :
/// Non-blocking snapshot of a reconcile's progress, as returned by
/// `ReconcilerWaiter::get_status`.
329 : pub(crate) enum ReconcilerStatus {
// `seq_wait` reports that waiting for the target sequence would succeed.
330 : Done,
// Not done, but `error_seq_wait` reports that waiting for the target sequence would succeed.
331 : Failed,
// Neither of the above.
332 : InProgress,
333 : }
334 :
/// Errors returned by `ReconcilerWaiter::wait_timeout`.
335 : #[derive(thiserror::Error, Debug)]
336 : pub(crate) enum ReconcileWaitError {
// The success wait hit its timeout before the target sequence was reached.
337 : #[error("Timeout waiting for shard {0}")]
338 : Timeout(TenantShardId),
// One of the underlying waits reported `SeqWaitError::Shutdown`.
339 : #[error("shutting down")]
340 : Shutdown,
// The error waiter reached the target sequence; carries the stored reconcile error.
341 : #[error("Reconcile error on shard {0}: {1}")]
342 : Failed(TenantShardId, Arc<ReconcileError>),
343 : }
344 :
/// Payload of [`ScheduleOptimizationAction::ReplaceSecondary`]: replace one of the shard's
/// secondary locations with a different node.
345 : #[derive(Eq, PartialEq, Debug, Clone)]
346 : pub(crate) struct ReplaceSecondary {
// presumably the secondary being given up — confirm against where the optimization is applied
347 : old_node_id: NodeId,
// presumably the node taking its place — confirm against where the optimization is applied
348 : new_node_id: NodeId,
349 : }
350 :
/// Payload of [`ScheduleOptimizationAction::MigrateAttachment`]: move the attachment to an
/// existing secondary location.
351 : #[derive(Eq, PartialEq, Debug, Clone)]
352 : pub(crate) struct MigrateAttachment {
// Node that holds the attachment when the optimization is planned.
353 : pub(crate) old_attached_node_id: NodeId,
// Node that should hold the attachment afterwards.
354 : pub(crate) new_attached_node_id: NodeId,
355 : }
356 :
/// The concrete change proposed by a [`ScheduleOptimization`].
357 : #[derive(Eq, PartialEq, Debug, Clone)]
358 : pub(crate) enum ScheduleOptimizationAction {
359 : // Replace one of our secondary locations with a different node
360 : ReplaceSecondary(ReplaceSecondary),
361 : // Migrate attachment to an existing secondary location
362 : MigrateAttachment(MigrateAttachment),
363 : }
364 :
/// A proposed scheduling improvement for one shard, tagged with the shard's sequence at
/// planning time so that a stale proposal can be recognised.
365 : #[derive(Eq, PartialEq, Debug, Clone)]
366 : pub(crate) struct ScheduleOptimization {
367 : // What was the reconcile sequence when we generated this optimization? The optimization
368 : // should only be applied if the shard's sequence is still at this value, in case other changes
369 : // happened between planning the optimization and applying it.
370 : sequence: Sequence,
371 :
// The change to make.
372 : pub(crate) action: ScheduleOptimizationAction,
373 : }
374 :
375 : impl ReconcilerWaiter {
/// Wait until reconciliation reaches `self.seq`, fails, or `timeout` elapses.
///
/// Two waits are raced: the success waiter (bounded by `timeout`) and the error waiter.
///
/// Returns:
/// - `Ok(())` if the success waiter reaches `self.seq` first.
/// - `Err(Timeout)` if the success wait times out.
/// - `Err(Shutdown)` if either wait reports shutdown.
/// - `Err(Failed)` carrying the stored error if the error waiter reaches `self.seq` first.
///
/// Panics if the error waiter fires but no error is stored, or if the error mutex is poisoned.
376 0 : pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
377 0 : tokio::select! {
378 0 : result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
379 0 : result.map_err(|e| match e {
380 0 : SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
381 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
382 0 : })?;
383 : },
384 0 : result = self.error_seq_wait.wait_for(self.seq) => {
// This wait is started without a timeout argument, hence Timeout is treated as impossible.
385 0 : result.map_err(|e| match e {
386 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
387 0 : SeqWaitError::Timeout => unreachable!()
388 0 : })?;
389 :
390 0 : return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
391 0 : self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
392 : }
393 : }
394 :
395 0 : Ok(())
396 0 : }
397 :
/// Report progress without blocking. The success waiter is consulted first, so `Done`
/// takes precedence over `Failed` when both waiters would be satisfied.
// NOTE(review): assumes `would_wait_for(seq)` returns Ok when `seq` has already been
// reached — confirm against SeqWait.
398 0 : pub(crate) fn get_status(&self) -> ReconcilerStatus {
399 0 : if self.seq_wait.would_wait_for(self.seq).is_ok() {
400 0 : ReconcilerStatus::Done
401 0 : } else if self.error_seq_wait.would_wait_for(self.seq).is_ok() {
402 0 : ReconcilerStatus::Failed
403 : } else {
404 0 : ReconcilerStatus::InProgress
405 : }
406 0 : }
407 : }
408 :
409 : /// Having spawned a reconciler task, the tenant shard's state will carry enough
410 : /// information to optionally cancel & await it later.
411 : pub(crate) struct ReconcilerHandle {
// presumably the shard sequence the reconciler was spawned for — confirm at the spawn site
412 : sequence: Sequence,
// Join handle of the spawned reconciler task.
413 : handle: JoinHandle<()>,
// Token for requesting cancellation of the reconciler task.
414 : cancel: CancellationToken,
415 : }
416 :
/// Answer to "does this shard need a reconciler spawned?".
// NOTE(review): presumably the return type of `TenantShard::get_reconcile_needed` — confirm.
417 : pub(crate) enum ReconcileNeeded {
418 : /// shard either doesn't need reconciliation, or is forbidden from spawning a reconciler
419 : /// in its current state (e.g. shard split in progress, or ShardSchedulingPolicy forbids it)
420 : No,
421 : /// shard has a reconciler running, and its intent hasn't changed since that one was
422 : /// spawned: wait for the existing reconciler rather than spawning a new one.
423 : WaitExisting(ReconcilerWaiter),
424 : /// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
425 : Yes,
426 : }
427 :
428 : /// Pending modification to the observed state of a tenant shard.
429 : /// Produced by [`Reconciler::observed_deltas`] and applied in [`crate::service::Service::process_result`].
430 : pub(crate) enum ObservedStateDelta {
// Carries a node ID and the location state for it (boxed); presumably inserted into or
// replacing the entry in `ObservedState::locations` — confirm where deltas are applied.
431 : Upsert(Box<(NodeId, ObservedStateLocation)>),
// Carries the node ID whose entry is to be removed.
432 : Delete(NodeId),
433 : }
434 :
435 : impl ObservedStateDelta {
/// Borrow the node ID this delta refers to, whichever variant it is.
436 0 : pub(crate) fn node_id(&self) -> &NodeId {
437 0 : match self {
438 0 : Self::Upsert(up) => &up.0,
439 0 : Self::Delete(nid) => nid,
440 : }
441 0 : }
442 : }
443 :
444 : /// When a reconcile task completes, it sends this result object
445 : /// to be applied to the primary TenantShard.
446 : pub(crate) struct ReconcileResult {
// presumably the shard sequence the reconciler ran for — confirm at the construction site
447 : pub(crate) sequence: Sequence,
448 : /// On errors, `observed` should be treated as an incomplete description
449 : /// of state (i.e. any nodes present in the result should override nodes
450 : /// present in the parent tenant state, but any unmentioned nodes should
451 : /// not be removed from parent tenant state)
// NOTE(review): this struct has no `observed` field; the text above presumably refers to
// `observed_deltas` below.
452 : pub(crate) result: Result<(), ReconcileError>,
453 :
454 : pub(crate) tenant_shard_id: TenantShardId,
455 : pub(crate) generation: Option<Generation>,
456 : pub(crate) observed_deltas: Vec<ObservedStateDelta>,
457 :
458 : /// Set [`TenantShard::pending_compute_notification`] from this flag
459 : pub(crate) pending_compute_notification: bool,
460 : }
461 :
462 : impl ObservedState {
/// Create an observed state with no known locations.
463 0 : pub(crate) fn new() -> Self {
464 0 : Self {
465 0 : locations: HashMap::new(),
466 0 : }
467 0 : }
468 : }
469 :
470 : impl TenantShard {
/// Build the in-memory state for a shard that has no intent or observed locations yet.
///
/// Starts with generation 0, sequence 1 (both waiters start at sequence 0), default tenant
/// config, no reconciler, `SplitState::Idle` and the default scheduling policy. Also
/// increments the `storage_controller_tenant_shards` metric.
471 12525 : pub(crate) fn new(
472 12525 : tenant_shard_id: TenantShardId,
473 12525 : shard: ShardIdentity,
474 12525 : policy: PlacementPolicy,
475 12525 : preferred_az_id: Option<AvailabilityZone>,
476 12525 : ) -> Self {
477 12525 : metrics::METRICS_REGISTRY
478 12525 : .metrics_group
479 12525 : .storage_controller_tenant_shards
480 12525 : .inc();
481 12525 :
482 12525 : Self {
483 12525 : tenant_shard_id,
484 12525 : policy,
485 12525 : intent: IntentState::default(),
486 12525 : generation: Some(Generation::new(0)),
487 12525 : shard,
488 12525 : observed: ObservedState::default(),
489 12525 : config: TenantConfig::default(),
490 12525 : reconciler: None,
491 12525 : splitting: SplitState::Idle,
492 12525 : sequence: Sequence(1),
493 12525 : delayed_reconcile: false,
494 12525 : waiter: Arc::new(SeqWait::new(Sequence(0))),
495 12525 : error_waiter: Arc::new(SeqWait::new(Sequence(0))),
496 12525 : last_error: Arc::default(),
497 12525 : pending_compute_notification: false,
498 12525 : scheduling_policy: ShardSchedulingPolicy::default(),
499 12525 : preferred_az_id,
500 12525 : }
501 12525 : }
502 :
503 : /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
504 : /// [`ObservedState`], even if it violates my [`PlacementPolicy`]. Call [`Self::schedule`] next,
505 : /// to get an intent state that complies with placement policy. The overall goal is to do scheduling
506 : /// in a way that makes use of any configured locations that already exist in the outside world.
///
/// Only locations with a known config in one of the attached modes are candidates for the
/// attached intent; every other observed node (including those with unknown config) becomes
/// a secondary intent.
507 1 : pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
508 1 : // Choose an attached location by filtering observed locations, and then sorting to get the highest
509 1 : // generation
510 1 : let mut attached_locs = self
511 1 : .observed
512 1 : .locations
513 1 : .iter()
514 2 : .filter_map(|(node_id, l)| {
515 2 : if let Some(conf) = &l.conf {
516 2 : if conf.mode == LocationConfigMode::AttachedMulti
517 1 : || conf.mode == LocationConfigMode::AttachedSingle
518 1 : || conf.mode == LocationConfigMode::AttachedStale
519 : {
520 2 : Some((node_id, conf.generation))
521 : } else {
522 0 : None
523 : }
524 : } else {
525 0 : None
526 : }
527 2 : })
528 1 : .collect::<Vec<_>>();
529 1 :
// Ascending sort by generation, so the last element carries the highest generation.
530 2 : attached_locs.sort_by_key(|i| i.1);
531 1 : if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
532 1 : self.intent.set_attached(scheduler, Some(*node_id));
533 1 : }
534 :
535 : // All remaining observed locations generate secondary intents. This includes None
536 : // observations, as these may well have some local content on disk that is usable (this
537 : // is an edge case that might occur if we restarted during a migration or other change)
538 : //
539 : // We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
540 : // will take care of promoting one of these secondaries to be attached.
541 2 : self.observed.locations.keys().for_each(|node_id| {
542 2 : if Some(*node_id) != self.intent.attached {
543 1 : self.intent.push_secondary(scheduler, *node_id);
544 1 : }
545 2 : });
546 1 : }
547 :
548 : /// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
549 : /// attached pageserver for a shard.
550 : ///
551 : /// Returns whether we modified it, and the NodeId selected.
///
/// If an attached node is already set it is returned unmodified. Otherwise a secondary
/// chosen by `scheduler.node_preferred` is promoted, and failing that the scheduler is
/// asked for a fresh node; an error from that request is propagated.
552 12521 : fn schedule_attached(
553 12521 : &mut self,
554 12521 : scheduler: &mut Scheduler,
555 12521 : context: &ScheduleContext,
556 12521 : ) -> Result<(bool, NodeId), ScheduleError> {
557 : // No work to do if we already have an attached tenant
558 12521 : if let Some(node_id) = self.intent.attached {
559 0 : return Ok((false, node_id));
560 12521 : }
561 :
562 12521 : if let Some(promote_secondary) = scheduler.node_preferred(&self.intent.secondary) {
563 : // Promote a secondary
564 1 : tracing::debug!("Promoted secondary {} to attached", promote_secondary);
565 1 : self.intent.promote_attached(scheduler, promote_secondary);
566 1 : Ok((true, promote_secondary))
567 : } else {
568 : // Pick a fresh node: either we had no secondaries or none were schedulable
// The current secondaries are passed as the first argument — presumably the set of
// nodes the scheduler must not pick; confirm against Scheduler::schedule_shard.
569 12520 : let node_id = scheduler.schedule_shard::<AttachedShardTag>(
570 12520 : &self.intent.secondary,
571 12520 : &self.preferred_az_id,
572 12520 : context,
573 12520 : )?;
574 12520 : tracing::debug!("Selected {} as attached", node_id);
575 12520 : self.intent.set_attached(scheduler, Some(node_id));
576 12520 : Ok((true, node_id))
577 : }
578 12521 : }
579 :
/// Run [`Self::do_schedule`] and then record this shard's resulting locations in `context`,
/// so that shards scheduled later with the same context see them.
///
/// The context is updated whether or not scheduling succeeded; the result of
/// `do_schedule` is returned unchanged.
580 12522 : #[instrument(skip_all, fields(
581 : tenant_id=%self.tenant_shard_id.tenant_id,
582 : shard_id=%self.tenant_shard_id.shard_slug(),
583 : sequence=%self.sequence
584 12522 : ))]
585 : pub(crate) fn schedule(
586 : &mut self,
587 : scheduler: &mut Scheduler,
588 : context: &mut ScheduleContext,
589 : ) -> Result<(), ScheduleError> {
590 : let r = self.do_schedule(scheduler, context);
591 :
// Every node in the (possibly partially updated) intent is passed to `avoid`; the
// attached node is additionally recorded via `push_attached`.
592 : context.avoid(&self.intent.all_pageservers());
593 : if let Some(attached) = self.intent.get_attached() {
594 : context.push_attached(*attached);
595 : }
596 :
597 : r
598 : }
599 :
/// Adjust `self.intent` so that it satisfies `self.policy`, asking `scheduler` for nodes
/// where needed. Increments `self.sequence` if the intent was changed.
///
/// Does nothing (beyond logging a warning) when the scheduling policy is `Pause` or `Stop`.
///
/// Per placement policy:
/// - `Attached(n)`: drop surplus secondaries, ensure there is an attached node, then add
///   secondaries until there are `n`.
/// - `Secondary`: demote any attached node to secondary, or schedule one secondary if there
///   are none; then trim to a single secondary.
/// - `Detached`: clear the intent.
///
/// Errors from the scheduler are propagated with `?`.
// NOTE(review): on such an early return the intent may already have been modified without
// the sequence being incremented.
600 12522 : pub(crate) fn do_schedule(
601 12522 : &mut self,
602 12522 : scheduler: &mut Scheduler,
603 12522 : context: &ScheduleContext,
604 12522 : ) -> Result<(), ScheduleError> {
605 12522 : // TODO: before scheduling new nodes, check if any existing content in
606 12522 : // self.intent refers to pageservers that are offline, and pick other
607 12522 : // pageservers if so.
608 12522 :
609 12522 : // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
610 12522 : // change their attach location.
611 12522 :
612 12522 : match self.scheduling_policy {
613 12521 : ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
614 : ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
615 : // Warn to make it obvious why other things aren't happening/working, if we skip scheduling
616 1 : tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
617 0 : "Scheduling is disabled by policy {:?}", self.scheduling_policy);
618 1 : return Ok(());
619 : }
620 : }
621 :
622 : // Tracks whether this call changed the intent; if it did, the sequence is
623 : // incremented before returning.
624 12521 : let mut modified = false;
625 :
626 : // Add/remove nodes to fulfil policy
627 : use PlacementPolicy::*;
628 12521 : match self.policy {
629 12521 : Attached(secondary_count) => {
630 12521 : let retain_secondaries = if self.intent.attached.is_none()
631 12521 : && scheduler.node_preferred(&self.intent.secondary).is_some()
632 : {
633 : // If we have no attached, and one of the secondaries is eligible to be promoted, retain
634 : // one more secondary than we usually would, as one of them will become attached further down this function.
635 1 : secondary_count + 1
636 : } else {
637 12520 : secondary_count
638 : };
639 :
640 12521 : while self.intent.secondary.len() > retain_secondaries {
641 0 : // We have no particular preference for one secondary location over another: just
642 0 : // arbitrarily drop from the end
643 0 : self.intent.pop_secondary(scheduler);
644 0 : modified = true;
645 0 : }
646 :
647 : // Should have exactly one attached, and N secondaries
648 12521 : let (modified_attached, attached_node_id) =
649 12521 : self.schedule_attached(scheduler, context)?;
650 12521 : modified |= modified_attached;
651 12521 :
// NOTE(review): this list starts with only the attached node; secondaries already in
// the intent before this loop are not included — confirm whether the scheduler can
// return one of them here (push_secondary debug-asserts against duplicates).
652 12521 : let mut used_pageservers = vec![attached_node_id];
653 25041 : while self.intent.secondary.len() < secondary_count {
654 12520 : let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
655 12520 : &used_pageservers,
656 12520 : &self.preferred_az_id,
657 12520 : context,
658 12520 : )?;
659 12520 : self.intent.push_secondary(scheduler, node_id);
660 12520 : used_pageservers.push(node_id);
661 12520 : modified = true;
662 : }
663 : }
664 : Secondary => {
665 0 : if let Some(node_id) = self.intent.get_attached() {
666 0 : // Populate secondary by demoting the attached node
667 0 : self.intent.demote_attached(scheduler, *node_id);
668 0 : modified = true;
669 0 : } else if self.intent.secondary.is_empty() {
670 : // Populate secondary by scheduling a fresh node
671 0 : let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
672 0 : &[],
673 0 : &self.preferred_az_id,
674 0 : context,
675 0 : )?;
676 0 : self.intent.push_secondary(scheduler, node_id);
677 0 : modified = true;
678 0 : }
679 0 : while self.intent.secondary.len() > 1 {
680 0 : // We have no particular preference for one secondary location over another: just
681 0 : // arbitrarily drop from the end
682 0 : self.intent.pop_secondary(scheduler);
683 0 : modified = true;
684 0 : }
685 : }
686 : Detached => {
687 : // Never add locations in this mode
688 0 : if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
689 0 : self.intent.clear(scheduler);
690 0 : modified = true;
691 0 : }
692 : }
693 : }
694 :
695 12521 : if modified {
696 12521 : self.sequence.0 += 1;
697 12521 : }
698 :
699 12521 : Ok(())
700 12522 : }
701 :
702 : /// Reschedule this tenant shard to one of its secondary locations. Returns a scheduling error
703 : /// if the swap is not possible and leaves the intent state in its original state.
704 : ///
705 : /// Arguments:
706 : /// `promote_to`: an optional secondary location of this tenant shard. If set to None, we ask
707 : /// the scheduler to recommend a node
708 : /// `scheduler`: used to pick a node when `promote_to` is None, and to keep node reference
709 : /// counts in step with the intent change
///
/// On success the previously attached node (if any) becomes a secondary and the sequence
/// number is advanced.
///
/// # Errors
/// `ScheduleError::ImpossibleConstraint` if `promote_to` is None and the scheduler has no
/// preferred node among the current secondaries.
///
/// # Panics
/// Panics if the chosen node is not one of the current secondary locations.
710 0 : pub(crate) fn reschedule_to_secondary(
711 0 : &mut self,
712 0 : promote_to: Option<NodeId>,
713 0 : scheduler: &mut Scheduler,
714 0 : ) -> Result<(), ScheduleError> {
715 0 : let promote_to = match promote_to {
716 0 : Some(node) => node,
717 0 : None => match scheduler.node_preferred(self.intent.get_secondary()) {
718 0 : Some(node) => node,
719 : None => {
720 0 : return Err(ScheduleError::ImpossibleConstraint);
721 : }
722 : },
723 : };
724 :
725 0 : assert!(self.intent.get_secondary().contains(&promote_to));
726 :
// Demote first so that the old attached node is retained as a secondary;
// promote_attached on its own would drop it from the intent.
727 0 : if let Some(node) = self.intent.get_attached() {
728 0 : let demoted = self.intent.demote_attached(scheduler, *node);
729 0 : if !demoted {
730 0 : return Err(ScheduleError::ImpossibleConstraint);
731 0 : }
732 0 : }
733 :
734 0 : self.intent.promote_attached(scheduler, promote_to);
735 0 :
736 0 : // Increment the sequence number for the edge case where a
737 0 : // reconciler is already running to avoid waiting on the
738 0 : // current reconcile instead of spawning a new one.
739 0 : self.sequence = self.sequence.next();
740 0 :
741 0 : Ok(())
742 0 : }
743 :
744 : /// Optimize attachments: if a shard has a secondary location that is preferable to
745 : /// its primary location based on soft constraints, switch that secondary location
746 : /// to be attached.
///
/// This only plans the change: `self` is not modified. Returns
/// `Some(ScheduleOptimization)` with a `MigrateAttachment` action tagged with the current
/// sequence, or None when there is no attached node, no secondary, or no candidate that
/// is sufficiently better than the current attachment.
747 23 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
748 : pub(crate) fn optimize_attachment(
749 : &self,
750 : nodes: &HashMap<NodeId, Node>,
751 : schedule_context: &ScheduleContext,
752 : ) -> Option<ScheduleOptimization> {
753 : let attached = (*self.intent.get_attached())?;
754 : if self.intent.secondary.is_empty() {
755 : // We can only do useful work if we have both attached and secondary locations: this
756 : // function doesn't schedule new locations, only swaps between attached and secondaries.
757 : return None;
758 : }
759 :
760 : let current_affinity_score = schedule_context.get_node_affinity(attached);
761 : let current_attachment_count = schedule_context.get_node_attachments(attached);
762 :
763 : // Generate score for each node, dropping any un-schedulable nodes.
764 : let all_pageservers = self.intent.all_pageservers();
765 : let mut scores = all_pageservers
766 : .iter()
767 46 : .flat_map(|node_id| {
768 46 : let node = nodes.get(node_id);
769 46 : if node.is_none() {
770 0 : None
771 46 : } else if matches!(
772 46 : node.unwrap().get_scheduling(),
773 : NodeSchedulingPolicy::Filling
774 : ) {
775 : // If the node is currently filling, don't count it as a candidate, to avoid
776 : // racing with the background fill.
777 0 : None
778 46 : } else if matches!(node.unwrap().may_schedule(), MaySchedule::No) {
779 0 : None
780 : } else {
781 46 : let affinity_score = schedule_context.get_node_affinity(*node_id);
782 46 : let attachment_count = schedule_context.get_node_attachments(*node_id);
783 46 : Some((*node_id, affinity_score, attachment_count))
784 : }
785 46 : })
786 : .collect::<Vec<_>>();
787 :
788 : // Sort precedence:
789 : // 1st - prefer nodes with the lowest total affinity score
790 : // 2nd - prefer nodes with the lowest number of attachments in this context
791 : // 3rd - if all else is equal, sort by node ID for determinism in tests.
792 46 : scores.sort_by_key(|i| (i.1, i.2, i.0));
793 :
// After the ascending sort, the first entry is the most preferred candidate.
794 : if let Some((preferred_node, preferred_affinity_score, preferred_attachment_count)) =
795 : scores.first()
796 : {
797 : if attached != *preferred_node {
798 : // The best alternative must be more than 1 better than us, otherwise we could end
799 : // up flapping back next time we're called (e.g. there's no point migrating from
800 : // a location with score 1 to a score zero, because on next location the situation
801 : // would be the same, but in reverse).
802 : if current_affinity_score > *preferred_affinity_score + AffinityScore(1)
803 : || current_attachment_count > *preferred_attachment_count + 1
804 : {
805 : tracing::info!(
806 : "Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})",
807 : self.intent.get_secondary()
808 : );
809 : return Some(ScheduleOptimization {
810 : sequence: self.sequence,
811 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
812 : old_attached_node_id: attached,
813 : new_attached_node_id: *preferred_node,
814 : }),
815 : });
816 : }
817 : } else {
818 : tracing::debug!(
819 : "Node {} is already preferred (score {:?})",
820 : preferred_node,
821 : preferred_affinity_score
822 : );
823 : }
824 : }
825 :
826 : // Fall-through: we didn't find an optimization
827 : None
828 : }
829 :
830 20 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
831 : pub(crate) fn optimize_secondary(
832 : &self,
833 : scheduler: &mut Scheduler,
834 : schedule_context: &ScheduleContext,
835 : ) -> Option<ScheduleOptimization> {
836 : if self.intent.secondary.is_empty() {
837 : // We can only do useful work if we have both attached and secondary locations: this
838 : // function doesn't schedule new locations, only swaps between attached and secondaries.
839 : return None;
840 : }
841 :
842 : for secondary in self.intent.get_secondary() {
843 : let Some(affinity_score) = schedule_context.nodes.get(secondary) else {
844 : // We're already on a node unaffected any affinity constraints,
845 : // so we won't change it.
846 : continue;
847 : };
848 :
849 : // Let the scheduler suggest a node, where it would put us if we were scheduling afresh
850 : // This implicitly limits the choice to nodes that are available, and prefers nodes
851 : // with lower utilization.
852 : let Ok(candidate_node) = scheduler.schedule_shard::<SecondaryShardTag>(
853 : &self.intent.all_pageservers(),
854 : &self.preferred_az_id,
855 : schedule_context,
856 : ) else {
857 : // A scheduling error means we have no possible candidate replacements
858 : continue;
859 : };
860 :
861 : let candidate_affinity_score = schedule_context
862 : .nodes
863 : .get(&candidate_node)
864 : .unwrap_or(&AffinityScore::FREE);
865 :
866 : // The best alternative must be more than 1 better than us, otherwise we could end
867 : // up flapping back next time we're called.
868 : if *candidate_affinity_score + AffinityScore(1) < *affinity_score {
869 : // If some other node is available and has a lower score than this node, then
870 : // that other node is a good place to migrate to.
871 : tracing::info!(
872 : "Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})",
873 : self.intent.get_secondary()
874 : );
875 : return Some(ScheduleOptimization {
876 : sequence: self.sequence,
877 : action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
878 : old_node_id: *secondary,
879 : new_node_id: candidate_node,
880 : }),
881 : });
882 : }
883 : }
884 :
885 : None
886 : }
887 :
888 : /// Return true if the optimization was really applied: it will not be applied if the optimization's
889 : /// sequence is behind this tenant shard's
890 9 : pub(crate) fn apply_optimization(
891 9 : &mut self,
892 9 : scheduler: &mut Scheduler,
893 9 : optimization: ScheduleOptimization,
894 9 : ) -> bool {
895 9 : if optimization.sequence != self.sequence {
896 0 : return false;
897 9 : }
898 9 :
899 9 : metrics::METRICS_REGISTRY
900 9 : .metrics_group
901 9 : .storage_controller_schedule_optimization
902 9 : .inc();
903 9 :
904 9 : match optimization.action {
905 : ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
906 4 : old_attached_node_id,
907 4 : new_attached_node_id,
908 4 : }) => {
909 4 : self.intent.demote_attached(scheduler, old_attached_node_id);
910 4 : self.intent
911 4 : .promote_attached(scheduler, new_attached_node_id);
912 4 : }
913 : ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
914 5 : old_node_id,
915 5 : new_node_id,
916 5 : }) => {
917 5 : self.intent.remove_secondary(scheduler, old_node_id);
918 5 : self.intent.push_secondary(scheduler, new_node_id);
919 5 : }
920 : }
921 :
922 9 : true
923 9 : }
924 :
925 : /// Query whether the tenant's observed state for attached node matches its intent state, and if so,
926 : /// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that
927 : /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
928 : ///
929 : /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
930 : /// funciton should not be used to decide whether to reconcile.
931 0 : pub(crate) fn stably_attached(&self) -> Option<NodeId> {
932 0 : if let Some(attach_intent) = self.intent.attached {
933 0 : match self.observed.locations.get(&attach_intent) {
934 0 : Some(loc) => match &loc.conf {
935 0 : Some(conf) => match conf.mode {
936 : LocationConfigMode::AttachedMulti
937 : | LocationConfigMode::AttachedSingle
938 : | LocationConfigMode::AttachedStale => {
939 : // Our intent and observed state agree that this node is in an attached state.
940 0 : Some(attach_intent)
941 : }
942 : // Our observed config is not an attached state
943 0 : _ => None,
944 : },
945 : // Our observed state is None, i.e. in flux
946 0 : None => None,
947 : },
948 : // We have no observed state for this node
949 0 : None => None,
950 : }
951 : } else {
952 : // Our intent is not to attach
953 0 : None
954 : }
955 0 : }
956 :
957 0 : fn dirty(&self, nodes: &Arc<HashMap<NodeId, Node>>) -> bool {
958 0 : let mut dirty_nodes = HashSet::new();
959 :
960 0 : if let Some(node_id) = self.intent.attached {
961 : // Maybe panic: it is a severe bug if we try to attach while generation is null.
962 0 : let generation = self
963 0 : .generation
964 0 : .expect("Attempted to enter attached state without a generation");
965 0 :
966 0 : let wanted_conf =
967 0 : attached_location_conf(generation, &self.shard, &self.config, &self.policy);
968 0 : match self.observed.locations.get(&node_id) {
969 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
970 0 : Some(_) | None => {
971 0 : dirty_nodes.insert(node_id);
972 0 : }
973 : }
974 0 : }
975 :
976 0 : for node_id in &self.intent.secondary {
977 0 : let wanted_conf = secondary_location_conf(&self.shard, &self.config);
978 0 : match self.observed.locations.get(node_id) {
979 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
980 0 : Some(_) | None => {
981 0 : dirty_nodes.insert(*node_id);
982 0 : }
983 : }
984 : }
985 :
986 0 : for node_id in self.observed.locations.keys() {
987 0 : if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
988 0 : // We have observed state that isn't part of our intent: need to clean it up.
989 0 : dirty_nodes.insert(*node_id);
990 0 : }
991 : }
992 :
993 0 : dirty_nodes.retain(|node_id| {
994 0 : nodes
995 0 : .get(node_id)
996 0 : .map(|n| n.is_available())
997 0 : .unwrap_or(false)
998 0 : });
999 0 :
1000 0 : !dirty_nodes.is_empty()
1001 0 : }
1002 :
1003 : #[allow(clippy::too_many_arguments)]
1004 0 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1005 : pub(crate) fn get_reconcile_needed(
1006 : &mut self,
1007 : pageservers: &Arc<HashMap<NodeId, Node>>,
1008 : ) -> ReconcileNeeded {
1009 : // If there are any ambiguous observed states, and the nodes they refer to are available,
1010 : // we should reconcile to clean them up.
1011 : let mut dirty_observed = false;
1012 : for (node_id, observed_loc) in &self.observed.locations {
1013 : let node = pageservers
1014 : .get(node_id)
1015 : .expect("Nodes may not be removed while referenced");
1016 : if observed_loc.conf.is_none() && node.is_available() {
1017 : dirty_observed = true;
1018 : break;
1019 : }
1020 : }
1021 :
1022 : let active_nodes_dirty = self.dirty(pageservers);
1023 :
1024 : // Even if there is no pageserver work to be done, if we have a pending notification to computes,
1025 : // wake up a reconciler to send it.
1026 : let do_reconcile =
1027 : active_nodes_dirty || dirty_observed || self.pending_compute_notification;
1028 :
1029 : if !do_reconcile {
1030 : tracing::debug!("Not dirty, no reconciliation needed.");
1031 : return ReconcileNeeded::No;
1032 : }
1033 :
1034 : // If we are currently splitting, then never start a reconciler task: the splitting logic
1035 : // requires that shards are not interfered with while it runs. Do this check here rather than
1036 : // up top, so that we only log this message if we would otherwise have done a reconciliation.
1037 : if !matches!(self.splitting, SplitState::Idle) {
1038 : tracing::info!("Refusing to reconcile, splitting in progress");
1039 : return ReconcileNeeded::No;
1040 : }
1041 :
1042 : // Reconcile already in flight for the current sequence?
1043 : if let Some(handle) = &self.reconciler {
1044 : if handle.sequence == self.sequence {
1045 : tracing::info!(
1046 : "Reconciliation already in progress for sequence {:?}",
1047 : self.sequence,
1048 : );
1049 : return ReconcileNeeded::WaitExisting(ReconcilerWaiter {
1050 : tenant_shard_id: self.tenant_shard_id,
1051 : seq_wait: self.waiter.clone(),
1052 : error_seq_wait: self.error_waiter.clone(),
1053 : error: self.last_error.clone(),
1054 : seq: self.sequence,
1055 : });
1056 : }
1057 : }
1058 :
1059 : // Pre-checks done: finally check whether we may actually do the work
1060 : match self.scheduling_policy {
1061 : ShardSchedulingPolicy::Active
1062 : | ShardSchedulingPolicy::Essential
1063 : | ShardSchedulingPolicy::Pause => {}
1064 : ShardSchedulingPolicy::Stop => {
1065 : // We only reach this point if there is work to do and we're going to skip
1066 : // doing it: warn it obvious why this tenant isn't doing what it ought to.
1067 : tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
1068 : return ReconcileNeeded::No;
1069 : }
1070 : }
1071 :
1072 : ReconcileNeeded::Yes
1073 : }
1074 :
1075 : /// Ensure the sequence number is set to a value where waiting for this value will make us wait
1076 : /// for the next reconcile: i.e. it is ahead of all completed or running reconcilers.
1077 : ///
1078 : /// Constructing a ReconcilerWaiter with the resulting sequence number gives the property
1079 : /// that the waiter will not complete until some future Reconciler is constructed and run.
1080 0 : fn ensure_sequence_ahead(&mut self) {
1081 0 : // Find the highest sequence for which a Reconciler has previously run or is currently
1082 0 : // running
1083 0 : let max_seen = std::cmp::max(
1084 0 : self.reconciler
1085 0 : .as_ref()
1086 0 : .map(|r| r.sequence)
1087 0 : .unwrap_or(Sequence(0)),
1088 0 : std::cmp::max(self.waiter.load(), self.error_waiter.load()),
1089 0 : );
1090 0 :
1091 0 : if self.sequence <= max_seen {
1092 0 : self.sequence = max_seen.next();
1093 0 : }
1094 0 : }
1095 :
1096 : /// Create a waiter that will wait for some future Reconciler that hasn't been spawned yet.
1097 : ///
1098 : /// This is appropriate when you can't spawn a reconciler (e.g. due to resource limits), but
1099 : /// you would like to wait on the next reconciler that gets spawned in the background.
1100 0 : pub(crate) fn future_reconcile_waiter(&mut self) -> ReconcilerWaiter {
1101 0 : self.ensure_sequence_ahead();
1102 0 :
1103 0 : ReconcilerWaiter {
1104 0 : tenant_shard_id: self.tenant_shard_id,
1105 0 : seq_wait: self.waiter.clone(),
1106 0 : error_seq_wait: self.error_waiter.clone(),
1107 0 : error: self.last_error.clone(),
1108 0 : seq: self.sequence,
1109 0 : }
1110 0 : }
1111 :
1112 0 : async fn reconcile(
1113 0 : sequence: Sequence,
1114 0 : mut reconciler: Reconciler,
1115 0 : must_notify: bool,
1116 0 : ) -> ReconcileResult {
1117 : // Attempt to make observed state match intent state
1118 0 : let result = reconciler.reconcile().await;
1119 :
1120 : // If we know we had a pending compute notification from some previous action, send a notification irrespective
1121 : // of whether the above reconcile() did any work
1122 0 : if result.is_ok() && must_notify {
1123 : // If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
1124 0 : reconciler.compute_notify().await.ok();
1125 0 : }
1126 :
1127 : // Update result counter
1128 0 : let outcome_label = match &result {
1129 0 : Ok(_) => ReconcileOutcome::Success,
1130 0 : Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
1131 0 : Err(_) => ReconcileOutcome::Error,
1132 : };
1133 :
1134 0 : metrics::METRICS_REGISTRY
1135 0 : .metrics_group
1136 0 : .storage_controller_reconcile_complete
1137 0 : .inc(ReconcileCompleteLabelGroup {
1138 0 : status: outcome_label,
1139 0 : });
1140 0 :
1141 0 : // Constructing result implicitly drops Reconciler, freeing any ReconcileUnits before the Service might
1142 0 : // try and schedule more work in response to our result.
1143 0 : ReconcileResult {
1144 0 : sequence,
1145 0 : result,
1146 0 : tenant_shard_id: reconciler.tenant_shard_id,
1147 0 : generation: reconciler.generation,
1148 0 : observed_deltas: reconciler.observed_deltas(),
1149 0 : pending_compute_notification: reconciler.compute_notify_failure,
1150 0 : }
1151 0 : }
1152 :
1153 : #[allow(clippy::too_many_arguments)]
1154 0 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1155 : pub(crate) fn spawn_reconciler(
1156 : &mut self,
1157 : result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
1158 : pageservers: &Arc<HashMap<NodeId, Node>>,
1159 : compute_hook: &Arc<ComputeHook>,
1160 : reconciler_config: ReconcilerConfig,
1161 : service_config: &service::Config,
1162 : persistence: &Arc<Persistence>,
1163 : units: ReconcileUnits,
1164 : gate_guard: GateGuard,
1165 : cancel: &CancellationToken,
1166 : ) -> Option<ReconcilerWaiter> {
1167 : // Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
1168 : // doing our sequence's work.
1169 : let old_handle = self.reconciler.take();
1170 :
1171 : // Build list of nodes from which the reconciler should detach
1172 : let mut detach = Vec::new();
1173 : for node_id in self.observed.locations.keys() {
1174 : if self.intent.get_attached() != &Some(*node_id)
1175 : && !self.intent.secondary.contains(node_id)
1176 : {
1177 : detach.push(
1178 : pageservers
1179 : .get(node_id)
1180 : .expect("Intent references non-existent pageserver")
1181 : .clone(),
1182 : )
1183 : }
1184 : }
1185 :
1186 : // Advance the sequence before spawning a reconciler, so that sequence waiters
1187 : // can distinguish between before+after the reconcile completes.
1188 : self.ensure_sequence_ahead();
1189 :
1190 : let reconciler_cancel = cancel.child_token();
1191 : let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
1192 : let reconciler = Reconciler {
1193 : tenant_shard_id: self.tenant_shard_id,
1194 : shard: self.shard,
1195 : placement_policy: self.policy.clone(),
1196 : generation: self.generation,
1197 : intent: reconciler_intent,
1198 : detach,
1199 : reconciler_config,
1200 : config: self.config.clone(),
1201 : preferred_az: self.preferred_az_id.clone(),
1202 : observed: self.observed.clone(),
1203 : original_observed: self.observed.clone(),
1204 : compute_hook: compute_hook.clone(),
1205 : service_config: service_config.clone(),
1206 : _gate_guard: gate_guard,
1207 : _resource_units: units,
1208 : cancel: reconciler_cancel.clone(),
1209 : persistence: persistence.clone(),
1210 : compute_notify_failure: false,
1211 : };
1212 :
1213 : let reconcile_seq = self.sequence;
1214 : let long_reconcile_threshold = service_config.long_reconcile_threshold;
1215 :
1216 : tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
1217 : let must_notify = self.pending_compute_notification;
1218 : let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
1219 : tenant_id=%reconciler.tenant_shard_id.tenant_id,
1220 : shard_id=%reconciler.tenant_shard_id.shard_slug());
1221 : metrics::METRICS_REGISTRY
1222 : .metrics_group
1223 : .storage_controller_reconcile_spawn
1224 : .inc();
1225 : let result_tx = result_tx.clone();
1226 : let join_handle = tokio::task::spawn(
1227 0 : async move {
1228 : // Wait for any previous reconcile task to complete before we start
1229 0 : if let Some(old_handle) = old_handle {
1230 0 : old_handle.cancel.cancel();
1231 0 : if let Err(e) = old_handle.handle.await {
1232 : // We can't do much with this other than log it: the task is done, so
1233 : // we may proceed with our work.
1234 0 : tracing::error!("Unexpected join error waiting for reconcile task: {e}");
1235 0 : }
1236 0 : }
1237 :
1238 : // Early check for cancellation before doing any work
1239 : // TODO: wrap all remote API operations in cancellation check
1240 : // as well.
1241 0 : if reconciler.cancel.is_cancelled() {
1242 0 : metrics::METRICS_REGISTRY
1243 0 : .metrics_group
1244 0 : .storage_controller_reconcile_complete
1245 0 : .inc(ReconcileCompleteLabelGroup {
1246 0 : status: ReconcileOutcome::Cancel,
1247 0 : });
1248 0 : return;
1249 0 : }
1250 0 :
1251 0 : let (tenant_id_label, shard_number_label, sequence_label) = {
1252 0 : (
1253 0 : reconciler.tenant_shard_id.tenant_id.to_string(),
1254 0 : reconciler.tenant_shard_id.shard_number.0.to_string(),
1255 0 : reconcile_seq.to_string(),
1256 0 : )
1257 0 : };
1258 0 :
1259 0 : let label_group = ReconcileLongRunningLabelGroup {
1260 0 : tenant_id: &tenant_id_label,
1261 0 : shard_number: &shard_number_label,
1262 0 : sequence: &sequence_label,
1263 0 : };
1264 0 :
1265 0 : let reconcile_fut = Self::reconcile(reconcile_seq, reconciler, must_notify);
1266 0 : let long_reconcile_fut = {
1267 0 : let label_group = label_group.clone();
1268 0 : async move {
1269 0 : tokio::time::sleep(long_reconcile_threshold).await;
1270 :
1271 0 : tracing::warn!("Reconcile passed the long running threshold of {long_reconcile_threshold:?}");
1272 :
1273 0 : metrics::METRICS_REGISTRY
1274 0 : .metrics_group
1275 0 : .storage_controller_reconcile_long_running
1276 0 : .inc(label_group);
1277 0 : }
1278 : };
1279 :
1280 0 : let reconcile_fut = std::pin::pin!(reconcile_fut);
1281 0 : let long_reconcile_fut = std::pin::pin!(long_reconcile_fut);
1282 :
1283 0 : let (was_long, result) =
1284 0 : match future::select(reconcile_fut, long_reconcile_fut).await {
1285 0 : Either::Left((reconcile_result, _)) => (false, reconcile_result),
1286 0 : Either::Right((_, reconcile_fut)) => (true, reconcile_fut.await),
1287 : };
1288 :
1289 0 : if was_long {
1290 0 : let id = metrics::METRICS_REGISTRY
1291 0 : .metrics_group
1292 0 : .storage_controller_reconcile_long_running
1293 0 : .with_labels(label_group);
1294 0 : metrics::METRICS_REGISTRY
1295 0 : .metrics_group
1296 0 : .storage_controller_reconcile_long_running
1297 0 : .remove_metric(id);
1298 0 : }
1299 :
1300 0 : result_tx
1301 0 : .send(ReconcileResultRequest::ReconcileResult(result))
1302 0 : .ok();
1303 0 : }
1304 : .instrument(reconciler_span),
1305 : );
1306 :
1307 : self.reconciler = Some(ReconcilerHandle {
1308 : sequence: self.sequence,
1309 : handle: join_handle,
1310 : cancel: reconciler_cancel,
1311 : });
1312 :
1313 : Some(ReconcilerWaiter {
1314 : tenant_shard_id: self.tenant_shard_id,
1315 : seq_wait: self.waiter.clone(),
1316 : error_seq_wait: self.error_waiter.clone(),
1317 : error: self.last_error.clone(),
1318 : seq: self.sequence,
1319 : })
1320 : }
1321 :
1322 0 : pub(crate) fn cancel_reconciler(&self) {
1323 0 : if let Some(handle) = self.reconciler.as_ref() {
1324 0 : handle.cancel.cancel()
1325 0 : }
1326 0 : }
1327 :
1328 : /// Get a waiter for any reconciliation in flight, but do not start reconciliation
1329 : /// if it is not already running
1330 0 : pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
1331 0 : if self.reconciler.is_some() {
1332 0 : Some(ReconcilerWaiter {
1333 0 : tenant_shard_id: self.tenant_shard_id,
1334 0 : seq_wait: self.waiter.clone(),
1335 0 : error_seq_wait: self.error_waiter.clone(),
1336 0 : error: self.last_error.clone(),
1337 0 : seq: self.sequence,
1338 0 : })
1339 : } else {
1340 0 : None
1341 : }
1342 0 : }
1343 :
1344 : /// Called when a ReconcileResult has been emitted and the service is updating
1345 : /// our state: if the result is from a sequence >= my ReconcileHandle, then drop
1346 : /// the handle to indicate there is no longer a reconciliation in progress.
1347 0 : pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
1348 0 : if let Some(reconcile_handle) = &self.reconciler {
1349 0 : if reconcile_handle.sequence <= sequence {
1350 0 : self.reconciler = None;
1351 0 : }
1352 0 : }
1353 0 : }
1354 :
1355 : /// If we had any state at all referring to this node ID, drop it. Does not
1356 : /// attempt to reschedule.
1357 : ///
1358 : /// Returns true if we modified the node's intent state.
1359 0 : pub(crate) fn deref_node(&mut self, node_id: NodeId) -> bool {
1360 0 : let mut intent_modified = false;
1361 0 :
1362 0 : // Drop if this node was our attached intent
1363 0 : if self.intent.attached == Some(node_id) {
1364 0 : self.intent.attached = None;
1365 0 : intent_modified = true;
1366 0 : }
1367 :
1368 : // Drop from the list of secondaries, and check if we modified it
1369 0 : let had_secondaries = self.intent.secondary.len();
1370 0 : self.intent.secondary.retain(|n| n != &node_id);
1371 0 : intent_modified |= self.intent.secondary.len() != had_secondaries;
1372 0 :
1373 0 : debug_assert!(!self.intent.all_pageservers().contains(&node_id));
1374 :
1375 0 : intent_modified
1376 0 : }
1377 :
1378 0 : pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
1379 0 : self.scheduling_policy = p;
1380 0 : }
1381 :
1382 0 : pub(crate) fn get_scheduling_policy(&self) -> &ShardSchedulingPolicy {
1383 0 : &self.scheduling_policy
1384 0 : }
1385 :
1386 0 : pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
1387 0 : // Ordering: always set last_error before advancing sequence, so that sequence
1388 0 : // waiters are guaranteed to see a Some value when they see an error.
1389 0 : *(self.last_error.lock().unwrap()) = Some(Arc::new(error));
1390 0 : self.error_waiter.advance(sequence);
1391 0 : }
1392 :
1393 0 : pub(crate) fn from_persistent(
1394 0 : tsp: TenantShardPersistence,
1395 0 : intent: IntentState,
1396 0 : ) -> anyhow::Result<Self> {
1397 0 : let tenant_shard_id = tsp.get_tenant_shard_id()?;
1398 0 : let shard_identity = tsp.get_shard_identity()?;
1399 :
1400 0 : metrics::METRICS_REGISTRY
1401 0 : .metrics_group
1402 0 : .storage_controller_tenant_shards
1403 0 : .inc();
1404 0 :
1405 0 : Ok(Self {
1406 0 : tenant_shard_id,
1407 0 : shard: shard_identity,
1408 0 : sequence: Sequence::initial(),
1409 0 : generation: tsp.generation.map(|g| Generation::new(g as u32)),
1410 0 : policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
1411 0 : intent,
1412 0 : observed: ObservedState::new(),
1413 0 : config: serde_json::from_str(&tsp.config).unwrap(),
1414 0 : reconciler: None,
1415 0 : splitting: tsp.splitting,
1416 0 : waiter: Arc::new(SeqWait::new(Sequence::initial())),
1417 0 : error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
1418 0 : last_error: Arc::default(),
1419 0 : pending_compute_notification: false,
1420 0 : delayed_reconcile: false,
1421 0 : scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
1422 0 : preferred_az_id: tsp.preferred_az_id.map(AvailabilityZone),
1423 0 : })
1424 0 : }
1425 :
1426 0 : pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
1427 0 : TenantShardPersistence {
1428 0 : tenant_id: self.tenant_shard_id.tenant_id.to_string(),
1429 0 : shard_number: self.tenant_shard_id.shard_number.0 as i32,
1430 0 : shard_count: self.tenant_shard_id.shard_count.literal() as i32,
1431 0 : shard_stripe_size: self.shard.stripe_size.0 as i32,
1432 0 : generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
1433 0 : generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
1434 0 : placement_policy: serde_json::to_string(&self.policy).unwrap(),
1435 0 : config: serde_json::to_string(&self.config).unwrap(),
1436 0 : splitting: SplitState::default(),
1437 0 : scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
1438 0 : preferred_az_id: self.preferred_az_id.as_ref().map(|az| az.0.clone()),
1439 0 : }
1440 0 : }
1441 :
1442 12500 : pub(crate) fn preferred_az(&self) -> Option<&AvailabilityZone> {
1443 12500 : self.preferred_az_id.as_ref()
1444 12500 : }
1445 :
1446 0 : pub(crate) fn set_preferred_az(&mut self, preferred_az_id: AvailabilityZone) {
1447 0 : self.preferred_az_id = Some(preferred_az_id);
1448 0 : }
1449 :
1450 : /// Returns all the nodes to which this tenant shard is attached according to the
1451 : /// observed state and the generations. Return vector is sorted from latest generation
1452 : /// to earliest.
1453 0 : pub(crate) fn attached_locations(&self) -> Vec<(NodeId, Generation)> {
1454 0 : self.observed
1455 0 : .locations
1456 0 : .iter()
1457 0 : .filter_map(|(node_id, observed)| {
1458 : use LocationConfigMode::{AttachedMulti, AttachedSingle, AttachedStale};
1459 :
1460 0 : let conf = observed.conf.as_ref()?;
1461 :
1462 0 : match (conf.generation, conf.mode) {
1463 0 : (Some(gen), AttachedMulti | AttachedSingle | AttachedStale) => {
1464 0 : Some((*node_id, gen))
1465 : }
1466 0 : _ => None,
1467 : }
1468 0 : })
1469 0 : .sorted_by(|(_lhs_node_id, lhs_gen), (_rhs_node_id, rhs_gen)| {
1470 0 : lhs_gen.cmp(rhs_gen).reverse()
1471 0 : })
1472 0 : .map(|(node_id, gen)| (node_id, Generation::new(gen)))
1473 0 : .collect()
1474 0 : }
1475 :
1476 : /// Update the observed state of the tenant by applying incremental deltas
1477 : ///
1478 : /// Deltas are generated by reconcilers via [`Reconciler::observed_deltas`].
1479 : /// They are then filtered in [`crate::service::Service::process_result`].
1480 0 : pub(crate) fn apply_observed_deltas(
1481 0 : &mut self,
1482 0 : deltas: impl Iterator<Item = ObservedStateDelta>,
1483 0 : ) {
1484 0 : for delta in deltas {
1485 0 : match delta {
1486 0 : ObservedStateDelta::Upsert(ups) => {
1487 0 : let (node_id, loc) = *ups;
1488 0 :
1489 0 : // If the generation of the observed location in the delta is lagging
1490 0 : // behind the current one, then we have a race condition and cannot
1491 0 : // be certain about the true observed state. Set the observed state
1492 0 : // to None in order to reflect this.
1493 0 : let crnt_gen = self
1494 0 : .observed
1495 0 : .locations
1496 0 : .get(&node_id)
1497 0 : .and_then(|loc| loc.conf.as_ref())
1498 0 : .and_then(|conf| conf.generation);
1499 0 : let new_gen = loc.conf.as_ref().and_then(|conf| conf.generation);
1500 0 : match (crnt_gen, new_gen) {
1501 0 : (Some(crnt), Some(new)) if crnt_gen > new_gen => {
1502 0 : tracing::warn!(
1503 0 : "Skipping observed state update {}: {:?} and using None due to stale generation ({} > {})",
1504 : node_id, loc, crnt, new
1505 : );
1506 :
1507 0 : self.observed
1508 0 : .locations
1509 0 : .insert(node_id, ObservedStateLocation { conf: None });
1510 0 :
1511 0 : continue;
1512 : }
1513 0 : _ => {}
1514 : }
1515 :
1516 0 : if let Some(conf) = &loc.conf {
1517 0 : tracing::info!("Updating observed location {}: {:?}", node_id, conf);
1518 : } else {
1519 0 : tracing::info!("Setting observed location {} to None", node_id,)
1520 : }
1521 :
1522 0 : self.observed.locations.insert(node_id, loc);
1523 : }
1524 0 : ObservedStateDelta::Delete(node_id) => {
1525 0 : tracing::info!("Deleting observed location {}", node_id);
1526 0 : self.observed.locations.remove(&node_id);
1527 : }
1528 : }
1529 : }
1530 0 : }
1531 : }
1532 :
1533 : impl Drop for TenantShard {
1534 12525 : fn drop(&mut self) {
1535 12525 : metrics::METRICS_REGISTRY
1536 12525 : .metrics_group
1537 12525 : .storage_controller_tenant_shards
1538 12525 : .dec();
1539 12525 : }
1540 : }
1541 :
1542 : #[cfg(test)]
1543 : pub(crate) mod tests {
1544 : use std::{cell::RefCell, rc::Rc};
1545 :
1546 : use pageserver_api::{
1547 : controller_api::NodeAvailability,
1548 : shard::{ShardCount, ShardNumber},
1549 : };
1550 : use rand::{rngs::StdRng, SeedableRng};
1551 : use utils::id::TenantId;
1552 :
1553 : use crate::scheduler::test_utils::make_test_nodes;
1554 :
1555 : use super::*;
1556 :
1557 7 : fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard {
1558 7 : let tenant_id = TenantId::generate();
1559 7 : let shard_number = ShardNumber(0);
1560 7 : let shard_count = ShardCount::new(1);
1561 7 :
1562 7 : let tenant_shard_id = TenantShardId {
1563 7 : tenant_id,
1564 7 : shard_number,
1565 7 : shard_count,
1566 7 : };
1567 7 : TenantShard::new(
1568 7 : tenant_shard_id,
1569 7 : ShardIdentity::new(
1570 7 : shard_number,
1571 7 : shard_count,
1572 7 : pageserver_api::shard::ShardStripeSize(32768),
1573 7 : )
1574 7 : .unwrap(),
1575 7 : policy,
1576 7 : None,
1577 7 : )
1578 7 : }
1579 :
1580 5003 : pub(crate) fn make_test_tenant(
1581 5003 : policy: PlacementPolicy,
1582 5003 : shard_count: ShardCount,
1583 5003 : preferred_az: Option<AvailabilityZone>,
1584 5003 : ) -> Vec<TenantShard> {
1585 5003 : make_test_tenant_with_id(TenantId::generate(), policy, shard_count, preferred_az)
1586 5003 : }
1587 :
1588 5006 : pub(crate) fn make_test_tenant_with_id(
1589 5006 : tenant_id: TenantId,
1590 5006 : policy: PlacementPolicy,
1591 5006 : shard_count: ShardCount,
1592 5006 : preferred_az: Option<AvailabilityZone>,
1593 5006 : ) -> Vec<TenantShard> {
1594 5006 : (0..shard_count.count())
1595 12518 : .map(|i| {
1596 12518 : let shard_number = ShardNumber(i);
1597 12518 :
1598 12518 : let tenant_shard_id = TenantShardId {
1599 12518 : tenant_id,
1600 12518 : shard_number,
1601 12518 : shard_count,
1602 12518 : };
1603 12518 : TenantShard::new(
1604 12518 : tenant_shard_id,
1605 12518 : ShardIdentity::new(
1606 12518 : shard_number,
1607 12518 : shard_count,
1608 12518 : pageserver_api::shard::ShardStripeSize(32768),
1609 12518 : )
1610 12518 : .unwrap(),
1611 12518 : policy.clone(),
1612 12518 : preferred_az.clone(),
1613 12518 : )
1614 12518 : })
1615 5006 : .collect()
1616 5006 : }
1617 :
/// Test the scheduling behaviors used when a tenant configured for HA is subject
/// to nodes being marked offline.
#[test]
fn tenant_ha_scheduling() -> anyhow::Result<()> {
    // Start with three nodes.  Our tenant will only use two.  The third one is
    // expected to remain unused.
    let mut nodes = make_test_nodes(3, &[]);

    let mut scheduler = Scheduler::new(nodes.values());
    let mut context = ScheduleContext::default();

    let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
    tenant_shard
        .schedule(&mut scheduler, &mut context)
        .expect("we have enough nodes, scheduling should work");

    // Expect to initially be scheduled onto two different nodes
    assert_eq!(tenant_shard.intent.secondary.len(), 1);
    assert!(tenant_shard.intent.attached.is_some());

    let attached_node_id = tenant_shard.intent.attached.unwrap();
    let secondary_node_id = *tenant_shard.intent.secondary.last().unwrap();
    assert_ne!(attached_node_id, secondary_node_id);

    // Notifying the attached node is offline should demote it to a secondary
    let changed = tenant_shard
        .intent
        .demote_attached(&mut scheduler, attached_node_id);
    assert!(changed);
    assert!(tenant_shard.intent.attached.is_none());
    assert_eq!(tenant_shard.intent.secondary.len(), 2);

    // Update the scheduler state to indicate the node is offline
    let offline_node = nodes.get_mut(&attached_node_id).unwrap();
    offline_node.set_availability(NodeAvailability::Offline);
    scheduler.node_upsert(offline_node);

    // Scheduling the node should promote the still-available secondary node to attached
    tenant_shard
        .schedule(&mut scheduler, &mut context)
        .expect("active nodes are available");
    assert_eq!(tenant_shard.intent.attached.unwrap(), secondary_node_id);

    // The original attached node should have been retained as a secondary
    let last_secondary = *tenant_shard.intent.secondary.last().unwrap();
    assert_eq!(last_secondary, attached_node_id);

    tenant_shard.intent.clear(&mut scheduler);

    Ok(())
}
1673 :
/// Check that `intent_from_observed` derives the intent from observed locations:
/// the attached location with the highest generation becomes the attachment and
/// the remaining location becomes a secondary.
#[test]
fn intent_from_observed() -> anyhow::Result<()> {
    let nodes = make_test_nodes(3, &[]);
    let mut scheduler = Scheduler::new(nodes.values());

    let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));

    // Two observed attached locations: node 3 at generation 2 and node 2 at generation 1.
    let observed = [
        (NodeId(3), LocationConfigMode::AttachedMulti, 2),
        (NodeId(2), LocationConfigMode::AttachedStale, 1),
    ];
    for (node_id, mode, generation) in observed {
        let location = ObservedStateLocation {
            conf: Some(LocationConfig {
                mode,
                generation: Some(generation),
                secondary_conf: None,
                shard_number: tenant_shard.shard.number.0,
                shard_count: tenant_shard.shard.count.literal(),
                shard_stripe_size: tenant_shard.shard.stripe_size.0,
                tenant_conf: TenantConfig::default(),
            }),
        };
        tenant_shard.observed.locations.insert(node_id, location);
    }

    tenant_shard.intent_from_observed(&mut scheduler);

    // The attached location with the highest generation is picked as the attachment,
    // and the other location is used as a secondary.
    assert_eq!(tenant_shard.intent.attached, Some(NodeId(3)));
    assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]);

    scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?;

    tenant_shard.intent.clear(&mut scheduler);
    Ok(())
}
1723 :
/// Check how the shard's scheduling policy gates `schedule()`: `Pause` leaves the
/// intent empty, `Active` populates it.
#[test]
fn scheduling_mode() -> anyhow::Result<()> {
    let nodes = make_test_nodes(3, &[]);
    let mut scheduler = Scheduler::new(nodes.values());

    let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));

    // While paused, schedule() succeeds but must not place the shard anywhere.
    tenant_shard.scheduling_policy = ShardSchedulingPolicy::Pause;
    let paused_ok = tenant_shard
        .schedule(&mut scheduler, &mut ScheduleContext::default())
        .is_ok();
    assert!(paused_ok);
    assert!(tenant_shard.intent.all_pageservers().is_empty());

    // Once active, schedule() assigns pageservers.
    tenant_shard.scheduling_policy = ShardSchedulingPolicy::Active;
    let active_ok = tenant_shard
        .schedule(&mut scheduler, &mut ScheduleContext::default())
        .is_ok();
    assert!(active_ok);
    assert!(!tenant_shard.intent.all_pageservers().is_empty());

    tenant_shard.intent.clear(&mut scheduler);
    Ok(())
}
1748 :
/// Check that `optimize_attachment` proposes migrating an attachment to the shard's
/// secondary when both shards are attached to the same node, and that applying the
/// proposal swaps the attached and secondary locations.
#[test]
fn optimize_attachment() -> anyhow::Result<()> {
    let nodes = make_test_nodes(3, &[]);
    let mut scheduler = Scheduler::new(nodes.values());

    let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
    let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));

    // Initially: both shards are attached on node 1, and their secondary locations
    // are on different nodes (2 and 3 respectively).
    shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    shard_a.intent.push_secondary(&mut scheduler, NodeId(2));
    shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    shard_b.intent.push_secondary(&mut scheduler, NodeId(3));

    // Record both shards' locations and attachments in a shared context.
    let mut schedule_context = ScheduleContext::default();
    for shard in [&shard_a, &shard_b] {
        schedule_context.avoid(&shard.intent.all_pageservers());
        schedule_context.push_attached(shard.intent.get_attached().unwrap());
    }

    // Either shard should recognize that it has the option to switch to a secondary
    // location where there would be no other shards from the same tenant, and request
    // to do so.
    let optimization_a = shard_a.optimize_attachment(&nodes, &schedule_context);
    let expected_a = Some(ScheduleOptimization {
        sequence: shard_a.sequence,
        action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
            old_attached_node_id: NodeId(1),
            new_attached_node_id: NodeId(2),
        }),
    });
    assert_eq!(optimization_a, expected_a);

    // Note that optimizing two shards in the same tenant with the same ScheduleContext is
    // mutually exclusive (the optimization of one invalidates the stats) -- it is the
    // responsibility of [`Service::optimize_all`] to avoid trying to do optimizations for
    // multiple shards in the same tenant at the same time. Generating both optimizations
    // is just done for test purposes.
    let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context);
    let expected_b = Some(ScheduleOptimization {
        sequence: shard_b.sequence,
        action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
            old_attached_node_id: NodeId(1),
            new_attached_node_id: NodeId(3),
        }),
    });
    assert_eq!(optimization_b, expected_b);

    // Applying the optimizations must produce the proposed end state: the old
    // secondary is now attached, and node 1 is kept as the secondary.
    shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
    assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(2)));
    assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);

    shard_b.apply_optimization(&mut scheduler, optimization_b.unwrap());
    assert_eq!(shard_b.intent.get_attached(), &Some(NodeId(3)));
    assert_eq!(shard_b.intent.get_secondary(), &vec![NodeId(1)]);

    shard_a.intent.clear(&mut scheduler);
    shard_b.intent.clear(&mut scheduler);

    Ok(())
}
1815 :
/// Check that `optimize_secondary` proposes moving a secondary away from a node that
/// holds two locations of the same tenant when an empty node is available.
#[test]
fn optimize_secondary() -> anyhow::Result<()> {
    let nodes = make_test_nodes(4, &[]);
    let mut scheduler = Scheduler::new(nodes.values());

    let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
    let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));

    // Initially: the shards are attached on different nodes (1 and 2), but both
    // have their secondary location on node 3. Node 4 holds nothing.
    shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
    shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
    shard_b.intent.push_secondary(&mut scheduler, NodeId(3));

    // Record both shards' locations and attachments in a shared context.
    let mut schedule_context = ScheduleContext::default();
    for shard in [&shard_a, &shard_b] {
        schedule_context.avoid(&shard.intent.all_pageservers());
        schedule_context.push_attached(shard.intent.get_attached().unwrap());
    }

    // Since there is a node with no locations available, the node with two locations
    // for the same tenant should generate an optimization to move one away.
    let optimization_a = shard_a.optimize_secondary(&mut scheduler, &schedule_context);
    let expected_a = Some(ScheduleOptimization {
        sequence: shard_a.sequence,
        action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
            old_node_id: NodeId(3),
            new_node_id: NodeId(4),
        }),
    });
    assert_eq!(optimization_a, expected_a);

    // Applying it moves only the secondary; the attachment stays on node 1.
    shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
    assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
    assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);

    shard_a.intent.clear(&mut scheduler);
    shard_b.intent.clear(&mut scheduler);

    Ok(())
}
1861 :
1862 : // Optimize til quiescent: this emulates what Service::optimize_all does, when
1863 : // called repeatedly in the background.
1864 : // Returns the applied optimizations
1865 3 : fn optimize_til_idle(
1866 3 : nodes: &HashMap<NodeId, Node>,
1867 3 : scheduler: &mut Scheduler,
1868 3 : shards: &mut [TenantShard],
1869 3 : ) -> Vec<ScheduleOptimization> {
1870 3 : let mut loop_n = 0;
1871 3 : let mut optimizations = Vec::default();
1872 : loop {
1873 9 : let mut schedule_context = ScheduleContext::default();
1874 9 : let mut any_changed = false;
1875 :
1876 36 : for shard in shards.iter() {
1877 36 : schedule_context.avoid(&shard.intent.all_pageservers());
1878 36 : if let Some(attached) = shard.intent.get_attached() {
1879 36 : schedule_context.push_attached(*attached);
1880 36 : }
1881 : }
1882 :
1883 21 : for shard in shards.iter_mut() {
1884 21 : let optimization = shard.optimize_attachment(nodes, &schedule_context);
1885 21 : if let Some(optimization) = optimization {
1886 2 : optimizations.push(optimization.clone());
1887 2 : shard.apply_optimization(scheduler, optimization);
1888 2 : any_changed = true;
1889 2 : break;
1890 19 : }
1891 19 :
1892 19 : let optimization = shard.optimize_secondary(scheduler, &schedule_context);
1893 19 : if let Some(optimization) = optimization {
1894 4 : optimizations.push(optimization.clone());
1895 4 : shard.apply_optimization(scheduler, optimization);
1896 4 : any_changed = true;
1897 4 : break;
1898 15 : }
1899 : }
1900 :
1901 9 : if !any_changed {
1902 3 : break;
1903 6 : }
1904 6 :
1905 6 : // Assert no infinite loop
1906 6 : loop_n += 1;
1907 6 : assert!(loop_n < 1000);
1908 : }
1909 :
1910 3 : optimizations
1911 3 : }
1912 :
/// Test the balancing behavior of shard scheduling: after nodes are added, repeated
/// optimization must spread the shards evenly, and it must converge.
#[test]
fn optimize_add_nodes() -> anyhow::Result<()> {
    let nodes = make_test_nodes(4, &[]);

    // The scheduler initially only knows about the first two nodes.
    let mut scheduler = Scheduler::new([].iter());
    for node_id in [NodeId(1), NodeId(2)] {
        scheduler.node_upsert(nodes.get(&node_id).unwrap());
    }

    let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
    let mut schedule_context = ScheduleContext::default();
    for shard in shards.iter_mut() {
        let outcome = shard.schedule(&mut scheduler, &mut schedule_context);
        assert!(outcome.is_ok());
    }

    // Four shards with one secondary each give eight locations: we should see an
    // equal number of locations and attachments on the two nodes.
    for node_id in [NodeId(1), NodeId(2)] {
        assert_eq!(scheduler.get_node_shard_count(node_id), 4);
        assert_eq!(scheduler.get_node_attached_shard_count(node_id), 2);
    }

    // Add another two nodes: we should see the shards spread out when their optimize
    // methods are called.
    for node_id in [NodeId(3), NodeId(4)] {
        scheduler.node_upsert(nodes.get(&node_id).unwrap());
    }
    optimize_til_idle(&nodes, &mut scheduler, &mut shards);

    // Every node now holds two locations, one of which is an attachment.
    for node_id in [NodeId(1), NodeId(2), NodeId(3), NodeId(4)] {
        assert_eq!(scheduler.get_node_shard_count(node_id), 2);
        assert_eq!(scheduler.get_node_attached_shard_count(node_id), 1);
    }

    for shard in &mut shards {
        shard.intent.clear(&mut scheduler);
    }

    Ok(())
}
1963 :
/// Test that initial shard scheduling is optimal, where optimal means that the
/// optimizer cannot find any way to improve on it.
///
/// This test is an example of the scheduling issue described in
/// https://github.com/neondatabase/neon/issues/8969
#[test]
fn initial_scheduling_is_optimal() -> anyhow::Result<()> {
    use itertools::Itertools;

    let nodes = make_test_nodes(2, &[]);

    let mut scheduler = Scheduler::new([].iter());
    for node_id in [NodeId(1), NodeId(2)] {
        scheduler.node_upsert(nodes.get(&node_id).unwrap());
    }

    // Two four-shard tenants, each with its own scheduling context.
    let mut tenant_a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
    let context_a = Rc::new(RefCell::new(ScheduleContext::default()));

    let mut tenant_b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
    let context_b = Rc::new(RefCell::new(ScheduleContext::default()));

    // Schedule the shards of the two tenants alternately (a0, b0, a1, b1, ...),
    // each against its own tenant's context.
    let interleaved = tenant_a
        .iter_mut()
        .map(|shard| (shard, context_a.clone()))
        .interleave(tenant_b.iter_mut().map(|shard| (shard, context_b.clone())));

    for (shard, context) in interleaved {
        shard
            .schedule(&mut scheduler, &mut *context.borrow_mut())
            .unwrap();
    }

    // The optimizer must find nothing to improve for either tenant.
    let applied_to_a = optimize_til_idle(&nodes, &mut scheduler, &mut tenant_a);
    assert_eq!(applied_to_a, vec![]);

    let applied_to_b = optimize_til_idle(&nodes, &mut scheduler, &mut tenant_b);
    assert_eq!(applied_to_b, vec![]);

    for shard in tenant_a.iter_mut().chain(tenant_b.iter_mut()) {
        shard.intent.clear(&mut scheduler);
    }

    Ok(())
}
2007 :
/// Schedule many tenants with a preferred AZ, in a seeded random order, and check
/// the resulting placement.
///
/// For each of 50 seeds, 100 tenants of 1 to 4 shards are created, alternating their
/// preferred AZ between "az-a" and "az-b", over 4 nodes. All shards are shuffled and
/// scheduled, and the test then asserts that:
/// - no attachment landed outside the shard's preferred AZ,
/// - no secondary landed inside the shard's preferred AZ,
/// - each node's attachment count is within `[ideal - 1, ideal + 1]`, where `ideal` is
///   half the number of shards that prefer that node's AZ.
#[test]
fn random_az_shard_scheduling() -> anyhow::Result<()> {
    use rand::seq::SliceRandom;

    for seed in 0..50 {
        eprintln!("Running test with seed {seed}");
        let mut rng = StdRng::seed_from_u64(seed);

        let az_a_tag = AvailabilityZone("az-a".to_string());
        let az_b_tag = AvailabilityZone("az-b".to_string());
        let azs = [az_a_tag, az_b_tag];
        let nodes = make_test_nodes(4, &azs);
        let mut shards_per_az: HashMap<AvailabilityZone, u32> = HashMap::new();

        let mut scheduler = Scheduler::new([].iter());
        for node in nodes.values() {
            scheduler.node_upsert(node);
        }

        // Create the tenants. All shards of a tenant share one ScheduleContext,
        // which travels with each shard through the shuffle below.
        let mut shards = Vec::default();
        let mut az_picker = azs.iter().cycle().cloned();
        for i in 0..100 {
            let az = az_picker.next().unwrap();
            let shard_count = i % 4 + 1;
            *shards_per_az.entry(az.clone()).or_default() += shard_count;

            let tenant_shards = make_test_tenant(
                PlacementPolicy::Attached(1),
                ShardCount::new(shard_count.try_into().unwrap()),
                Some(az),
            );
            let context = Rc::new(RefCell::new(ScheduleContext::default()));

            shards.extend(
                tenant_shards
                    .into_iter()
                    .map(|shard| (shard, context.clone())),
            );
        }

        shards.shuffle(&mut rng);

        #[derive(Default, Debug)]
        struct NodeStats {
            attachments: u32,
            secondaries: u32,
        }

        let mut node_stats: HashMap<NodeId, NodeStats> = HashMap::default();
        let mut attachments_in_wrong_az = 0;
        let mut secondaries_in_wrong_az = 0;

        // Schedule every shard, recording per-node counts and AZ violations.
        for (shard, context) in &mut shards {
            let context = &mut *context.borrow_mut();
            shard.schedule(&mut scheduler, context).unwrap();

            let attached_node = shard.intent.get_attached().unwrap();
            node_stats.entry(attached_node).or_default().attachments += 1;

            let secondary_node = *shard.intent.get_secondary().first().unwrap();
            node_stats.entry(secondary_node).or_default().secondaries += 1;

            let attached_node_az = nodes
                .get(&attached_node)
                .unwrap()
                .get_availability_zone_id();
            let secondary_node_az = nodes
                .get(&secondary_node)
                .unwrap()
                .get_availability_zone_id();
            let preferred_az = shard.preferred_az().unwrap();

            if attached_node_az != preferred_az {
                eprintln!(
                    "{} attachment was scheduled in AZ {} but preferred AZ {}",
                    shard.tenant_shard_id, attached_node_az, preferred_az
                );
                attachments_in_wrong_az += 1;
            }

            if secondary_node_az == preferred_az {
                // Report the AZ of the secondary: that is the location this message is about.
                eprintln!(
                    "{} secondary was scheduled in AZ {} which matches preference",
                    shard.tenant_shard_id, secondary_node_az
                );
                secondaries_in_wrong_az += 1;
            }
        }

        let mut violations = Vec::default();

        if attachments_in_wrong_az > 0 {
            violations.push(format!(
                "{} attachments scheduled to the incorrect AZ",
                attachments_in_wrong_az
            ));
        }

        if secondaries_in_wrong_az > 0 {
            violations.push(format!(
                "{} secondaries scheduled to the incorrect AZ",
                secondaries_in_wrong_az
            ));
        }

        eprintln!(
            "attachments_in_wrong_az={} secondaries_in_wrong_az={}",
            attachments_in_wrong_az, secondaries_in_wrong_az
        );

        for (node_id, stats) in &node_stats {
            let node_az = nodes.get(node_id).unwrap().get_availability_zone_id();
            // Half of the shards preferring this node's AZ (4 nodes across 2 AZs
            // was requested above; presumably 2 nodes per AZ -- confirm in make_test_nodes).
            let ideal_attachment_load = shards_per_az.get(node_az).unwrap() / 2;
            // Tolerate being off by one in either direction. saturating_sub keeps the
            // lower bound from underflowing should the ideal load ever be zero.
            let allowed_attachment_load =
                ideal_attachment_load.saturating_sub(1)..(ideal_attachment_load + 2);

            if !allowed_attachment_load.contains(&stats.attachments) {
                violations.push(format!(
                    "Found {} attachments on node {}, but expected {}",
                    stats.attachments, node_id, ideal_attachment_load
                ));
            }

            eprintln!(
                "{}: attachments={} secondaries={} ideal_attachment_load={}",
                node_id, stats.attachments, stats.secondaries, ideal_attachment_load
            );
        }

        assert!(violations.is_empty(), "{violations:?}");

        for (mut shard, _ctx) in shards {
            shard.intent.clear(&mut scheduler);
        }
    }
    Ok(())
}
2150 : }
|