Line data Source code
1 : use std::{
2 : collections::{HashMap, HashSet},
3 : sync::Arc,
4 : time::Duration,
5 : };
6 :
7 : use crate::{
8 : metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
9 : persistence::TenantShardPersistence,
10 : reconciler::ReconcileUnits,
11 : scheduler::{AffinityScore, MaySchedule, ScheduleContext},
12 : };
13 : use pageserver_api::controller_api::{PlacementPolicy, ShardSchedulingPolicy};
14 : use pageserver_api::{
15 : models::{LocationConfig, LocationConfigMode, TenantConfig},
16 : shard::{ShardIdentity, TenantShardId},
17 : };
18 : use serde::Serialize;
19 : use tokio::task::JoinHandle;
20 : use tokio_util::sync::CancellationToken;
21 : use tracing::{instrument, Instrument};
22 : use utils::{
23 : generation::Generation,
24 : id::NodeId,
25 : seqwait::{SeqWait, SeqWaitError},
26 : sync::gate::GateGuard,
27 : };
28 :
29 : use crate::{
30 : compute_hook::ComputeHook,
31 : node::Node,
32 : persistence::{split_state::SplitState, Persistence},
33 : reconciler::{
34 : attached_location_conf, secondary_location_conf, ReconcileError, Reconciler, TargetState,
35 : },
36 : scheduler::{ScheduleError, Scheduler},
37 : service, Sequence,
38 : };
39 :
40 : /// Serialization helper
41 0 : fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
42 0 : where
43 0 : S: serde::ser::Serializer,
44 0 : T: std::fmt::Display,
45 0 : {
46 0 : serializer.collect_str(
47 0 : &v.lock()
48 0 : .unwrap()
49 0 : .as_ref()
50 0 : .map(|e| format!("{e}"))
51 0 : .unwrap_or("".to_string()),
52 0 : )
53 0 : }
54 :
/// In-memory state for a particular tenant shard.
///
/// This struct implements Serialize for debugging purposes, but is _not_ persisted
/// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
#[derive(Serialize)]
pub(crate) struct TenantShard {
    pub(crate) tenant_shard_id: TenantShardId,

    pub(crate) shard: ShardIdentity,

    // Runtime only: sequence used to coordinate updates to this object while
    // background reconcilers may be running. A reconciler runs to a particular
    // sequence.
    pub(crate) sequence: Sequence,

    // Latest generation number: next time we attach, increment this
    // and use the incremented number when attaching.
    //
    // None represents an incompletely onboarded tenant via the [`Service::location_config`]
    // API, where this tenant may only run in PlacementPolicy::Secondary.
    pub(crate) generation: Option<Generation>,

    // High level description of how the tenant should be set up. Provided
    // externally.
    pub(crate) policy: PlacementPolicy,

    // Low level description of exactly which pageservers should fulfil
    // which role. Generated by `Self::schedule`.
    pub(crate) intent: IntentState,

    // Low level description of how the tenant is configured on pageservers:
    // if this does not match `Self::intent` then the tenant needs reconciliation
    // with `Self::reconcile`.
    pub(crate) observed: ObservedState,

    // Tenant configuration, passed through opaquely to the pageserver. Identical
    // for all shards in a tenant.
    pub(crate) config: TenantConfig,

    /// If a reconcile task is currently in flight, it may be joined here (it is
    /// only safe to join if either the result has been received or the reconciler's
    /// cancellation token has been fired)
    #[serde(skip)]
    pub(crate) reconciler: Option<ReconcilerHandle>,

    /// If a tenant is being split, then all shards with that TenantId will have a
    /// SplitState set, this acts as a guard against other operations such as background
    /// reconciliation, and timeline creation.
    pub(crate) splitting: SplitState,

    /// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag
    /// is set. This flag is cleared when the tenant is popped off the delay queue.
    pub(crate) delayed_reconcile: bool,

    /// Optionally wait for reconciliation to complete up to a particular
    /// sequence number.
    #[serde(skip)]
    pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,

    /// Indicates sequence number for which we have encountered an error reconciling. If
    /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
    /// and callers should stop waiting for `waiter` and propagate the error.
    #[serde(skip)]
    pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,

    /// The most recent error from a reconcile on this tenant. This is a nested Arc
    /// because:
    /// - ReconcileWaiters need to Arc-clone the overall object to read it later
    /// - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
    ///   many waiters for one shard, and the underlying error types are not Clone.
    ///
    /// Serialized (for debugging) as the error's Display text, or "" when unset.
    /// TODO: generalize to an array of recent events
    /// TODO: use an ArcSwap instead of a mutex for faster reads?
    #[serde(serialize_with = "read_last_error")]
    pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,

    /// If we have a pending compute notification that for some reason we weren't able to send,
    /// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
    /// and trigger a Reconciler run. This is the mechanism by which compute notifications are included in the scope
    /// of state that we publish externally in an eventually consistent way.
    pub(crate) pending_compute_notification: bool,

    // Support/debug tool: if something is going wrong or flapping with scheduling, this may
    // be set to a non-active state to avoid making changes while the issue is fixed.
    // Under `Pause` or `Stop`, `do_schedule` returns early without touching the intent.
    scheduling_policy: ShardSchedulingPolicy,
}
140 :
/// The desired placement of a tenant shard: at most one attached pageserver plus zero or
/// more secondary pageservers.
///
/// Every node referenced here holds a reference count in the [`Scheduler`]. The fields are
/// private so that changes go through the methods that keep those counts in step, and the
/// intent must be cleared (e.g. with `clear`) before it is dropped; the `Drop` impl checks
/// this in debug builds.
///
/// NOTE(review): the derived `Clone` copies node IDs without taking scheduler references,
/// so a clone must not be used to mutate scheduler state.
#[derive(Default, Clone, Debug, Serialize)]
pub(crate) struct IntentState {
    // Node on which the shard should be attached, if any.
    attached: Option<NodeId>,
    // Nodes on which the shard should have secondary locations.
    secondary: Vec<NodeId>,
}
146 :
147 : impl IntentState {
148 4 : pub(crate) fn new() -> Self {
149 4 : Self {
150 4 : attached: None,
151 4 : secondary: vec![],
152 4 : }
153 4 : }
154 0 : pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option<NodeId>) -> Self {
155 0 : if let Some(node_id) = node_id {
156 0 : scheduler.node_inc_ref(node_id);
157 0 : }
158 0 : Self {
159 0 : attached: node_id,
160 0 : secondary: vec![],
161 0 : }
162 0 : }
163 :
164 26 : pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
165 26 : if self.attached != new_attached {
166 26 : if let Some(old_attached) = self.attached.take() {
167 0 : scheduler.node_dec_ref(old_attached);
168 26 : }
169 26 : if let Some(new_attached) = &new_attached {
170 26 : scheduler.node_inc_ref(*new_attached);
171 26 : }
172 26 : self.attached = new_attached;
173 0 : }
174 26 : }
175 :
176 : /// Like set_attached, but the node is from [`Self::secondary`]. This swaps the node from
177 : /// secondary to attached while maintaining the scheduler's reference counts.
178 14 : pub(crate) fn promote_attached(
179 14 : &mut self,
180 14 : _scheduler: &mut Scheduler,
181 14 : promote_secondary: NodeId,
182 14 : ) {
183 14 : // If we call this with a node that isn't in secondary, it would cause incorrect
184 14 : // scheduler reference counting, since we assume the node is already referenced as a secondary.
185 14 : debug_assert!(self.secondary.contains(&promote_secondary));
186 :
187 : // TODO: when scheduler starts tracking attached + secondary counts separately, we will
188 : // need to call into it here.
189 28 : self.secondary.retain(|n| n != &promote_secondary);
190 14 : self.attached = Some(promote_secondary);
191 14 : }
192 :
193 34 : pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
194 34 : debug_assert!(!self.secondary.contains(&new_secondary));
195 34 : scheduler.node_inc_ref(new_secondary);
196 34 : self.secondary.push(new_secondary);
197 34 : }
198 :
199 : /// It is legal to call this with a node that is not currently a secondary: that is a no-op
200 10 : pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
201 10 : let index = self.secondary.iter().position(|n| *n == node_id);
202 10 : if let Some(index) = index {
203 10 : scheduler.node_dec_ref(node_id);
204 10 : self.secondary.remove(index);
205 10 : }
206 10 : }
207 :
208 24 : pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
209 24 : for secondary in self.secondary.drain(..) {
210 24 : scheduler.node_dec_ref(secondary);
211 24 : }
212 24 : }
213 :
214 : /// Remove the last secondary node from the list of secondaries
215 0 : pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
216 0 : if let Some(node_id) = self.secondary.pop() {
217 0 : scheduler.node_dec_ref(node_id);
218 0 : }
219 0 : }
220 :
221 24 : pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
222 24 : if let Some(old_attached) = self.attached.take() {
223 24 : scheduler.node_dec_ref(old_attached);
224 24 : }
225 :
226 24 : self.clear_secondary(scheduler);
227 24 : }
228 :
229 172 : pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
230 172 : let mut result = Vec::new();
231 172 : if let Some(p) = self.attached {
232 168 : result.push(p)
233 4 : }
234 :
235 172 : result.extend(self.secondary.iter().copied());
236 172 :
237 172 : result
238 172 : }
239 :
240 144 : pub(crate) fn get_attached(&self) -> &Option<NodeId> {
241 144 : &self.attached
242 144 : }
243 :
244 38 : pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
245 38 : &self.secondary
246 38 : }
247 :
248 : /// If the node is in use as the attached location, demote it into
249 : /// the list of secondary locations. This is used when a node goes offline,
250 : /// and we want to use a different node for attachment, but not permanently
251 : /// forget the location on the offline node.
252 : ///
253 : /// Returns true if a change was made
254 14 : pub(crate) fn demote_attached(&mut self, node_id: NodeId) -> bool {
255 14 : if self.attached == Some(node_id) {
256 : // TODO: when scheduler starts tracking attached + secondary counts separately, we will
257 : // need to call into it here.
258 14 : self.attached = None;
259 14 : self.secondary.push(node_id);
260 14 : true
261 : } else {
262 0 : false
263 : }
264 14 : }
265 : }
266 :
267 : impl Drop for IntentState {
268 26 : fn drop(&mut self) {
269 26 : // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
270 26 : // We do not check this while panicking, to avoid polluting unit test failures or
271 26 : // other assertions with this assertion's output. It's still wrong to leak these,
272 26 : // but if we already have a panic then we don't need to independently flag this case.
273 26 : if !(std::thread::panicking()) {
274 26 : debug_assert!(self.attached.is_none() && self.secondary.is_empty());
275 0 : }
276 24 : }
277 : }
278 :
/// What we last knew about this tenant shard's configuration on each pageserver, keyed by
/// node. A node that is absent from `locations` means we are certain nothing is configured
/// there for this shard; see [`ObservedStateLocation`] for the meaning of present entries.
#[derive(Default, Clone, Serialize)]
pub(crate) struct ObservedState {
    pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
}
283 :
/// Our latest knowledge of how this tenant is configured in the outside world, for a
/// single pageserver.
///
/// Meaning:
/// * No instance of this type exists for a node: we are certain that we have nothing configured on that
///   node for this shard.
/// * Instance exists with conf==None: we *might* have some state on that node, but we don't know
///   what it is (e.g. we failed partway through configuring it)
/// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
///   and that configuration will still be present unless something external interfered.
#[derive(Clone, Serialize)]
pub(crate) struct ObservedStateLocation {
    /// If None, it means we do not know the status of this shard's location on this node, but
    /// we know that we might have some state on this node.
    pub(crate) conf: Option<LocationConfig>,
}
/// Handle for awaiting the outcome of reconciling one tenant shard up to sequence `seq`;
/// see [`ReconcilerWaiter::wait_timeout`].
pub(crate) struct ReconcilerWaiter {
    // For observability purposes, remember the ID of the shard we're
    // waiting for.
    pub(crate) tenant_shard_id: TenantShardId,

    // Reaching `seq` here means reconciliation completed successfully up to `seq`.
    seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
    // Reaching `seq` here means a reconcile error was recorded; the error is then
    // expected to be present in `error`.
    error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
    // Shared slot holding the shard's most recent reconcile error.
    error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
    // The sequence number this waiter waits for.
    seq: Sequence,
}
309 :
/// Ways in which [`ReconcilerWaiter::wait_timeout`] can fail.
#[derive(thiserror::Error, Debug)]
pub(crate) enum ReconcileWaitError {
    /// The timeout elapsed before the shard reached the awaited sequence.
    #[error("Timeout waiting for shard {0}")]
    Timeout(TenantShardId),
    /// One of the underlying sequence waits reported shutdown.
    #[error("shutting down")]
    Shutdown,
    /// A reconcile error was recorded for the shard; carries the shard's most recent error.
    #[error("Reconcile error on shard {0}: {1}")]
    Failed(TenantShardId, Arc<ReconcileError>),
}
319 :
/// Payload of [`ScheduleOptimizationAction::ReplaceSecondary`]: move a secondary location
/// from `old_node_id` to `new_node_id`.
#[derive(Eq, PartialEq, Debug)]
pub(crate) struct ReplaceSecondary {
    old_node_id: NodeId,
    new_node_id: NodeId,
}
325 :
/// Payload of [`ScheduleOptimizationAction::MigrateAttachment`]: demote the currently
/// attached node to secondary and promote an existing secondary to attached.
#[derive(Eq, PartialEq, Debug)]
pub(crate) struct MigrateAttachment {
    /// Node currently attached; it becomes a secondary.
    pub(crate) old_attached_node_id: NodeId,
    /// Existing secondary that becomes attached.
    pub(crate) new_attached_node_id: NodeId,
}
331 :
/// The concrete change proposed by a [`ScheduleOptimization`].
#[derive(Eq, PartialEq, Debug)]
pub(crate) enum ScheduleOptimizationAction {
    /// Replace one of our secondary locations with a different node
    ReplaceSecondary(ReplaceSecondary),
    /// Migrate attachment to an existing secondary location
    MigrateAttachment(MigrateAttachment),
}
339 :
/// A planned, optional improvement to a shard's placement, produced by
/// `TenantShard::optimize_attachment` / `TenantShard::optimize_secondary` and applied with
/// `TenantShard::apply_optimization`.
#[derive(Eq, PartialEq, Debug)]
pub(crate) struct ScheduleOptimization {
    /// What was the reconcile sequence when we generated this optimization? The optimization
    /// should only be applied if the shard's sequence is still at this value, in case other changes
    /// happened between planning the optimization and applying it.
    sequence: Sequence,

    pub(crate) action: ScheduleOptimizationAction,
}
349 :
350 : impl ReconcilerWaiter {
351 0 : pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
352 : tokio::select! {
353 : result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
354 0 : result.map_err(|e| match e {
355 0 : SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
356 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
357 0 : })?;
358 : },
359 : result = self.error_seq_wait.wait_for(self.seq) => {
360 0 : result.map_err(|e| match e {
361 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
362 0 : SeqWaitError::Timeout => unreachable!()
363 0 : })?;
364 :
365 : return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
366 : self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
367 : }
368 : }
369 :
370 0 : Ok(())
371 0 : }
372 : }
373 :
/// Having spawned a reconciler task, the tenant shard's state will carry enough
/// information to optionally cancel & await it later.
pub(crate) struct ReconcilerHandle {
    // The sequence this reconciler was spawned to run to.
    sequence: Sequence,
    // Join handle of the spawned reconciler task.
    handle: JoinHandle<()>,
    // Token used to request cancellation of the reconciler.
    cancel: CancellationToken,
}
381 :
/// Answer to "does this shard need a reconciler?", as returned by
/// `TenantShard::get_reconcile_needed`.
pub(crate) enum ReconcileNeeded {
    /// shard either doesn't need reconciliation, or is forbidden from spawning a reconciler
    /// in its current state (e.g. shard split in progress, or ShardSchedulingPolicy forbids it)
    No,
    /// shard has a reconciler running, and its intent hasn't changed since that one was
    /// spawned: wait for the existing reconciler rather than spawning a new one.
    WaitExisting(ReconcilerWaiter),
    /// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
    Yes,
}
392 :
/// When a reconcile task completes, it sends this result object
/// to be applied to the primary TenantShard.
pub(crate) struct ReconcileResult {
    /// The sequence the reconciler ran to.
    pub(crate) sequence: Sequence,
    /// On errors, `observed` should be treated as an incomplete description
    /// of state (i.e. any nodes present in the result should override nodes
    /// present in the parent tenant state, but any unmentioned nodes should
    /// not be removed from parent tenant state)
    pub(crate) result: Result<(), ReconcileError>,

    pub(crate) tenant_shard_id: TenantShardId,
    pub(crate) generation: Option<Generation>,
    pub(crate) observed: ObservedState,

    /// Set [`TenantShard::pending_compute_notification`] from this flag
    pub(crate) pending_compute_notification: bool,
}
410 :
411 : impl ObservedState {
412 0 : pub(crate) fn new() -> Self {
413 0 : Self {
414 0 : locations: HashMap::new(),
415 0 : }
416 0 : }
417 : }
418 :
419 : impl TenantShard {
420 22 : pub(crate) fn new(
421 22 : tenant_shard_id: TenantShardId,
422 22 : shard: ShardIdentity,
423 22 : policy: PlacementPolicy,
424 22 : ) -> Self {
425 22 : Self {
426 22 : tenant_shard_id,
427 22 : policy,
428 22 : intent: IntentState::default(),
429 22 : generation: Some(Generation::new(0)),
430 22 : shard,
431 22 : observed: ObservedState::default(),
432 22 : config: TenantConfig::default(),
433 22 : reconciler: None,
434 22 : splitting: SplitState::Idle,
435 22 : sequence: Sequence(1),
436 22 : delayed_reconcile: false,
437 22 : waiter: Arc::new(SeqWait::new(Sequence(0))),
438 22 : error_waiter: Arc::new(SeqWait::new(Sequence(0))),
439 22 : last_error: Arc::default(),
440 22 : pending_compute_notification: false,
441 22 : scheduling_policy: ShardSchedulingPolicy::default(),
442 22 : }
443 22 : }
444 :
445 : /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
446 : /// [`ObservedState`], even if it violates my [`PlacementPolicy`]. Call [`Self::schedule`] next,
447 : /// to get an intent state that complies with placement policy. The overall goal is to do scheduling
448 : /// in a way that makes use of any configured locations that already exist in the outside world.
449 2 : pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
450 2 : // Choose an attached location by filtering observed locations, and then sorting to get the highest
451 2 : // generation
452 2 : let mut attached_locs = self
453 2 : .observed
454 2 : .locations
455 2 : .iter()
456 4 : .filter_map(|(node_id, l)| {
457 4 : if let Some(conf) = &l.conf {
458 4 : if conf.mode == LocationConfigMode::AttachedMulti
459 2 : || conf.mode == LocationConfigMode::AttachedSingle
460 2 : || conf.mode == LocationConfigMode::AttachedStale
461 : {
462 4 : Some((node_id, conf.generation))
463 : } else {
464 0 : None
465 : }
466 : } else {
467 0 : None
468 : }
469 4 : })
470 2 : .collect::<Vec<_>>();
471 2 :
472 4 : attached_locs.sort_by_key(|i| i.1);
473 2 : if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
474 2 : self.intent.set_attached(scheduler, Some(*node_id));
475 2 : }
476 :
477 : // All remaining observed locations generate secondary intents. This includes None
478 : // observations, as these may well have some local content on disk that is usable (this
479 : // is an edge case that might occur if we restarted during a migration or other change)
480 : //
481 : // We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
482 : // will take care of promoting one of these secondaries to be attached.
483 4 : self.observed.locations.keys().for_each(|node_id| {
484 4 : if Some(*node_id) != self.intent.attached {
485 2 : self.intent.push_secondary(scheduler, *node_id);
486 2 : }
487 4 : });
488 2 : }
489 :
490 : /// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
491 : /// attached pageserver for a shard.
492 : ///
493 : /// Returns whether we modified it, and the NodeId selected.
494 14 : fn schedule_attached(
495 14 : &mut self,
496 14 : scheduler: &mut Scheduler,
497 14 : context: &ScheduleContext,
498 14 : ) -> Result<(bool, NodeId), ScheduleError> {
499 : // No work to do if we already have an attached tenant
500 14 : if let Some(node_id) = self.intent.attached {
501 0 : return Ok((false, node_id));
502 14 : }
503 :
504 14 : if let Some(promote_secondary) = scheduler.node_preferred(&self.intent.secondary) {
505 : // Promote a secondary
506 2 : tracing::debug!("Promoted secondary {} to attached", promote_secondary);
507 2 : self.intent.promote_attached(scheduler, promote_secondary);
508 2 : Ok((true, promote_secondary))
509 : } else {
510 : // Pick a fresh node: either we had no secondaries or none were schedulable
511 12 : let node_id = scheduler.schedule_shard(&self.intent.secondary, context)?;
512 12 : tracing::debug!("Selected {} as attached", node_id);
513 12 : self.intent.set_attached(scheduler, Some(node_id));
514 12 : Ok((true, node_id))
515 : }
516 14 : }
517 :
518 16 : pub(crate) fn schedule(
519 16 : &mut self,
520 16 : scheduler: &mut Scheduler,
521 16 : context: &mut ScheduleContext,
522 16 : ) -> Result<(), ScheduleError> {
523 16 : let r = self.do_schedule(scheduler, context);
524 16 :
525 16 : context.avoid(&self.intent.all_pageservers());
526 16 : if let Some(attached) = self.intent.get_attached() {
527 14 : context.push_attached(*attached);
528 14 : }
529 :
530 16 : r
531 16 : }
532 :
533 16 : pub(crate) fn do_schedule(
534 16 : &mut self,
535 16 : scheduler: &mut Scheduler,
536 16 : context: &ScheduleContext,
537 16 : ) -> Result<(), ScheduleError> {
538 16 : // TODO: before scheduling new nodes, check if any existing content in
539 16 : // self.intent refers to pageservers that are offline, and pick other
540 16 : // pageservers if so.
541 16 :
542 16 : // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
543 16 : // change their attach location.
544 16 :
545 16 : match self.scheduling_policy {
546 14 : ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
547 : ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
548 : // Warn to make it obvious why other things aren't happening/working, if we skip scheduling
549 2 : tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
550 0 : "Scheduling is disabled by policy {:?}", self.scheduling_policy);
551 2 : return Ok(());
552 : }
553 : }
554 :
555 : // Build the set of pageservers already in use by this tenant, to avoid scheduling
556 : // more work on the same pageservers we're already using.
557 14 : let mut modified = false;
558 14 :
559 14 : // Add/remove nodes to fulfil policy
560 14 : use PlacementPolicy::*;
561 14 : match self.policy {
562 14 : Attached(secondary_count) => {
563 14 : let retain_secondaries = if self.intent.attached.is_none()
564 14 : && scheduler.node_preferred(&self.intent.secondary).is_some()
565 : {
566 : // If we have no attached, and one of the secondaries is elegible to be promoted, retain
567 : // one more secondary than we usually would, as one of them will become attached futher down this function.
568 2 : secondary_count + 1
569 : } else {
570 12 : secondary_count
571 : };
572 :
573 14 : while self.intent.secondary.len() > retain_secondaries {
574 0 : // We have no particular preference for one secondary location over another: just
575 0 : // arbitrarily drop from the end
576 0 : self.intent.pop_secondary(scheduler);
577 0 : modified = true;
578 0 : }
579 :
580 : // Should have exactly one attached, and N secondaries
581 14 : let (modified_attached, attached_node_id) =
582 14 : self.schedule_attached(scheduler, context)?;
583 14 : modified |= modified_attached;
584 14 :
585 14 : let mut used_pageservers = vec![attached_node_id];
586 26 : while self.intent.secondary.len() < secondary_count {
587 12 : let node_id = scheduler.schedule_shard(&used_pageservers, context)?;
588 12 : self.intent.push_secondary(scheduler, node_id);
589 12 : used_pageservers.push(node_id);
590 12 : modified = true;
591 : }
592 : }
593 : Secondary => {
594 0 : if let Some(node_id) = self.intent.get_attached() {
595 0 : // Populate secondary by demoting the attached node
596 0 : self.intent.demote_attached(*node_id);
597 0 : modified = true;
598 0 : } else if self.intent.secondary.is_empty() {
599 0 : // Populate secondary by scheduling a fresh node
600 0 : let node_id = scheduler.schedule_shard(&[], context)?;
601 0 : self.intent.push_secondary(scheduler, node_id);
602 0 : modified = true;
603 0 : }
604 0 : while self.intent.secondary.len() > 1 {
605 0 : // We have no particular preference for one secondary location over another: just
606 0 : // arbitrarily drop from the end
607 0 : self.intent.pop_secondary(scheduler);
608 0 : modified = true;
609 0 : }
610 : }
611 : Detached => {
612 : // Never add locations in this mode
613 0 : if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
614 0 : self.intent.clear(scheduler);
615 0 : modified = true;
616 0 : }
617 : }
618 : }
619 :
620 14 : if modified {
621 14 : self.sequence.0 += 1;
622 14 : }
623 :
624 14 : Ok(())
625 16 : }
626 :
627 : /// Optimize attachments: if a shard has a secondary location that is preferable to
628 : /// its primary location based on soft constraints, switch that secondary location
629 : /// to be attached.
630 40 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
631 : pub(crate) fn optimize_attachment(
632 : &self,
633 : nodes: &HashMap<NodeId, Node>,
634 : schedule_context: &ScheduleContext,
635 : ) -> Option<ScheduleOptimization> {
636 : let attached = (*self.intent.get_attached())?;
637 : if self.intent.secondary.is_empty() {
638 : // We can only do useful work if we have both attached and secondary locations: this
639 : // function doesn't schedule new locations, only swaps between attached and secondaries.
640 : return None;
641 : }
642 :
643 : let current_affinity_score = schedule_context.get_node_affinity(attached);
644 : let current_attachment_count = schedule_context.get_node_attachments(attached);
645 :
646 : // Generate score for each node, dropping any un-schedulable nodes.
647 : let all_pageservers = self.intent.all_pageservers();
648 : let mut scores = all_pageservers
649 : .iter()
650 80 : .flat_map(|node_id| {
651 80 : if matches!(
652 80 : nodes
653 80 : .get(node_id)
654 80 : .map(|n| n.may_schedule())
655 80 : .unwrap_or(MaySchedule::No),
656 : MaySchedule::No
657 : ) {
658 0 : None
659 : } else {
660 80 : let affinity_score = schedule_context.get_node_affinity(*node_id);
661 80 : let attachment_count = schedule_context.get_node_attachments(*node_id);
662 80 : Some((*node_id, affinity_score, attachment_count))
663 : }
664 80 : })
665 : .collect::<Vec<_>>();
666 :
667 : // Sort precedence:
668 : // 1st - prefer nodes with the lowest total affinity score
669 : // 2nd - prefer nodes with the lowest number of attachments in this context
670 : // 3rd - if all else is equal, sort by node ID for determinism in tests.
671 80 : scores.sort_by_key(|i| (i.1, i.2, i.0));
672 :
673 : if let Some((preferred_node, preferred_affinity_score, preferred_attachment_count)) =
674 : scores.first()
675 : {
676 : if attached != *preferred_node {
677 : // The best alternative must be more than 1 better than us, otherwise we could end
678 : // up flapping back next time we're called (e.g. there's no point migrating from
679 : // a location with score 1 to a score zero, because on next location the situation
680 : // would be the same, but in reverse).
681 : if current_affinity_score > *preferred_affinity_score + AffinityScore(1)
682 : || current_attachment_count > *preferred_attachment_count + 1
683 : {
684 : tracing::info!(
685 : "Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})",
686 : self.intent.get_secondary()
687 : );
688 : return Some(ScheduleOptimization {
689 : sequence: self.sequence,
690 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
691 : old_attached_node_id: attached,
692 : new_attached_node_id: *preferred_node,
693 : }),
694 : });
695 : }
696 : } else {
697 : tracing::debug!(
698 : "Node {} is already preferred (score {:?})",
699 : preferred_node,
700 : preferred_affinity_score
701 : );
702 : }
703 : }
704 :
705 : // Fall-through: we didn't find an optimization
706 : None
707 : }
708 :
709 30 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
710 : pub(crate) fn optimize_secondary(
711 : &self,
712 : scheduler: &Scheduler,
713 : schedule_context: &ScheduleContext,
714 : ) -> Option<ScheduleOptimization> {
715 : if self.intent.secondary.is_empty() {
716 : // We can only do useful work if we have both attached and secondary locations: this
717 : // function doesn't schedule new locations, only swaps between attached and secondaries.
718 : return None;
719 : }
720 :
721 : for secondary in self.intent.get_secondary() {
722 : let Some(affinity_score) = schedule_context.nodes.get(secondary) else {
723 : // We're already on a node unaffected any affinity constraints,
724 : // so we won't change it.
725 : continue;
726 : };
727 :
728 : // Let the scheduler suggest a node, where it would put us if we were scheduling afresh
729 : // This implicitly limits the choice to nodes that are available, and prefers nodes
730 : // with lower utilization.
731 : let Ok(candidate_node) =
732 : scheduler.schedule_shard(&self.intent.all_pageservers(), schedule_context)
733 : else {
734 : // A scheduling error means we have no possible candidate replacements
735 : continue;
736 : };
737 :
738 : let candidate_affinity_score = schedule_context
739 : .nodes
740 : .get(&candidate_node)
741 : .unwrap_or(&AffinityScore::FREE);
742 :
743 : // The best alternative must be more than 1 better than us, otherwise we could end
744 : // up flapping back next time we're called.
745 : if *candidate_affinity_score + AffinityScore(1) < *affinity_score {
746 : // If some other node is available and has a lower score than this node, then
747 : // that other node is a good place to migrate to.
748 : tracing::info!(
749 : "Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})",
750 : self.intent.get_secondary()
751 : );
752 : return Some(ScheduleOptimization {
753 : sequence: self.sequence,
754 : action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
755 : old_node_id: *secondary,
756 : new_node_id: candidate_node,
757 : }),
758 : });
759 : }
760 : }
761 :
762 : None
763 : }
764 :
765 : /// Return true if the optimization was really applied: it will not be applied if the optimization's
766 : /// sequence is behind this tenant shard's
767 22 : pub(crate) fn apply_optimization(
768 22 : &mut self,
769 22 : scheduler: &mut Scheduler,
770 22 : optimization: ScheduleOptimization,
771 22 : ) -> bool {
772 22 : if optimization.sequence != self.sequence {
773 0 : return false;
774 22 : }
775 22 :
776 22 : metrics::METRICS_REGISTRY
777 22 : .metrics_group
778 22 : .storage_controller_schedule_optimization
779 22 : .inc();
780 22 :
781 22 : match optimization.action {
782 : ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
783 12 : old_attached_node_id,
784 12 : new_attached_node_id,
785 12 : }) => {
786 12 : self.intent.demote_attached(old_attached_node_id);
787 12 : self.intent
788 12 : .promote_attached(scheduler, new_attached_node_id);
789 12 : }
790 : ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
791 10 : old_node_id,
792 10 : new_node_id,
793 10 : }) => {
794 10 : self.intent.remove_secondary(scheduler, old_node_id);
795 10 : self.intent.push_secondary(scheduler, new_node_id);
796 10 : }
797 : }
798 :
799 22 : true
800 22 : }
801 :
802 : /// Query whether the tenant's observed state for attached node matches its intent state, and if so,
803 : /// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that
804 : /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
805 : ///
806 : /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
807 : /// funciton should not be used to decide whether to reconcile.
808 0 : pub(crate) fn stably_attached(&self) -> Option<NodeId> {
809 0 : if let Some(attach_intent) = self.intent.attached {
810 0 : match self.observed.locations.get(&attach_intent) {
811 0 : Some(loc) => match &loc.conf {
812 0 : Some(conf) => match conf.mode {
813 : LocationConfigMode::AttachedMulti
814 : | LocationConfigMode::AttachedSingle
815 : | LocationConfigMode::AttachedStale => {
816 : // Our intent and observed state agree that this node is in an attached state.
817 0 : Some(attach_intent)
818 : }
819 : // Our observed config is not an attached state
820 0 : _ => None,
821 : },
822 : // Our observed state is None, i.e. in flux
823 0 : None => None,
824 : },
825 : // We have no observed state for this node
826 0 : None => None,
827 : }
828 : } else {
829 : // Our intent is not to attach
830 0 : None
831 : }
832 0 : }
833 :
834 0 : fn dirty(&self, nodes: &Arc<HashMap<NodeId, Node>>) -> bool {
835 0 : let mut dirty_nodes = HashSet::new();
836 :
837 0 : if let Some(node_id) = self.intent.attached {
838 : // Maybe panic: it is a severe bug if we try to attach while generation is null.
839 0 : let generation = self
840 0 : .generation
841 0 : .expect("Attempted to enter attached state without a generation");
842 0 :
843 0 : let wanted_conf = attached_location_conf(
844 0 : generation,
845 0 : &self.shard,
846 0 : &self.config,
847 0 : !self.intent.secondary.is_empty(),
848 0 : );
849 0 : match self.observed.locations.get(&node_id) {
850 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
851 0 : Some(_) | None => {
852 0 : dirty_nodes.insert(node_id);
853 0 : }
854 : }
855 0 : }
856 :
857 0 : for node_id in &self.intent.secondary {
858 0 : let wanted_conf = secondary_location_conf(&self.shard, &self.config);
859 0 : match self.observed.locations.get(node_id) {
860 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
861 0 : Some(_) | None => {
862 0 : dirty_nodes.insert(*node_id);
863 0 : }
864 : }
865 : }
866 :
867 0 : for node_id in self.observed.locations.keys() {
868 0 : if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
869 0 : // We have observed state that isn't part of our intent: need to clean it up.
870 0 : dirty_nodes.insert(*node_id);
871 0 : }
872 : }
873 :
874 0 : dirty_nodes.retain(|node_id| {
875 0 : nodes
876 0 : .get(node_id)
877 0 : .map(|n| n.is_available())
878 0 : .unwrap_or(false)
879 0 : });
880 0 :
881 0 : !dirty_nodes.is_empty()
882 0 : }
883 :
884 : #[allow(clippy::too_many_arguments)]
885 0 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
886 : pub(crate) fn get_reconcile_needed(
887 : &mut self,
888 : pageservers: &Arc<HashMap<NodeId, Node>>,
889 : ) -> ReconcileNeeded {
890 : // If there are any ambiguous observed states, and the nodes they refer to are available,
891 : // we should reconcile to clean them up.
892 : let mut dirty_observed = false;
893 : for (node_id, observed_loc) in &self.observed.locations {
894 : let node = pageservers
895 : .get(node_id)
896 : .expect("Nodes may not be removed while referenced");
897 : if observed_loc.conf.is_none() && node.is_available() {
898 : dirty_observed = true;
899 : break;
900 : }
901 : }
902 :
903 : let active_nodes_dirty = self.dirty(pageservers);
904 :
905 : // Even if there is no pageserver work to be done, if we have a pending notification to computes,
906 : // wake up a reconciler to send it.
907 : let do_reconcile =
908 : active_nodes_dirty || dirty_observed || self.pending_compute_notification;
909 :
910 : if !do_reconcile {
911 : tracing::debug!("Not dirty, no reconciliation needed.");
912 : return ReconcileNeeded::No;
913 : }
914 :
915 : // If we are currently splitting, then never start a reconciler task: the splitting logic
916 : // requires that shards are not interfered with while it runs. Do this check here rather than
917 : // up top, so that we only log this message if we would otherwise have done a reconciliation.
918 : if !matches!(self.splitting, SplitState::Idle) {
919 : tracing::info!("Refusing to reconcile, splitting in progress");
920 : return ReconcileNeeded::No;
921 : }
922 :
923 : // Reconcile already in flight for the current sequence?
924 : if let Some(handle) = &self.reconciler {
925 : if handle.sequence == self.sequence {
926 : tracing::info!(
927 : "Reconciliation already in progress for sequence {:?}",
928 : self.sequence,
929 : );
930 : return ReconcileNeeded::WaitExisting(ReconcilerWaiter {
931 : tenant_shard_id: self.tenant_shard_id,
932 : seq_wait: self.waiter.clone(),
933 : error_seq_wait: self.error_waiter.clone(),
934 : error: self.last_error.clone(),
935 : seq: self.sequence,
936 : });
937 : }
938 : }
939 :
940 : // Pre-checks done: finally check whether we may actually do the work
941 : match self.scheduling_policy {
942 : ShardSchedulingPolicy::Active
943 : | ShardSchedulingPolicy::Essential
944 : | ShardSchedulingPolicy::Pause => {}
945 : ShardSchedulingPolicy::Stop => {
946 : // We only reach this point if there is work to do and we're going to skip
947 : // doing it: warn it obvious why this tenant isn't doing what it ought to.
948 : tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
949 : return ReconcileNeeded::No;
950 : }
951 : }
952 :
953 : ReconcileNeeded::Yes
954 : }
955 :
956 : /// Ensure the sequence number is set to a value where waiting for this value will make us wait
957 : /// for the next reconcile: i.e. it is ahead of all completed or running reconcilers.
958 : ///
959 : /// Constructing a ReconcilerWaiter with the resulting sequence number gives the property
960 : /// that the waiter will not complete until some future Reconciler is constructed and run.
961 0 : fn ensure_sequence_ahead(&mut self) {
962 0 : // Find the highest sequence for which a Reconciler has previously run or is currently
963 0 : // running
964 0 : let max_seen = std::cmp::max(
965 0 : self.reconciler
966 0 : .as_ref()
967 0 : .map(|r| r.sequence)
968 0 : .unwrap_or(Sequence(0)),
969 0 : std::cmp::max(self.waiter.load(), self.error_waiter.load()),
970 0 : );
971 0 :
972 0 : if self.sequence <= max_seen {
973 0 : self.sequence = max_seen.next();
974 0 : }
975 0 : }
976 :
977 : /// Create a waiter that will wait for some future Reconciler that hasn't been spawned yet.
978 : ///
979 : /// This is appropriate when you can't spawn a reconciler (e.g. due to resource limits), but
980 : /// you would like to wait on the next reconciler that gets spawned in the background.
981 0 : pub(crate) fn future_reconcile_waiter(&mut self) -> ReconcilerWaiter {
982 0 : self.ensure_sequence_ahead();
983 0 :
984 0 : ReconcilerWaiter {
985 0 : tenant_shard_id: self.tenant_shard_id,
986 0 : seq_wait: self.waiter.clone(),
987 0 : error_seq_wait: self.error_waiter.clone(),
988 0 : error: self.last_error.clone(),
989 0 : seq: self.sequence,
990 0 : }
991 0 : }
992 :
993 : #[allow(clippy::too_many_arguments)]
994 0 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
995 : pub(crate) fn spawn_reconciler(
996 : &mut self,
997 : result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
998 : pageservers: &Arc<HashMap<NodeId, Node>>,
999 : compute_hook: &Arc<ComputeHook>,
1000 : service_config: &service::Config,
1001 : persistence: &Arc<Persistence>,
1002 : units: ReconcileUnits,
1003 : gate_guard: GateGuard,
1004 : cancel: &CancellationToken,
1005 : ) -> Option<ReconcilerWaiter> {
1006 : // Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
1007 : // doing our sequence's work.
1008 : let old_handle = self.reconciler.take();
1009 :
1010 : // Build list of nodes from which the reconciler should detach
1011 : let mut detach = Vec::new();
1012 : for node_id in self.observed.locations.keys() {
1013 : if self.intent.get_attached() != &Some(*node_id)
1014 : && !self.intent.secondary.contains(node_id)
1015 : {
1016 : detach.push(
1017 : pageservers
1018 : .get(node_id)
1019 : .expect("Intent references non-existent pageserver")
1020 : .clone(),
1021 : )
1022 : }
1023 : }
1024 :
1025 : // Advance the sequence before spawning a reconciler, so that sequence waiters
1026 : // can distinguish between before+after the reconcile completes.
1027 : self.ensure_sequence_ahead();
1028 :
1029 : let reconciler_cancel = cancel.child_token();
1030 : let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
1031 : let mut reconciler = Reconciler {
1032 : tenant_shard_id: self.tenant_shard_id,
1033 : shard: self.shard,
1034 : generation: self.generation,
1035 : intent: reconciler_intent,
1036 : detach,
1037 : config: self.config.clone(),
1038 : observed: self.observed.clone(),
1039 : compute_hook: compute_hook.clone(),
1040 : service_config: service_config.clone(),
1041 : _gate_guard: gate_guard,
1042 : _resource_units: units,
1043 : cancel: reconciler_cancel.clone(),
1044 : persistence: persistence.clone(),
1045 : compute_notify_failure: false,
1046 : };
1047 :
1048 : let reconcile_seq = self.sequence;
1049 :
1050 : tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
1051 : let must_notify = self.pending_compute_notification;
1052 : let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
1053 : tenant_id=%reconciler.tenant_shard_id.tenant_id,
1054 : shard_id=%reconciler.tenant_shard_id.shard_slug());
1055 : metrics::METRICS_REGISTRY
1056 : .metrics_group
1057 : .storage_controller_reconcile_spawn
1058 : .inc();
1059 : let result_tx = result_tx.clone();
1060 : let join_handle = tokio::task::spawn(
1061 0 : async move {
1062 : // Wait for any previous reconcile task to complete before we start
1063 0 : if let Some(old_handle) = old_handle {
1064 0 : old_handle.cancel.cancel();
1065 0 : if let Err(e) = old_handle.handle.await {
1066 : // We can't do much with this other than log it: the task is done, so
1067 : // we may proceed with our work.
1068 0 : tracing::error!("Unexpected join error waiting for reconcile task: {e}");
1069 0 : }
1070 0 : }
1071 :
1072 : // Early check for cancellation before doing any work
1073 : // TODO: wrap all remote API operations in cancellation check
1074 : // as well.
1075 0 : if reconciler.cancel.is_cancelled() {
1076 0 : metrics::METRICS_REGISTRY
1077 0 : .metrics_group
1078 0 : .storage_controller_reconcile_complete
1079 0 : .inc(ReconcileCompleteLabelGroup {
1080 0 : status: ReconcileOutcome::Cancel,
1081 0 : });
1082 0 : return;
1083 0 : }
1084 :
1085 : // Attempt to make observed state match intent state
1086 0 : let result = reconciler.reconcile().await;
1087 :
1088 : // If we know we had a pending compute notification from some previous action, send a notification irrespective
1089 : // of whether the above reconcile() did any work
1090 0 : if result.is_ok() && must_notify {
1091 : // If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
1092 0 : reconciler.compute_notify().await.ok();
1093 0 : }
1094 :
1095 : // Update result counter
1096 0 : let outcome_label = match &result {
1097 0 : Ok(_) => ReconcileOutcome::Success,
1098 0 : Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
1099 0 : Err(_) => ReconcileOutcome::Error,
1100 : };
1101 :
1102 0 : metrics::METRICS_REGISTRY
1103 0 : .metrics_group
1104 0 : .storage_controller_reconcile_complete
1105 0 : .inc(ReconcileCompleteLabelGroup {
1106 0 : status: outcome_label,
1107 0 : });
1108 0 :
1109 0 : // Constructing result implicitly drops Reconciler, freeing any ReconcileUnits before the Service might
1110 0 : // try and schedule more work in response to our result.
1111 0 : let result = ReconcileResult {
1112 0 : sequence: reconcile_seq,
1113 0 : result,
1114 0 : tenant_shard_id: reconciler.tenant_shard_id,
1115 0 : generation: reconciler.generation,
1116 0 : observed: reconciler.observed,
1117 0 : pending_compute_notification: reconciler.compute_notify_failure,
1118 0 : };
1119 0 :
1120 0 : result_tx.send(result).ok();
1121 0 : }
1122 : .instrument(reconciler_span),
1123 : );
1124 :
1125 : self.reconciler = Some(ReconcilerHandle {
1126 : sequence: self.sequence,
1127 : handle: join_handle,
1128 : cancel: reconciler_cancel,
1129 : });
1130 :
1131 : Some(ReconcilerWaiter {
1132 : tenant_shard_id: self.tenant_shard_id,
1133 : seq_wait: self.waiter.clone(),
1134 : error_seq_wait: self.error_waiter.clone(),
1135 : error: self.last_error.clone(),
1136 : seq: self.sequence,
1137 : })
1138 : }
1139 :
1140 : /// Get a waiter for any reconciliation in flight, but do not start reconciliation
1141 : /// if it is not already running
1142 0 : pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
1143 0 : if self.reconciler.is_some() {
1144 0 : Some(ReconcilerWaiter {
1145 0 : tenant_shard_id: self.tenant_shard_id,
1146 0 : seq_wait: self.waiter.clone(),
1147 0 : error_seq_wait: self.error_waiter.clone(),
1148 0 : error: self.last_error.clone(),
1149 0 : seq: self.sequence,
1150 0 : })
1151 : } else {
1152 0 : None
1153 : }
1154 0 : }
1155 :
1156 : /// Called when a ReconcileResult has been emitted and the service is updating
1157 : /// our state: if the result is from a sequence >= my ReconcileHandle, then drop
1158 : /// the handle to indicate there is no longer a reconciliation in progress.
1159 0 : pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
1160 0 : if let Some(reconcile_handle) = &self.reconciler {
1161 0 : if reconcile_handle.sequence <= sequence {
1162 0 : self.reconciler = None;
1163 0 : }
1164 0 : }
1165 0 : }
1166 :
1167 : // If we had any state at all referring to this node ID, drop it. Does not
1168 : // attempt to reschedule.
1169 0 : pub(crate) fn deref_node(&mut self, node_id: NodeId) {
1170 0 : if self.intent.attached == Some(node_id) {
1171 0 : self.intent.attached = None;
1172 0 : }
1173 :
1174 0 : self.intent.secondary.retain(|n| n != &node_id);
1175 0 :
1176 0 : self.observed.locations.remove(&node_id);
1177 0 :
1178 0 : debug_assert!(!self.intent.all_pageservers().contains(&node_id));
1179 0 : }
1180 :
1181 0 : pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
1182 0 : self.scheduling_policy = p;
1183 0 : }
1184 :
1185 0 : pub(crate) fn get_scheduling_policy(&self) -> &ShardSchedulingPolicy {
1186 0 : &self.scheduling_policy
1187 0 : }
1188 :
1189 0 : pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
1190 0 : // Ordering: always set last_error before advancing sequence, so that sequence
1191 0 : // waiters are guaranteed to see a Some value when they see an error.
1192 0 : *(self.last_error.lock().unwrap()) = Some(Arc::new(error));
1193 0 : self.error_waiter.advance(sequence);
1194 0 : }
1195 :
1196 0 : pub(crate) fn from_persistent(
1197 0 : tsp: TenantShardPersistence,
1198 0 : intent: IntentState,
1199 0 : ) -> anyhow::Result<Self> {
1200 0 : let tenant_shard_id = tsp.get_tenant_shard_id()?;
1201 0 : let shard_identity = tsp.get_shard_identity()?;
1202 :
1203 0 : Ok(Self {
1204 0 : tenant_shard_id,
1205 0 : shard: shard_identity,
1206 0 : sequence: Sequence::initial(),
1207 0 : generation: tsp.generation.map(|g| Generation::new(g as u32)),
1208 0 : policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
1209 0 : intent,
1210 0 : observed: ObservedState::new(),
1211 0 : config: serde_json::from_str(&tsp.config).unwrap(),
1212 0 : reconciler: None,
1213 0 : splitting: tsp.splitting,
1214 0 : waiter: Arc::new(SeqWait::new(Sequence::initial())),
1215 0 : error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
1216 0 : last_error: Arc::default(),
1217 0 : pending_compute_notification: false,
1218 0 : delayed_reconcile: false,
1219 0 : scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
1220 0 : })
1221 0 : }
1222 :
1223 0 : pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
1224 0 : TenantShardPersistence {
1225 0 : tenant_id: self.tenant_shard_id.tenant_id.to_string(),
1226 0 : shard_number: self.tenant_shard_id.shard_number.0 as i32,
1227 0 : shard_count: self.tenant_shard_id.shard_count.literal() as i32,
1228 0 : shard_stripe_size: self.shard.stripe_size.0 as i32,
1229 0 : generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
1230 0 : generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
1231 0 : placement_policy: serde_json::to_string(&self.policy).unwrap(),
1232 0 : config: serde_json::to_string(&self.config).unwrap(),
1233 0 : splitting: SplitState::default(),
1234 0 : scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
1235 0 : }
1236 0 : }
1237 : }
1238 :
1239 : #[cfg(test)]
1240 : pub(crate) mod tests {
1241 : use pageserver_api::{
1242 : controller_api::NodeAvailability,
1243 : shard::{ShardCount, ShardNumber},
1244 : };
1245 : use utils::id::TenantId;
1246 :
1247 : use crate::scheduler::test_utils::make_test_nodes;
1248 :
1249 : use super::*;
1250 :
1251 14 : fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard {
1252 14 : let tenant_id = TenantId::generate();
1253 14 : let shard_number = ShardNumber(0);
1254 14 : let shard_count = ShardCount::new(1);
1255 14 :
1256 14 : let tenant_shard_id = TenantShardId {
1257 14 : tenant_id,
1258 14 : shard_number,
1259 14 : shard_count,
1260 14 : };
1261 14 : TenantShard::new(
1262 14 : tenant_shard_id,
1263 14 : ShardIdentity::new(
1264 14 : shard_number,
1265 14 : shard_count,
1266 14 : pageserver_api::shard::ShardStripeSize(32768),
1267 14 : )
1268 14 : .unwrap(),
1269 14 : policy,
1270 14 : )
1271 14 : }
1272 :
1273 2 : fn make_test_tenant(policy: PlacementPolicy, shard_count: ShardCount) -> Vec<TenantShard> {
1274 2 : let tenant_id = TenantId::generate();
1275 2 :
1276 2 : (0..shard_count.count())
1277 8 : .map(|i| {
1278 8 : let shard_number = ShardNumber(i);
1279 8 :
1280 8 : let tenant_shard_id = TenantShardId {
1281 8 : tenant_id,
1282 8 : shard_number,
1283 8 : shard_count,
1284 8 : };
1285 8 : TenantShard::new(
1286 8 : tenant_shard_id,
1287 8 : ShardIdentity::new(
1288 8 : shard_number,
1289 8 : shard_count,
1290 8 : pageserver_api::shard::ShardStripeSize(32768),
1291 8 : )
1292 8 : .unwrap(),
1293 8 : policy.clone(),
1294 8 : )
1295 8 : })
1296 2 : .collect()
1297 2 : }
1298 :
1299 : /// Test the scheduling behaviors used when a tenant configured for HA is subject
1300 : /// to nodes being marked offline.
1301 : #[test]
1302 2 : fn tenant_ha_scheduling() -> anyhow::Result<()> {
1303 2 : // Start with three nodes. Our tenant will only use two. The third one is
1304 2 : // expected to remain unused.
1305 2 : let mut nodes = make_test_nodes(3);
1306 2 :
1307 2 : let mut scheduler = Scheduler::new(nodes.values());
1308 2 : let mut context = ScheduleContext::default();
1309 2 :
1310 2 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
1311 2 : tenant_shard
1312 2 : .schedule(&mut scheduler, &mut context)
1313 2 : .expect("we have enough nodes, scheduling should work");
1314 2 :
1315 2 : // Expect to initially be schedule on to different nodes
1316 2 : assert_eq!(tenant_shard.intent.secondary.len(), 1);
1317 2 : assert!(tenant_shard.intent.attached.is_some());
1318 :
1319 2 : let attached_node_id = tenant_shard.intent.attached.unwrap();
1320 2 : let secondary_node_id = *tenant_shard.intent.secondary.iter().last().unwrap();
1321 2 : assert_ne!(attached_node_id, secondary_node_id);
1322 :
1323 : // Notifying the attached node is offline should demote it to a secondary
1324 2 : let changed = tenant_shard.intent.demote_attached(attached_node_id);
1325 2 : assert!(changed);
1326 2 : assert!(tenant_shard.intent.attached.is_none());
1327 2 : assert_eq!(tenant_shard.intent.secondary.len(), 2);
1328 :
1329 : // Update the scheduler state to indicate the node is offline
1330 2 : nodes
1331 2 : .get_mut(&attached_node_id)
1332 2 : .unwrap()
1333 2 : .set_availability(NodeAvailability::Offline);
1334 2 : scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
1335 2 :
1336 2 : // Scheduling the node should promote the still-available secondary node to attached
1337 2 : tenant_shard
1338 2 : .schedule(&mut scheduler, &mut context)
1339 2 : .expect("active nodes are available");
1340 2 : assert_eq!(tenant_shard.intent.attached.unwrap(), secondary_node_id);
1341 :
1342 : // The original attached node should have been retained as a secondary
1343 2 : assert_eq!(
1344 2 : *tenant_shard.intent.secondary.iter().last().unwrap(),
1345 2 : attached_node_id
1346 2 : );
1347 :
1348 2 : tenant_shard.intent.clear(&mut scheduler);
1349 2 :
1350 2 : Ok(())
1351 2 : }
1352 :
1353 : #[test]
1354 2 : fn intent_from_observed() -> anyhow::Result<()> {
1355 2 : let nodes = make_test_nodes(3);
1356 2 : let mut scheduler = Scheduler::new(nodes.values());
1357 2 :
1358 2 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
1359 2 :
1360 2 : tenant_shard.observed.locations.insert(
1361 2 : NodeId(3),
1362 2 : ObservedStateLocation {
1363 2 : conf: Some(LocationConfig {
1364 2 : mode: LocationConfigMode::AttachedMulti,
1365 2 : generation: Some(2),
1366 2 : secondary_conf: None,
1367 2 : shard_number: tenant_shard.shard.number.0,
1368 2 : shard_count: tenant_shard.shard.count.literal(),
1369 2 : shard_stripe_size: tenant_shard.shard.stripe_size.0,
1370 2 : tenant_conf: TenantConfig::default(),
1371 2 : }),
1372 2 : },
1373 2 : );
1374 2 :
1375 2 : tenant_shard.observed.locations.insert(
1376 2 : NodeId(2),
1377 2 : ObservedStateLocation {
1378 2 : conf: Some(LocationConfig {
1379 2 : mode: LocationConfigMode::AttachedStale,
1380 2 : generation: Some(1),
1381 2 : secondary_conf: None,
1382 2 : shard_number: tenant_shard.shard.number.0,
1383 2 : shard_count: tenant_shard.shard.count.literal(),
1384 2 : shard_stripe_size: tenant_shard.shard.stripe_size.0,
1385 2 : tenant_conf: TenantConfig::default(),
1386 2 : }),
1387 2 : },
1388 2 : );
1389 2 :
1390 2 : tenant_shard.intent_from_observed(&mut scheduler);
1391 2 :
1392 2 : // The highest generationed attached location gets used as attached
1393 2 : assert_eq!(tenant_shard.intent.attached, Some(NodeId(3)));
1394 : // Other locations get used as secondary
1395 2 : assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]);
1396 :
1397 2 : scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?;
1398 :
1399 2 : tenant_shard.intent.clear(&mut scheduler);
1400 2 : Ok(())
1401 2 : }
1402 :
1403 : #[test]
1404 2 : fn scheduling_mode() -> anyhow::Result<()> {
1405 2 : let nodes = make_test_nodes(3);
1406 2 : let mut scheduler = Scheduler::new(nodes.values());
1407 2 :
1408 2 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
1409 2 :
1410 2 : // In pause mode, schedule() shouldn't do anything
1411 2 : tenant_shard.scheduling_policy = ShardSchedulingPolicy::Pause;
1412 2 : assert!(tenant_shard
1413 2 : .schedule(&mut scheduler, &mut ScheduleContext::default())
1414 2 : .is_ok());
1415 2 : assert!(tenant_shard.intent.all_pageservers().is_empty());
1416 :
1417 : // In active mode, schedule() works
1418 2 : tenant_shard.scheduling_policy = ShardSchedulingPolicy::Active;
1419 2 : assert!(tenant_shard
1420 2 : .schedule(&mut scheduler, &mut ScheduleContext::default())
1421 2 : .is_ok());
1422 2 : assert!(!tenant_shard.intent.all_pageservers().is_empty());
1423 :
1424 2 : tenant_shard.intent.clear(&mut scheduler);
1425 2 : Ok(())
1426 2 : }
1427 :
1428 : #[test]
1429 2 : fn optimize_attachment() -> anyhow::Result<()> {
1430 2 : let nodes = make_test_nodes(3);
1431 2 : let mut scheduler = Scheduler::new(nodes.values());
1432 2 :
1433 2 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
1434 2 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
1435 2 :
1436 2 : // Initially: both nodes attached on shard 1, and both have secondary locations
1437 2 : // on different nodes.
1438 2 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
1439 2 : shard_a.intent.push_secondary(&mut scheduler, NodeId(2));
1440 2 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
1441 2 : shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
1442 2 :
1443 2 : let mut schedule_context = ScheduleContext::default();
1444 2 : schedule_context.avoid(&shard_a.intent.all_pageservers());
1445 2 : schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
1446 2 : schedule_context.avoid(&shard_b.intent.all_pageservers());
1447 2 : schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
1448 2 :
1449 2 : let optimization_a = shard_a.optimize_attachment(&nodes, &schedule_context);
1450 2 :
1451 2 : // Either shard should recognize that it has the option to switch to a secondary location where there
1452 2 : // would be no other shards from the same tenant, and request to do so.
1453 2 : assert_eq!(
1454 2 : optimization_a,
1455 2 : Some(ScheduleOptimization {
1456 2 : sequence: shard_a.sequence,
1457 2 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
1458 2 : old_attached_node_id: NodeId(1),
1459 2 : new_attached_node_id: NodeId(2)
1460 2 : })
1461 2 : })
1462 2 : );
1463 :
1464 : // Note that these optimizing two shards in the same tenant with the same ScheduleContext is
1465 : // mutually exclusive (the optimization of one invalidates the stats) -- it is the responsibility
1466 : // of [`Service::optimize_all`] to avoid trying
1467 : // to do optimizations for multiple shards in the same tenant at the same time. Generating
1468 : // both optimizations is just done for test purposes
1469 2 : let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context);
1470 2 : assert_eq!(
1471 2 : optimization_b,
1472 2 : Some(ScheduleOptimization {
1473 2 : sequence: shard_b.sequence,
1474 2 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
1475 2 : old_attached_node_id: NodeId(1),
1476 2 : new_attached_node_id: NodeId(3)
1477 2 : })
1478 2 : })
1479 2 : );
1480 :
1481 : // Applying these optimizations should result in the end state proposed
1482 2 : shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
1483 2 : assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(2)));
1484 2 : assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
1485 2 : shard_b.apply_optimization(&mut scheduler, optimization_b.unwrap());
1486 2 : assert_eq!(shard_b.intent.get_attached(), &Some(NodeId(3)));
1487 2 : assert_eq!(shard_b.intent.get_secondary(), &vec![NodeId(1)]);
1488 :
1489 2 : shard_a.intent.clear(&mut scheduler);
1490 2 : shard_b.intent.clear(&mut scheduler);
1491 2 :
1492 2 : Ok(())
1493 2 : }
1494 :
1495 : #[test]
1496 2 : fn optimize_secondary() -> anyhow::Result<()> {
1497 2 : let nodes = make_test_nodes(4);
1498 2 : let mut scheduler = Scheduler::new(nodes.values());
1499 2 :
1500 2 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
1501 2 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
1502 2 :
1503 2 : // Initially: both nodes attached on shard 1, and both have secondary locations
1504 2 : // on different nodes.
1505 2 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
1506 2 : shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
1507 2 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
1508 2 : shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
1509 2 :
1510 2 : let mut schedule_context = ScheduleContext::default();
1511 2 : schedule_context.avoid(&shard_a.intent.all_pageservers());
1512 2 : schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
1513 2 : schedule_context.avoid(&shard_b.intent.all_pageservers());
1514 2 : schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
1515 2 :
1516 2 : let optimization_a = shard_a.optimize_secondary(&scheduler, &schedule_context);
1517 2 :
1518 2 : // Since there is a node with no locations available, the node with two locations for the
1519 2 : // same tenant should generate an optimization to move one away
1520 2 : assert_eq!(
1521 2 : optimization_a,
1522 2 : Some(ScheduleOptimization {
1523 2 : sequence: shard_a.sequence,
1524 2 : action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
1525 2 : old_node_id: NodeId(3),
1526 2 : new_node_id: NodeId(4)
1527 2 : })
1528 2 : })
1529 2 : );
1530 :
1531 2 : shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
1532 2 : assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
1533 2 : assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);
1534 :
1535 2 : shard_a.intent.clear(&mut scheduler);
1536 2 : shard_b.intent.clear(&mut scheduler);
1537 2 :
1538 2 : Ok(())
1539 2 : }
1540 :
1541 : // Optimize til quiescent: this emulates what Service::optimize_all does, when
1542 : // called repeatedly in the background.
1543 2 : fn optimize_til_idle(
1544 2 : nodes: &HashMap<NodeId, Node>,
1545 2 : scheduler: &mut Scheduler,
1546 2 : shards: &mut [TenantShard],
1547 2 : ) {
1548 2 : let mut loop_n = 0;
1549 : loop {
1550 18 : let mut schedule_context = ScheduleContext::default();
1551 18 : let mut any_changed = false;
1552 :
1553 72 : for shard in shards.iter() {
1554 72 : schedule_context.avoid(&shard.intent.all_pageservers());
1555 72 : if let Some(attached) = shard.intent.get_attached() {
1556 72 : schedule_context.push_attached(*attached);
1557 72 : }
1558 : }
1559 :
1560 36 : for shard in shards.iter_mut() {
1561 36 : let optimization = shard.optimize_attachment(nodes, &schedule_context);
1562 36 : if let Some(optimization) = optimization {
1563 8 : shard.apply_optimization(scheduler, optimization);
1564 8 : any_changed = true;
1565 8 : break;
1566 28 : }
1567 28 :
1568 28 : let optimization = shard.optimize_secondary(scheduler, &schedule_context);
1569 28 : if let Some(optimization) = optimization {
1570 8 : shard.apply_optimization(scheduler, optimization);
1571 8 : any_changed = true;
1572 8 : break;
1573 20 : }
1574 : }
1575 :
1576 18 : if !any_changed {
1577 2 : break;
1578 16 : }
1579 16 :
1580 16 : // Assert no infinite loop
1581 16 : loop_n += 1;
1582 16 : assert!(loop_n < 1000);
1583 : }
1584 2 : }
1585 :
1586 : /// Test the balancing behavior of shard scheduling: that it achieves a balance, and
1587 : /// that it converges.
1588 : #[test]
1589 2 : fn optimize_add_nodes() -> anyhow::Result<()> {
1590 2 : let nodes = make_test_nodes(4);
1591 2 :
1592 2 : // Only show the scheduler a couple of nodes
1593 2 : let mut scheduler = Scheduler::new([].iter());
1594 2 : scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
1595 2 : scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
1596 2 :
1597 2 : let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
1598 2 : let mut schedule_context = ScheduleContext::default();
1599 10 : for shard in &mut shards {
1600 8 : assert!(shard
1601 8 : .schedule(&mut scheduler, &mut schedule_context)
1602 8 : .is_ok());
1603 : }
1604 :
1605 : // We should see equal number of locations on the two nodes.
1606 2 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4);
1607 2 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4);
1608 :
1609 : // Add another two nodes: we should see the shards spread out when their optimize
1610 : // methods are called
1611 2 : scheduler.node_upsert(nodes.get(&NodeId(3)).unwrap());
1612 2 : scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
1613 2 : optimize_til_idle(&nodes, &mut scheduler, &mut shards);
1614 2 :
1615 2 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
1616 2 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
1617 2 : assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2);
1618 2 : assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2);
1619 :
1620 8 : for shard in shards.iter_mut() {
1621 8 : shard.intent.clear(&mut scheduler);
1622 8 : }
1623 :
1624 2 : Ok(())
1625 2 : }
1626 : }
|