Line data Source code
1 : use std::{
2 : collections::{HashMap, HashSet},
3 : sync::Arc,
4 : time::Duration,
5 : };
6 :
7 : use crate::{
8 : metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
9 : persistence::TenantShardPersistence,
10 : reconciler::{ReconcileUnits, ReconcilerConfig},
11 : scheduler::{
12 : AffinityScore, AttachedShardTag, MaySchedule, RefCountUpdate, ScheduleContext,
13 : SecondaryShardTag,
14 : },
15 : service::ReconcileResultRequest,
16 : };
17 : use pageserver_api::controller_api::{
18 : NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
19 : };
20 : use pageserver_api::{
21 : models::{LocationConfig, LocationConfigMode, TenantConfig},
22 : shard::{ShardIdentity, TenantShardId},
23 : };
24 : use serde::{Deserialize, Serialize};
25 : use tokio::task::JoinHandle;
26 : use tokio_util::sync::CancellationToken;
27 : use tracing::{instrument, Instrument};
28 : use utils::{
29 : generation::Generation,
30 : id::NodeId,
31 : seqwait::{SeqWait, SeqWaitError},
32 : sync::gate::GateGuard,
33 : };
34 :
35 : use crate::{
36 : compute_hook::ComputeHook,
37 : node::Node,
38 : persistence::{split_state::SplitState, Persistence},
39 : reconciler::{
40 : attached_location_conf, secondary_location_conf, ReconcileError, Reconciler, TargetState,
41 : },
42 : scheduler::{ScheduleError, Scheduler},
43 : service, Sequence,
44 : };
45 :
46 : /// Serialization helper
47 0 : fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
48 0 : where
49 0 : S: serde::ser::Serializer,
50 0 : T: std::fmt::Display,
51 0 : {
52 0 : serializer.collect_str(
53 0 : &v.lock()
54 0 : .unwrap()
55 0 : .as_ref()
56 0 : .map(|e| format!("{e}"))
57 0 : .unwrap_or("".to_string()),
58 0 : )
59 0 : }
60 :
61 : /// In-memory state for a particular tenant shard.
62 : ///
63 : /// This struct implement Serialize for debugging purposes, but is _not_ persisted
64 : /// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
65 0 : #[derive(Serialize)]
66 : pub(crate) struct TenantShard {
67 : pub(crate) tenant_shard_id: TenantShardId,
68 :
69 : pub(crate) shard: ShardIdentity,
70 :
71 : // Runtime only: sequence used to coordinate when updating this object while
72 : // with background reconcilers may be running. A reconciler runs to a particular
73 : // sequence.
74 : pub(crate) sequence: Sequence,
75 :
76 : // Latest generation number: next time we attach, increment this
77 : // and use the incremented number when attaching.
78 : //
79 : // None represents an incompletely onboarded tenant via the [`Service::location_config`]
80 : // API, where this tenant may only run in PlacementPolicy::Secondary.
81 : pub(crate) generation: Option<Generation>,
82 :
83 : // High level description of how the tenant should be set up. Provided
84 : // externally.
85 : pub(crate) policy: PlacementPolicy,
86 :
87 : // Low level description of exactly which pageservers should fulfil
88 : // which role. Generated by `Self::schedule`.
89 : pub(crate) intent: IntentState,
90 :
91 : // Low level description of how the tenant is configured on pageservers:
92 : // if this does not match `Self::intent` then the tenant needs reconciliation
93 : // with `Self::reconcile`.
94 : pub(crate) observed: ObservedState,
95 :
96 : // Tenant configuration, passed through opaquely to the pageserver. Identical
97 : // for all shards in a tenant.
98 : pub(crate) config: TenantConfig,
99 :
100 : /// If a reconcile task is currently in flight, it may be joined here (it is
101 : /// only safe to join if either the result has been received or the reconciler's
102 : /// cancellation token has been fired)
103 : #[serde(skip)]
104 : pub(crate) reconciler: Option<ReconcilerHandle>,
105 :
106 : /// If a tenant is being split, then all shards with that TenantId will have a
107 : /// SplitState set, this acts as a guard against other operations such as background
108 : /// reconciliation, and timeline creation.
109 : pub(crate) splitting: SplitState,
110 :
111 : /// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag
112 : /// is set. This flag is cleared when the tenant is popped off the delay queue.
113 : pub(crate) delayed_reconcile: bool,
114 :
115 : /// Optionally wait for reconciliation to complete up to a particular
116 : /// sequence number.
117 : #[serde(skip)]
118 : pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
119 :
120 : /// Indicates sequence number for which we have encountered an error reconciling. If
121 : /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
122 : /// and callers should stop waiting for `waiter` and propagate the error.
123 : #[serde(skip)]
124 : pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
125 :
126 : /// The most recent error from a reconcile on this tenant. This is a nested Arc
127 : /// because:
128 : /// - ReconcileWaiters need to Arc-clone the overall object to read it later
129 : /// - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
130 : /// many waiters for one shard, and the underlying error types are not Clone.
131 : ///
132 : /// TODO: generalize to an array of recent events
133 : /// TOOD: use a ArcSwap instead of mutex for faster reads?
134 : #[serde(serialize_with = "read_last_error")]
135 : pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
136 :
137 : /// If we have a pending compute notification that for some reason we weren't able to send,
138 : /// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
139 : /// and trigger a Reconciler run. This is the mechanism by which compute notifications are included in the scope
140 : /// of state that we publish externally in an eventually consistent way.
141 : pub(crate) pending_compute_notification: bool,
142 :
143 : // Support/debug tool: if something is going wrong or flapping with scheduling, this may
144 : // be set to a non-active state to avoid making changes while the issue is fixed.
145 : scheduling_policy: ShardSchedulingPolicy,
146 :
147 : // We should attempt to schedule this shard in the provided AZ to
148 : // decrease chances of cross-AZ compute.
149 : preferred_az_id: Option<String>,
150 : }
151 :
152 : #[derive(Default, Clone, Debug, Serialize)]
153 : pub(crate) struct IntentState {
154 : attached: Option<NodeId>,
155 : secondary: Vec<NodeId>,
156 : }
157 :
158 : impl IntentState {
159 13 : pub(crate) fn new() -> Self {
160 13 : Self {
161 13 : attached: None,
162 13 : secondary: vec![],
163 13 : }
164 13 : }
165 0 : pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option<NodeId>) -> Self {
166 0 : if let Some(node_id) = node_id {
167 0 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::Attach);
168 0 : }
169 0 : Self {
170 0 : attached: node_id,
171 0 : secondary: vec![],
172 0 : }
173 0 : }
174 :
175 32 : pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
176 32 : if self.attached != new_attached {
177 32 : if let Some(old_attached) = self.attached.take() {
178 0 : scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
179 32 : }
180 32 : if let Some(new_attached) = &new_attached {
181 32 : scheduler.update_node_ref_counts(*new_attached, RefCountUpdate::Attach);
182 32 : }
183 32 : self.attached = new_attached;
184 0 : }
185 32 : }
186 :
187 : /// Like set_attached, but the node is from [`Self::secondary`]. This swaps the node from
188 : /// secondary to attached while maintaining the scheduler's reference counts.
189 5 : pub(crate) fn promote_attached(
190 5 : &mut self,
191 5 : scheduler: &mut Scheduler,
192 5 : promote_secondary: NodeId,
193 5 : ) {
194 5 : // If we call this with a node that isn't in secondary, it would cause incorrect
195 5 : // scheduler reference counting, since we assume the node is already referenced as a secondary.
196 5 : debug_assert!(self.secondary.contains(&promote_secondary));
197 :
198 10 : self.secondary.retain(|n| n != &promote_secondary);
199 5 :
200 5 : let demoted = self.attached;
201 5 : self.attached = Some(promote_secondary);
202 5 :
203 5 : scheduler.update_node_ref_counts(promote_secondary, RefCountUpdate::PromoteSecondary);
204 5 : if let Some(demoted) = demoted {
205 0 : scheduler.update_node_ref_counts(demoted, RefCountUpdate::DemoteAttached);
206 5 : }
207 5 : }
208 :
209 25 : pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
210 25 : debug_assert!(!self.secondary.contains(&new_secondary));
211 25 : scheduler.update_node_ref_counts(new_secondary, RefCountUpdate::AddSecondary);
212 25 : self.secondary.push(new_secondary);
213 25 : }
214 :
215 : /// It is legal to call this with a node that is not currently a secondary: that is a no-op
216 5 : pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
217 5 : let index = self.secondary.iter().position(|n| *n == node_id);
218 5 : if let Some(index) = index {
219 5 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
220 5 : self.secondary.remove(index);
221 5 : }
222 5 : }
223 :
224 31 : pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
225 31 : for secondary in self.secondary.drain(..) {
226 20 : scheduler.update_node_ref_counts(secondary, RefCountUpdate::RemoveSecondary);
227 20 : }
228 31 : }
229 :
230 : /// Remove the last secondary node from the list of secondaries
231 0 : pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
232 0 : if let Some(node_id) = self.secondary.pop() {
233 0 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
234 0 : }
235 0 : }
236 :
237 31 : pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
238 31 : if let Some(old_attached) = self.attached.take() {
239 31 : scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
240 31 : }
241 :
242 31 : self.clear_secondary(scheduler);
243 31 : }
244 :
245 102 : pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
246 102 : let mut result = Vec::new();
247 102 : if let Some(p) = self.attached {
248 100 : result.push(p)
249 2 : }
250 :
251 102 : result.extend(self.secondary.iter().copied());
252 102 :
253 102 : result
254 102 : }
255 :
256 83 : pub(crate) fn get_attached(&self) -> &Option<NodeId> {
257 83 : &self.attached
258 83 : }
259 :
260 24 : pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
261 24 : &self.secondary
262 24 : }
263 :
264 : /// If the node is in use as the attached location, demote it into
265 : /// the list of secondary locations. This is used when a node goes offline,
266 : /// and we want to use a different node for attachment, but not permanently
267 : /// forget the location on the offline node.
268 : ///
269 : /// Returns true if a change was made
270 5 : pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
271 5 : if self.attached == Some(node_id) {
272 5 : self.attached = None;
273 5 : self.secondary.push(node_id);
274 5 : scheduler.update_node_ref_counts(node_id, RefCountUpdate::DemoteAttached);
275 5 : true
276 : } else {
277 0 : false
278 : }
279 5 : }
280 : }
281 :
282 : impl Drop for IntentState {
283 32 : fn drop(&mut self) {
284 32 : // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
285 32 : // We do not check this while panicking, to avoid polluting unit test failures or
286 32 : // other assertions with this assertion's output. It's still wrong to leak these,
287 32 : // but if we already have a panic then we don't need to independently flag this case.
288 32 : if !(std::thread::panicking()) {
289 32 : debug_assert!(self.attached.is_none() && self.secondary.is_empty());
290 0 : }
291 31 : }
292 : }
293 :
294 0 : #[derive(Default, Clone, Serialize, Deserialize, Debug)]
295 : pub(crate) struct ObservedState {
296 : pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
297 : }
298 :
299 : /// Our latest knowledge of how this tenant is configured in the outside world.
300 : ///
301 : /// Meaning:
302 : /// * No instance of this type exists for a node: we are certain that we have nothing configured on that
303 : /// node for this shard.
304 : /// * Instance exists with conf==None: we *might* have some state on that node, but we don't know
305 : /// what it is (e.g. we failed partway through configuring it)
306 : /// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
307 : /// and that configuration will still be present unless something external interfered.
308 0 : #[derive(Clone, Serialize, Deserialize, Debug)]
309 : pub(crate) struct ObservedStateLocation {
310 : /// If None, it means we do not know the status of this shard's location on this node, but
311 : /// we know that we might have some state on this node.
312 : pub(crate) conf: Option<LocationConfig>,
313 : }
314 : pub(crate) struct ReconcilerWaiter {
315 : // For observability purposes, remember the ID of the shard we're
316 : // waiting for.
317 : pub(crate) tenant_shard_id: TenantShardId,
318 :
319 : seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
320 : error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
321 : error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
322 : seq: Sequence,
323 : }
324 :
/// Non-blocking snapshot of a [`ReconcilerWaiter`]'s progress.
pub(crate) enum ReconcilerStatus {
    /// Reconciliation reached the awaited sequence.
    Done,
    /// A reconcile at or beyond the awaited sequence failed.
    Failed,
    /// Neither of the above yet.
    InProgress,
}
330 :
331 0 : #[derive(thiserror::Error, Debug)]
332 : pub(crate) enum ReconcileWaitError {
333 : #[error("Timeout waiting for shard {0}")]
334 : Timeout(TenantShardId),
335 : #[error("shutting down")]
336 : Shutdown,
337 : #[error("Reconcile error on shard {0}: {1}")]
338 : Failed(TenantShardId, Arc<ReconcileError>),
339 : }
340 :
341 : #[derive(Eq, PartialEq, Debug, Clone)]
342 : pub(crate) struct ReplaceSecondary {
343 : old_node_id: NodeId,
344 : new_node_id: NodeId,
345 : }
346 :
347 : #[derive(Eq, PartialEq, Debug, Clone)]
348 : pub(crate) struct MigrateAttachment {
349 : pub(crate) old_attached_node_id: NodeId,
350 : pub(crate) new_attached_node_id: NodeId,
351 : }
352 :
353 : #[derive(Eq, PartialEq, Debug, Clone)]
354 : pub(crate) enum ScheduleOptimizationAction {
355 : // Replace one of our secondary locations with a different node
356 : ReplaceSecondary(ReplaceSecondary),
357 : // Migrate attachment to an existing secondary location
358 : MigrateAttachment(MigrateAttachment),
359 : }
360 :
361 : #[derive(Eq, PartialEq, Debug, Clone)]
362 : pub(crate) struct ScheduleOptimization {
363 : // What was the reconcile sequence when we generated this optimization? The optimization
364 : // should only be applied if the shard's sequence is still at this value, in case other changes
365 : // happened between planning the optimization and applying it.
366 : sequence: Sequence,
367 :
368 : pub(crate) action: ScheduleOptimizationAction,
369 : }
370 :
371 : impl ReconcilerWaiter {
372 0 : pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
373 0 : tokio::select! {
374 0 : result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
375 0 : result.map_err(|e| match e {
376 0 : SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
377 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
378 0 : })?;
379 : },
380 0 : result = self.error_seq_wait.wait_for(self.seq) => {
381 0 : result.map_err(|e| match e {
382 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
383 0 : SeqWaitError::Timeout => unreachable!()
384 0 : })?;
385 :
386 0 : return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
387 0 : self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
388 : }
389 : }
390 :
391 0 : Ok(())
392 0 : }
393 :
394 0 : pub(crate) fn get_status(&self) -> ReconcilerStatus {
395 0 : if self.seq_wait.would_wait_for(self.seq).is_ok() {
396 0 : ReconcilerStatus::Done
397 0 : } else if self.error_seq_wait.would_wait_for(self.seq).is_ok() {
398 0 : ReconcilerStatus::Failed
399 : } else {
400 0 : ReconcilerStatus::InProgress
401 : }
402 0 : }
403 : }
404 :
405 : /// Having spawned a reconciler task, the tenant shard's state will carry enough
406 : /// information to optionally cancel & await it later.
407 : pub(crate) struct ReconcilerHandle {
408 : sequence: Sequence,
409 : handle: JoinHandle<()>,
410 : cancel: CancellationToken,
411 : }
412 :
413 : pub(crate) enum ReconcileNeeded {
414 : /// shard either doesn't need reconciliation, or is forbidden from spawning a reconciler
415 : /// in its current state (e.g. shard split in progress, or ShardSchedulingPolicy forbids it)
416 : No,
417 : /// shard has a reconciler running, and its intent hasn't changed since that one was
418 : /// spawned: wait for the existing reconciler rather than spawning a new one.
419 : WaitExisting(ReconcilerWaiter),
420 : /// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
421 : Yes,
422 : }
423 :
424 : /// When a reconcile task completes, it sends this result object
425 : /// to be applied to the primary TenantShard.
426 : pub(crate) struct ReconcileResult {
427 : pub(crate) sequence: Sequence,
428 : /// On errors, `observed` should be treated as an incompleted description
429 : /// of state (i.e. any nodes present in the result should override nodes
430 : /// present in the parent tenant state, but any unmentioned nodes should
431 : /// not be removed from parent tenant state)
432 : pub(crate) result: Result<(), ReconcileError>,
433 :
434 : pub(crate) tenant_shard_id: TenantShardId,
435 : pub(crate) generation: Option<Generation>,
436 : pub(crate) observed: ObservedState,
437 :
438 : /// Set [`TenantShard::pending_compute_notification`] from this flag
439 : pub(crate) pending_compute_notification: bool,
440 : }
441 :
442 : impl ObservedState {
443 0 : pub(crate) fn new() -> Self {
444 0 : Self {
445 0 : locations: HashMap::new(),
446 0 : }
447 0 : }
448 : }
449 :
450 : impl TenantShard {
451 19 : pub(crate) fn new(
452 19 : tenant_shard_id: TenantShardId,
453 19 : shard: ShardIdentity,
454 19 : policy: PlacementPolicy,
455 19 : ) -> Self {
456 19 : Self {
457 19 : tenant_shard_id,
458 19 : policy,
459 19 : intent: IntentState::default(),
460 19 : generation: Some(Generation::new(0)),
461 19 : shard,
462 19 : observed: ObservedState::default(),
463 19 : config: TenantConfig::default(),
464 19 : reconciler: None,
465 19 : splitting: SplitState::Idle,
466 19 : sequence: Sequence(1),
467 19 : delayed_reconcile: false,
468 19 : waiter: Arc::new(SeqWait::new(Sequence(0))),
469 19 : error_waiter: Arc::new(SeqWait::new(Sequence(0))),
470 19 : last_error: Arc::default(),
471 19 : pending_compute_notification: false,
472 19 : scheduling_policy: ShardSchedulingPolicy::default(),
473 19 : preferred_az_id: None,
474 19 : }
475 19 : }
476 :
477 : /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
478 : /// [`ObservedState`], even if it violates my [`PlacementPolicy`]. Call [`Self::schedule`] next,
479 : /// to get an intent state that complies with placement policy. The overall goal is to do scheduling
480 : /// in a way that makes use of any configured locations that already exist in the outside world.
481 1 : pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
482 1 : // Choose an attached location by filtering observed locations, and then sorting to get the highest
483 1 : // generation
484 1 : let mut attached_locs = self
485 1 : .observed
486 1 : .locations
487 1 : .iter()
488 2 : .filter_map(|(node_id, l)| {
489 2 : if let Some(conf) = &l.conf {
490 2 : if conf.mode == LocationConfigMode::AttachedMulti
491 1 : || conf.mode == LocationConfigMode::AttachedSingle
492 1 : || conf.mode == LocationConfigMode::AttachedStale
493 : {
494 2 : Some((node_id, conf.generation))
495 : } else {
496 0 : None
497 : }
498 : } else {
499 0 : None
500 : }
501 2 : })
502 1 : .collect::<Vec<_>>();
503 1 :
504 2 : attached_locs.sort_by_key(|i| i.1);
505 1 : if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
506 1 : self.intent.set_attached(scheduler, Some(*node_id));
507 1 : }
508 :
509 : // All remaining observed locations generate secondary intents. This includes None
510 : // observations, as these may well have some local content on disk that is usable (this
511 : // is an edge case that might occur if we restarted during a migration or other change)
512 : //
513 : // We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
514 : // will take care of promoting one of these secondaries to be attached.
515 2 : self.observed.locations.keys().for_each(|node_id| {
516 2 : if Some(*node_id) != self.intent.attached {
517 1 : self.intent.push_secondary(scheduler, *node_id);
518 1 : }
519 2 : });
520 1 : }
521 :
522 : /// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
523 : /// attached pageserver for a shard.
524 : ///
525 : /// Returns whether we modified it, and the NodeId selected.
526 15 : fn schedule_attached(
527 15 : &mut self,
528 15 : scheduler: &mut Scheduler,
529 15 : context: &ScheduleContext,
530 15 : ) -> Result<(bool, NodeId), ScheduleError> {
531 : // No work to do if we already have an attached tenant
532 15 : if let Some(node_id) = self.intent.attached {
533 0 : return Ok((false, node_id));
534 15 : }
535 :
536 15 : if let Some(promote_secondary) = scheduler.node_preferred(&self.intent.secondary) {
537 : // Promote a secondary
538 1 : tracing::debug!("Promoted secondary {} to attached", promote_secondary);
539 1 : self.intent.promote_attached(scheduler, promote_secondary);
540 1 : Ok((true, promote_secondary))
541 : } else {
542 : // Pick a fresh node: either we had no secondaries or none were schedulable
543 14 : let node_id =
544 14 : scheduler.schedule_shard::<AttachedShardTag>(&self.intent.secondary, context)?;
545 14 : tracing::debug!("Selected {} as attached", node_id);
546 14 : self.intent.set_attached(scheduler, Some(node_id));
547 14 : Ok((true, node_id))
548 : }
549 15 : }
550 :
551 16 : pub(crate) fn schedule(
552 16 : &mut self,
553 16 : scheduler: &mut Scheduler,
554 16 : context: &mut ScheduleContext,
555 16 : ) -> Result<(), ScheduleError> {
556 16 : let r = self.do_schedule(scheduler, context);
557 16 :
558 16 : context.avoid(&self.intent.all_pageservers());
559 16 : if let Some(attached) = self.intent.get_attached() {
560 15 : context.push_attached(*attached);
561 15 : }
562 :
563 16 : r
564 16 : }
565 :
566 16 : pub(crate) fn do_schedule(
567 16 : &mut self,
568 16 : scheduler: &mut Scheduler,
569 16 : context: &ScheduleContext,
570 16 : ) -> Result<(), ScheduleError> {
571 16 : // TODO: before scheduling new nodes, check if any existing content in
572 16 : // self.intent refers to pageservers that are offline, and pick other
573 16 : // pageservers if so.
574 16 :
575 16 : // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
576 16 : // change their attach location.
577 16 :
578 16 : match self.scheduling_policy {
579 15 : ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
580 : ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
581 : // Warn to make it obvious why other things aren't happening/working, if we skip scheduling
582 1 : tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
583 0 : "Scheduling is disabled by policy {:?}", self.scheduling_policy);
584 1 : return Ok(());
585 : }
586 : }
587 :
588 : // Build the set of pageservers already in use by this tenant, to avoid scheduling
589 : // more work on the same pageservers we're already using.
590 15 : let mut modified = false;
591 :
592 : // Add/remove nodes to fulfil policy
593 : use PlacementPolicy::*;
594 15 : match self.policy {
595 15 : Attached(secondary_count) => {
596 15 : let retain_secondaries = if self.intent.attached.is_none()
597 15 : && scheduler.node_preferred(&self.intent.secondary).is_some()
598 : {
599 : // If we have no attached, and one of the secondaries is elegible to be promoted, retain
600 : // one more secondary than we usually would, as one of them will become attached futher down this function.
601 1 : secondary_count + 1
602 : } else {
603 14 : secondary_count
604 : };
605 :
606 15 : while self.intent.secondary.len() > retain_secondaries {
607 0 : // We have no particular preference for one secondary location over another: just
608 0 : // arbitrarily drop from the end
609 0 : self.intent.pop_secondary(scheduler);
610 0 : modified = true;
611 0 : }
612 :
613 : // Should have exactly one attached, and N secondaries
614 15 : let (modified_attached, attached_node_id) =
615 15 : self.schedule_attached(scheduler, context)?;
616 15 : modified |= modified_attached;
617 15 :
618 15 : let mut used_pageservers = vec![attached_node_id];
619 29 : while self.intent.secondary.len() < secondary_count {
620 14 : let node_id = scheduler
621 14 : .schedule_shard::<SecondaryShardTag>(&used_pageservers, context)?;
622 14 : self.intent.push_secondary(scheduler, node_id);
623 14 : used_pageservers.push(node_id);
624 14 : modified = true;
625 : }
626 : }
627 : Secondary => {
628 0 : if let Some(node_id) = self.intent.get_attached() {
629 0 : // Populate secondary by demoting the attached node
630 0 : self.intent.demote_attached(scheduler, *node_id);
631 0 : modified = true;
632 0 : } else if self.intent.secondary.is_empty() {
633 : // Populate secondary by scheduling a fresh node
634 0 : let node_id = scheduler.schedule_shard::<SecondaryShardTag>(&[], context)?;
635 0 : self.intent.push_secondary(scheduler, node_id);
636 0 : modified = true;
637 0 : }
638 0 : while self.intent.secondary.len() > 1 {
639 0 : // We have no particular preference for one secondary location over another: just
640 0 : // arbitrarily drop from the end
641 0 : self.intent.pop_secondary(scheduler);
642 0 : modified = true;
643 0 : }
644 : }
645 : Detached => {
646 : // Never add locations in this mode
647 0 : if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
648 0 : self.intent.clear(scheduler);
649 0 : modified = true;
650 0 : }
651 : }
652 : }
653 :
654 15 : if modified {
655 15 : self.sequence.0 += 1;
656 15 : }
657 :
658 15 : Ok(())
659 16 : }
660 :
661 : /// Reschedule this tenant shard to one of its secondary locations. Returns a scheduling error
662 : /// if the swap is not possible and leaves the intent state in its original state.
663 : ///
664 : /// Arguments:
665 : /// `attached_to`: the currently attached location matching the intent state (may be None if the
666 : /// shard is not attached)
667 : /// `promote_to`: an optional secondary location of this tenant shard. If set to None, we ask
668 : /// the scheduler to recommend a node
669 0 : pub(crate) fn reschedule_to_secondary(
670 0 : &mut self,
671 0 : promote_to: Option<NodeId>,
672 0 : scheduler: &mut Scheduler,
673 0 : ) -> Result<(), ScheduleError> {
674 0 : let promote_to = match promote_to {
675 0 : Some(node) => node,
676 0 : None => match scheduler.node_preferred(self.intent.get_secondary()) {
677 0 : Some(node) => node,
678 : None => {
679 0 : return Err(ScheduleError::ImpossibleConstraint);
680 : }
681 : },
682 : };
683 :
684 0 : assert!(self.intent.get_secondary().contains(&promote_to));
685 :
686 0 : if let Some(node) = self.intent.get_attached() {
687 0 : let demoted = self.intent.demote_attached(scheduler, *node);
688 0 : if !demoted {
689 0 : return Err(ScheduleError::ImpossibleConstraint);
690 0 : }
691 0 : }
692 :
693 0 : self.intent.promote_attached(scheduler, promote_to);
694 0 :
695 0 : // Increment the sequence number for the edge case where a
696 0 : // reconciler is already running to avoid waiting on the
697 0 : // current reconcile instead of spawning a new one.
698 0 : self.sequence = self.sequence.next();
699 0 :
700 0 : Ok(())
701 0 : }
702 :
703 : /// Optimize attachments: if a shard has a secondary location that is preferable to
704 : /// its primary location based on soft constraints, switch that secondary location
705 : /// to be attached.
706 23 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
707 : pub(crate) fn optimize_attachment(
708 : &self,
709 : nodes: &HashMap<NodeId, Node>,
710 : schedule_context: &ScheduleContext,
711 : ) -> Option<ScheduleOptimization> {
712 : let attached = (*self.intent.get_attached())?;
713 : if self.intent.secondary.is_empty() {
714 : // We can only do useful work if we have both attached and secondary locations: this
715 : // function doesn't schedule new locations, only swaps between attached and secondaries.
716 : return None;
717 : }
718 :
719 : let current_affinity_score = schedule_context.get_node_affinity(attached);
720 : let current_attachment_count = schedule_context.get_node_attachments(attached);
721 :
722 : // Generate score for each node, dropping any un-schedulable nodes.
723 : let all_pageservers = self.intent.all_pageservers();
724 : let mut scores = all_pageservers
725 : .iter()
726 46 : .flat_map(|node_id| {
727 46 : let node = nodes.get(node_id);
728 46 : if node.is_none() {
729 0 : None
730 46 : } else if matches!(
731 46 : node.unwrap().get_scheduling(),
732 : NodeSchedulingPolicy::Filling
733 : ) {
734 : // If the node is currently filling, don't count it as a candidate to avoid,
735 : // racing with the background fill.
736 0 : None
737 46 : } else if matches!(node.unwrap().may_schedule(), MaySchedule::No) {
738 0 : None
739 : } else {
740 46 : let affinity_score = schedule_context.get_node_affinity(*node_id);
741 46 : let attachment_count = schedule_context.get_node_attachments(*node_id);
742 46 : Some((*node_id, affinity_score, attachment_count))
743 : }
744 46 : })
745 : .collect::<Vec<_>>();
746 :
747 : // Sort precedence:
748 : // 1st - prefer nodes with the lowest total affinity score
749 : // 2nd - prefer nodes with the lowest number of attachments in this context
750 : // 3rd - if all else is equal, sort by node ID for determinism in tests.
751 46 : scores.sort_by_key(|i| (i.1, i.2, i.0));
752 :
753 : if let Some((preferred_node, preferred_affinity_score, preferred_attachment_count)) =
754 : scores.first()
755 : {
756 : if attached != *preferred_node {
757 : // The best alternative must be more than 1 better than us, otherwise we could end
758 : // up flapping back next time we're called (e.g. there's no point migrating from
759 : // a location with score 1 to a score zero, because on next location the situation
760 : // would be the same, but in reverse).
761 : if current_affinity_score > *preferred_affinity_score + AffinityScore(1)
762 : || current_attachment_count > *preferred_attachment_count + 1
763 : {
764 : tracing::info!(
765 : "Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})",
766 : self.intent.get_secondary()
767 : );
768 : return Some(ScheduleOptimization {
769 : sequence: self.sequence,
770 : action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
771 : old_attached_node_id: attached,
772 : new_attached_node_id: *preferred_node,
773 : }),
774 : });
775 : }
776 : } else {
777 : tracing::debug!(
778 : "Node {} is already preferred (score {:?})",
779 : preferred_node,
780 : preferred_affinity_score
781 : );
782 : }
783 : }
784 :
785 : // Fall-through: we didn't find an optimization
786 : None
787 : }
788 :
789 20 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
790 : pub(crate) fn optimize_secondary(
791 : &self,
792 : scheduler: &mut Scheduler,
793 : schedule_context: &ScheduleContext,
794 : ) -> Option<ScheduleOptimization> {
795 : if self.intent.secondary.is_empty() {
796 : // We can only do useful work if we have both attached and secondary locations: this
797 : // function doesn't schedule new locations, only swaps between attached and secondaries.
798 : return None;
799 : }
800 :
801 : for secondary in self.intent.get_secondary() {
802 : let Some(affinity_score) = schedule_context.nodes.get(secondary) else {
803 : // We're already on a node unaffected any affinity constraints,
804 : // so we won't change it.
805 : continue;
806 : };
807 :
808 : // Let the scheduler suggest a node, where it would put us if we were scheduling afresh
809 : // This implicitly limits the choice to nodes that are available, and prefers nodes
810 : // with lower utilization.
811 : let Ok(candidate_node) = scheduler.schedule_shard::<SecondaryShardTag>(
812 : &self.intent.all_pageservers(),
813 : schedule_context,
814 : ) else {
815 : // A scheduling error means we have no possible candidate replacements
816 : continue;
817 : };
818 :
819 : let candidate_affinity_score = schedule_context
820 : .nodes
821 : .get(&candidate_node)
822 : .unwrap_or(&AffinityScore::FREE);
823 :
824 : // The best alternative must be more than 1 better than us, otherwise we could end
825 : // up flapping back next time we're called.
826 : if *candidate_affinity_score + AffinityScore(1) < *affinity_score {
827 : // If some other node is available and has a lower score than this node, then
828 : // that other node is a good place to migrate to.
829 : tracing::info!(
830 : "Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})",
831 : self.intent.get_secondary()
832 : );
833 : return Some(ScheduleOptimization {
834 : sequence: self.sequence,
835 : action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
836 : old_node_id: *secondary,
837 : new_node_id: candidate_node,
838 : }),
839 : });
840 : }
841 : }
842 :
843 : None
844 : }
845 :
846 : /// Return true if the optimization was really applied: it will not be applied if the optimization's
847 : /// sequence is behind this tenant shard's
848 9 : pub(crate) fn apply_optimization(
849 9 : &mut self,
850 9 : scheduler: &mut Scheduler,
851 9 : optimization: ScheduleOptimization,
852 9 : ) -> bool {
853 9 : if optimization.sequence != self.sequence {
854 0 : return false;
855 9 : }
856 9 :
857 9 : metrics::METRICS_REGISTRY
858 9 : .metrics_group
859 9 : .storage_controller_schedule_optimization
860 9 : .inc();
861 9 :
862 9 : match optimization.action {
863 : ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
864 4 : old_attached_node_id,
865 4 : new_attached_node_id,
866 4 : }) => {
867 4 : self.intent.demote_attached(scheduler, old_attached_node_id);
868 4 : self.intent
869 4 : .promote_attached(scheduler, new_attached_node_id);
870 4 : }
871 : ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
872 5 : old_node_id,
873 5 : new_node_id,
874 5 : }) => {
875 5 : self.intent.remove_secondary(scheduler, old_node_id);
876 5 : self.intent.push_secondary(scheduler, new_node_id);
877 5 : }
878 : }
879 :
880 9 : true
881 9 : }
882 :
883 : /// Query whether the tenant's observed state for attached node matches its intent state, and if so,
884 : /// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that
885 : /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
886 : ///
887 : /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
888 : /// funciton should not be used to decide whether to reconcile.
889 0 : pub(crate) fn stably_attached(&self) -> Option<NodeId> {
890 0 : if let Some(attach_intent) = self.intent.attached {
891 0 : match self.observed.locations.get(&attach_intent) {
892 0 : Some(loc) => match &loc.conf {
893 0 : Some(conf) => match conf.mode {
894 : LocationConfigMode::AttachedMulti
895 : | LocationConfigMode::AttachedSingle
896 : | LocationConfigMode::AttachedStale => {
897 : // Our intent and observed state agree that this node is in an attached state.
898 0 : Some(attach_intent)
899 : }
900 : // Our observed config is not an attached state
901 0 : _ => None,
902 : },
903 : // Our observed state is None, i.e. in flux
904 0 : None => None,
905 : },
906 : // We have no observed state for this node
907 0 : None => None,
908 : }
909 : } else {
910 : // Our intent is not to attach
911 0 : None
912 : }
913 0 : }
914 :
915 0 : fn dirty(&self, nodes: &Arc<HashMap<NodeId, Node>>) -> bool {
916 0 : let mut dirty_nodes = HashSet::new();
917 :
918 0 : if let Some(node_id) = self.intent.attached {
919 : // Maybe panic: it is a severe bug if we try to attach while generation is null.
920 0 : let generation = self
921 0 : .generation
922 0 : .expect("Attempted to enter attached state without a generation");
923 0 :
924 0 : let wanted_conf =
925 0 : attached_location_conf(generation, &self.shard, &self.config, &self.policy);
926 0 : match self.observed.locations.get(&node_id) {
927 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
928 0 : Some(_) | None => {
929 0 : dirty_nodes.insert(node_id);
930 0 : }
931 : }
932 0 : }
933 :
934 0 : for node_id in &self.intent.secondary {
935 0 : let wanted_conf = secondary_location_conf(&self.shard, &self.config);
936 0 : match self.observed.locations.get(node_id) {
937 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
938 0 : Some(_) | None => {
939 0 : dirty_nodes.insert(*node_id);
940 0 : }
941 : }
942 : }
943 :
944 0 : for node_id in self.observed.locations.keys() {
945 0 : if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
946 0 : // We have observed state that isn't part of our intent: need to clean it up.
947 0 : dirty_nodes.insert(*node_id);
948 0 : }
949 : }
950 :
951 0 : dirty_nodes.retain(|node_id| {
952 0 : nodes
953 0 : .get(node_id)
954 0 : .map(|n| n.is_available())
955 0 : .unwrap_or(false)
956 0 : });
957 0 :
958 0 : !dirty_nodes.is_empty()
959 0 : }
960 :
961 : #[allow(clippy::too_many_arguments)]
962 0 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
963 : pub(crate) fn get_reconcile_needed(
964 : &mut self,
965 : pageservers: &Arc<HashMap<NodeId, Node>>,
966 : ) -> ReconcileNeeded {
967 : // If there are any ambiguous observed states, and the nodes they refer to are available,
968 : // we should reconcile to clean them up.
969 : let mut dirty_observed = false;
970 : for (node_id, observed_loc) in &self.observed.locations {
971 : let node = pageservers
972 : .get(node_id)
973 : .expect("Nodes may not be removed while referenced");
974 : if observed_loc.conf.is_none() && node.is_available() {
975 : dirty_observed = true;
976 : break;
977 : }
978 : }
979 :
980 : let active_nodes_dirty = self.dirty(pageservers);
981 :
982 : // Even if there is no pageserver work to be done, if we have a pending notification to computes,
983 : // wake up a reconciler to send it.
984 : let do_reconcile =
985 : active_nodes_dirty || dirty_observed || self.pending_compute_notification;
986 :
987 : if !do_reconcile {
988 : tracing::debug!("Not dirty, no reconciliation needed.");
989 : return ReconcileNeeded::No;
990 : }
991 :
992 : // If we are currently splitting, then never start a reconciler task: the splitting logic
993 : // requires that shards are not interfered with while it runs. Do this check here rather than
994 : // up top, so that we only log this message if we would otherwise have done a reconciliation.
995 : if !matches!(self.splitting, SplitState::Idle) {
996 : tracing::info!("Refusing to reconcile, splitting in progress");
997 : return ReconcileNeeded::No;
998 : }
999 :
1000 : // Reconcile already in flight for the current sequence?
1001 : if let Some(handle) = &self.reconciler {
1002 : if handle.sequence == self.sequence {
1003 : tracing::info!(
1004 : "Reconciliation already in progress for sequence {:?}",
1005 : self.sequence,
1006 : );
1007 : return ReconcileNeeded::WaitExisting(ReconcilerWaiter {
1008 : tenant_shard_id: self.tenant_shard_id,
1009 : seq_wait: self.waiter.clone(),
1010 : error_seq_wait: self.error_waiter.clone(),
1011 : error: self.last_error.clone(),
1012 : seq: self.sequence,
1013 : });
1014 : }
1015 : }
1016 :
1017 : // Pre-checks done: finally check whether we may actually do the work
1018 : match self.scheduling_policy {
1019 : ShardSchedulingPolicy::Active
1020 : | ShardSchedulingPolicy::Essential
1021 : | ShardSchedulingPolicy::Pause => {}
1022 : ShardSchedulingPolicy::Stop => {
1023 : // We only reach this point if there is work to do and we're going to skip
1024 : // doing it: warn it obvious why this tenant isn't doing what it ought to.
1025 : tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
1026 : return ReconcileNeeded::No;
1027 : }
1028 : }
1029 :
1030 : ReconcileNeeded::Yes
1031 : }
1032 :
1033 : /// Ensure the sequence number is set to a value where waiting for this value will make us wait
1034 : /// for the next reconcile: i.e. it is ahead of all completed or running reconcilers.
1035 : ///
1036 : /// Constructing a ReconcilerWaiter with the resulting sequence number gives the property
1037 : /// that the waiter will not complete until some future Reconciler is constructed and run.
1038 0 : fn ensure_sequence_ahead(&mut self) {
1039 0 : // Find the highest sequence for which a Reconciler has previously run or is currently
1040 0 : // running
1041 0 : let max_seen = std::cmp::max(
1042 0 : self.reconciler
1043 0 : .as_ref()
1044 0 : .map(|r| r.sequence)
1045 0 : .unwrap_or(Sequence(0)),
1046 0 : std::cmp::max(self.waiter.load(), self.error_waiter.load()),
1047 0 : );
1048 0 :
1049 0 : if self.sequence <= max_seen {
1050 0 : self.sequence = max_seen.next();
1051 0 : }
1052 0 : }
1053 :
1054 : /// Create a waiter that will wait for some future Reconciler that hasn't been spawned yet.
1055 : ///
1056 : /// This is appropriate when you can't spawn a reconciler (e.g. due to resource limits), but
1057 : /// you would like to wait on the next reconciler that gets spawned in the background.
1058 0 : pub(crate) fn future_reconcile_waiter(&mut self) -> ReconcilerWaiter {
1059 0 : self.ensure_sequence_ahead();
1060 0 :
1061 0 : ReconcilerWaiter {
1062 0 : tenant_shard_id: self.tenant_shard_id,
1063 0 : seq_wait: self.waiter.clone(),
1064 0 : error_seq_wait: self.error_waiter.clone(),
1065 0 : error: self.last_error.clone(),
1066 0 : seq: self.sequence,
1067 0 : }
1068 0 : }
1069 :
1070 : #[allow(clippy::too_many_arguments)]
1071 0 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
1072 : pub(crate) fn spawn_reconciler(
1073 : &mut self,
1074 : result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
1075 : pageservers: &Arc<HashMap<NodeId, Node>>,
1076 : compute_hook: &Arc<ComputeHook>,
1077 : reconciler_config: ReconcilerConfig,
1078 : service_config: &service::Config,
1079 : persistence: &Arc<Persistence>,
1080 : units: ReconcileUnits,
1081 : gate_guard: GateGuard,
1082 : cancel: &CancellationToken,
1083 : ) -> Option<ReconcilerWaiter> {
1084 : // Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
1085 : // doing our sequence's work.
1086 : let old_handle = self.reconciler.take();
1087 :
1088 : // Build list of nodes from which the reconciler should detach
1089 : let mut detach = Vec::new();
1090 : for node_id in self.observed.locations.keys() {
1091 : if self.intent.get_attached() != &Some(*node_id)
1092 : && !self.intent.secondary.contains(node_id)
1093 : {
1094 : detach.push(
1095 : pageservers
1096 : .get(node_id)
1097 : .expect("Intent references non-existent pageserver")
1098 : .clone(),
1099 : )
1100 : }
1101 : }
1102 :
1103 : // Advance the sequence before spawning a reconciler, so that sequence waiters
1104 : // can distinguish between before+after the reconcile completes.
1105 : self.ensure_sequence_ahead();
1106 :
1107 : let reconciler_cancel = cancel.child_token();
1108 : let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
1109 : let mut reconciler = Reconciler {
1110 : tenant_shard_id: self.tenant_shard_id,
1111 : shard: self.shard,
1112 : placement_policy: self.policy.clone(),
1113 : generation: self.generation,
1114 : intent: reconciler_intent,
1115 : detach,
1116 : reconciler_config,
1117 : config: self.config.clone(),
1118 : observed: self.observed.clone(),
1119 : compute_hook: compute_hook.clone(),
1120 : service_config: service_config.clone(),
1121 : _gate_guard: gate_guard,
1122 : _resource_units: units,
1123 : cancel: reconciler_cancel.clone(),
1124 : persistence: persistence.clone(),
1125 : compute_notify_failure: false,
1126 : };
1127 :
1128 : let reconcile_seq = self.sequence;
1129 :
1130 : tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
1131 : let must_notify = self.pending_compute_notification;
1132 : let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
1133 : tenant_id=%reconciler.tenant_shard_id.tenant_id,
1134 : shard_id=%reconciler.tenant_shard_id.shard_slug());
1135 : metrics::METRICS_REGISTRY
1136 : .metrics_group
1137 : .storage_controller_reconcile_spawn
1138 : .inc();
1139 : let result_tx = result_tx.clone();
1140 : let join_handle = tokio::task::spawn(
1141 0 : async move {
1142 : // Wait for any previous reconcile task to complete before we start
1143 0 : if let Some(old_handle) = old_handle {
1144 0 : old_handle.cancel.cancel();
1145 0 : if let Err(e) = old_handle.handle.await {
1146 : // We can't do much with this other than log it: the task is done, so
1147 : // we may proceed with our work.
1148 0 : tracing::error!("Unexpected join error waiting for reconcile task: {e}");
1149 0 : }
1150 0 : }
1151 :
1152 : // Early check for cancellation before doing any work
1153 : // TODO: wrap all remote API operations in cancellation check
1154 : // as well.
1155 0 : if reconciler.cancel.is_cancelled() {
1156 0 : metrics::METRICS_REGISTRY
1157 0 : .metrics_group
1158 0 : .storage_controller_reconcile_complete
1159 0 : .inc(ReconcileCompleteLabelGroup {
1160 0 : status: ReconcileOutcome::Cancel,
1161 0 : });
1162 0 : return;
1163 0 : }
1164 :
1165 : // Attempt to make observed state match intent state
1166 0 : let result = reconciler.reconcile().await;
1167 :
1168 : // If we know we had a pending compute notification from some previous action, send a notification irrespective
1169 : // of whether the above reconcile() did any work
1170 0 : if result.is_ok() && must_notify {
1171 : // If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
1172 0 : reconciler.compute_notify().await.ok();
1173 0 : }
1174 :
1175 : // Update result counter
1176 0 : let outcome_label = match &result {
1177 0 : Ok(_) => ReconcileOutcome::Success,
1178 0 : Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
1179 0 : Err(_) => ReconcileOutcome::Error,
1180 : };
1181 :
1182 0 : metrics::METRICS_REGISTRY
1183 0 : .metrics_group
1184 0 : .storage_controller_reconcile_complete
1185 0 : .inc(ReconcileCompleteLabelGroup {
1186 0 : status: outcome_label,
1187 0 : });
1188 0 :
1189 0 : // Constructing result implicitly drops Reconciler, freeing any ReconcileUnits before the Service might
1190 0 : // try and schedule more work in response to our result.
1191 0 : let result = ReconcileResult {
1192 0 : sequence: reconcile_seq,
1193 0 : result,
1194 0 : tenant_shard_id: reconciler.tenant_shard_id,
1195 0 : generation: reconciler.generation,
1196 0 : observed: reconciler.observed,
1197 0 : pending_compute_notification: reconciler.compute_notify_failure,
1198 0 : };
1199 0 :
1200 0 : result_tx
1201 0 : .send(ReconcileResultRequest::ReconcileResult(result))
1202 0 : .ok();
1203 0 : }
1204 : .instrument(reconciler_span),
1205 : );
1206 :
1207 : self.reconciler = Some(ReconcilerHandle {
1208 : sequence: self.sequence,
1209 : handle: join_handle,
1210 : cancel: reconciler_cancel,
1211 : });
1212 :
1213 : Some(ReconcilerWaiter {
1214 : tenant_shard_id: self.tenant_shard_id,
1215 : seq_wait: self.waiter.clone(),
1216 : error_seq_wait: self.error_waiter.clone(),
1217 : error: self.last_error.clone(),
1218 : seq: self.sequence,
1219 : })
1220 : }
1221 :
1222 : /// Get a waiter for any reconciliation in flight, but do not start reconciliation
1223 : /// if it is not already running
1224 0 : pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
1225 0 : if self.reconciler.is_some() {
1226 0 : Some(ReconcilerWaiter {
1227 0 : tenant_shard_id: self.tenant_shard_id,
1228 0 : seq_wait: self.waiter.clone(),
1229 0 : error_seq_wait: self.error_waiter.clone(),
1230 0 : error: self.last_error.clone(),
1231 0 : seq: self.sequence,
1232 0 : })
1233 : } else {
1234 0 : None
1235 : }
1236 0 : }
1237 :
1238 : /// Called when a ReconcileResult has been emitted and the service is updating
1239 : /// our state: if the result is from a sequence >= my ReconcileHandle, then drop
1240 : /// the handle to indicate there is no longer a reconciliation in progress.
1241 0 : pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
1242 0 : if let Some(reconcile_handle) = &self.reconciler {
1243 0 : if reconcile_handle.sequence <= sequence {
1244 0 : self.reconciler = None;
1245 0 : }
1246 0 : }
1247 0 : }
1248 :
1249 : /// If we had any state at all referring to this node ID, drop it. Does not
1250 : /// attempt to reschedule.
1251 : ///
1252 : /// Returns true if we modified the node's intent state.
1253 0 : pub(crate) fn deref_node(&mut self, node_id: NodeId) -> bool {
1254 0 : let mut intent_modified = false;
1255 0 :
1256 0 : // Drop if this node was our attached intent
1257 0 : if self.intent.attached == Some(node_id) {
1258 0 : self.intent.attached = None;
1259 0 : intent_modified = true;
1260 0 : }
1261 :
1262 : // Drop from the list of secondaries, and check if we modified it
1263 0 : let had_secondaries = self.intent.secondary.len();
1264 0 : self.intent.secondary.retain(|n| n != &node_id);
1265 0 : intent_modified |= self.intent.secondary.len() != had_secondaries;
1266 0 :
1267 0 : debug_assert!(!self.intent.all_pageservers().contains(&node_id));
1268 :
1269 0 : intent_modified
1270 0 : }
1271 :
1272 0 : pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
1273 0 : self.scheduling_policy = p;
1274 0 : }
1275 :
1276 0 : pub(crate) fn get_scheduling_policy(&self) -> &ShardSchedulingPolicy {
1277 0 : &self.scheduling_policy
1278 0 : }
1279 :
1280 0 : pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
1281 0 : // Ordering: always set last_error before advancing sequence, so that sequence
1282 0 : // waiters are guaranteed to see a Some value when they see an error.
1283 0 : *(self.last_error.lock().unwrap()) = Some(Arc::new(error));
1284 0 : self.error_waiter.advance(sequence);
1285 0 : }
1286 :
1287 0 : pub(crate) fn from_persistent(
1288 0 : tsp: TenantShardPersistence,
1289 0 : intent: IntentState,
1290 0 : ) -> anyhow::Result<Self> {
1291 0 : let tenant_shard_id = tsp.get_tenant_shard_id()?;
1292 0 : let shard_identity = tsp.get_shard_identity()?;
1293 :
1294 0 : Ok(Self {
1295 0 : tenant_shard_id,
1296 0 : shard: shard_identity,
1297 0 : sequence: Sequence::initial(),
1298 0 : generation: tsp.generation.map(|g| Generation::new(g as u32)),
1299 0 : policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
1300 0 : intent,
1301 0 : observed: ObservedState::new(),
1302 0 : config: serde_json::from_str(&tsp.config).unwrap(),
1303 0 : reconciler: None,
1304 0 : splitting: tsp.splitting,
1305 0 : waiter: Arc::new(SeqWait::new(Sequence::initial())),
1306 0 : error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
1307 0 : last_error: Arc::default(),
1308 0 : pending_compute_notification: false,
1309 0 : delayed_reconcile: false,
1310 0 : scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
1311 0 : preferred_az_id: tsp.preferred_az_id,
1312 0 : })
1313 0 : }
1314 :
1315 0 : pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
1316 0 : TenantShardPersistence {
1317 0 : tenant_id: self.tenant_shard_id.tenant_id.to_string(),
1318 0 : shard_number: self.tenant_shard_id.shard_number.0 as i32,
1319 0 : shard_count: self.tenant_shard_id.shard_count.literal() as i32,
1320 0 : shard_stripe_size: self.shard.stripe_size.0 as i32,
1321 0 : generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
1322 0 : generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
1323 0 : placement_policy: serde_json::to_string(&self.policy).unwrap(),
1324 0 : config: serde_json::to_string(&self.config).unwrap(),
1325 0 : splitting: SplitState::default(),
1326 0 : scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
1327 0 : preferred_az_id: self.preferred_az_id.clone(),
1328 0 : }
1329 0 : }
1330 :
1331 0 : pub(crate) fn preferred_az(&self) -> Option<&str> {
1332 0 : self.preferred_az_id.as_deref()
1333 0 : }
1334 :
1335 0 : pub(crate) fn set_preferred_az(&mut self, preferred_az_id: String) {
1336 0 : self.preferred_az_id = Some(preferred_az_id);
1337 0 : }
1338 : }
1339 :
1340 : #[cfg(test)]
1341 : pub(crate) mod tests {
1342 : use std::{cell::RefCell, rc::Rc};
1343 :
1344 : use pageserver_api::{
1345 : controller_api::NodeAvailability,
1346 : shard::{ShardCount, ShardNumber},
1347 : };
1348 : use utils::id::TenantId;
1349 :
1350 : use crate::scheduler::test_utils::make_test_nodes;
1351 :
1352 : use super::*;
1353 :
1354 7 : fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard {
1355 7 : let tenant_id = TenantId::generate();
1356 7 : let shard_number = ShardNumber(0);
1357 7 : let shard_count = ShardCount::new(1);
1358 7 :
1359 7 : let tenant_shard_id = TenantShardId {
1360 7 : tenant_id,
1361 7 : shard_number,
1362 7 : shard_count,
1363 7 : };
1364 7 : TenantShard::new(
1365 7 : tenant_shard_id,
1366 7 : ShardIdentity::new(
1367 7 : shard_number,
1368 7 : shard_count,
1369 7 : pageserver_api::shard::ShardStripeSize(32768),
1370 7 : )
1371 7 : .unwrap(),
1372 7 : policy,
1373 7 : )
1374 7 : }
1375 :
1376 3 : fn make_test_tenant(policy: PlacementPolicy, shard_count: ShardCount) -> Vec<TenantShard> {
1377 3 : let tenant_id = TenantId::generate();
1378 3 :
1379 3 : (0..shard_count.count())
1380 12 : .map(|i| {
1381 12 : let shard_number = ShardNumber(i);
1382 12 :
1383 12 : let tenant_shard_id = TenantShardId {
1384 12 : tenant_id,
1385 12 : shard_number,
1386 12 : shard_count,
1387 12 : };
1388 12 : TenantShard::new(
1389 12 : tenant_shard_id,
1390 12 : ShardIdentity::new(
1391 12 : shard_number,
1392 12 : shard_count,
1393 12 : pageserver_api::shard::ShardStripeSize(32768),
1394 12 : )
1395 12 : .unwrap(),
1396 12 : policy.clone(),
1397 12 : )
1398 12 : })
1399 3 : .collect()
1400 3 : }
1401 :
/// Test the scheduling behaviors used when a tenant configured for HA is subject
/// to nodes being marked offline.
#[test]
fn tenant_ha_scheduling() -> anyhow::Result<()> {
    // Three nodes, but an Attached(1) tenant only uses two of them: the third is
    // expected to remain unused.
    let mut nodes = make_test_nodes(3);

    let mut scheduler = Scheduler::new(nodes.values());
    let mut context = ScheduleContext::default();

    let mut shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
    shard
        .schedule(&mut scheduler, &mut context)
        .expect("we have enough nodes, scheduling should work");

    // Initially the attached and secondary locations must be on different nodes.
    assert_eq!(shard.intent.secondary.len(), 1);
    assert!(shard.intent.attached.is_some());
    let first_attached = shard.intent.attached.unwrap();
    let first_secondary = *shard.intent.secondary.iter().last().unwrap();
    assert_ne!(first_attached, first_secondary);

    // Demoting the attached location (as when its node goes offline) leaves nothing
    // attached and turns it into a second secondary.
    assert!(shard.intent.demote_attached(&mut scheduler, first_attached));
    assert!(shard.intent.attached.is_none());
    assert_eq!(shard.intent.secondary.len(), 2);

    // Tell the scheduler that the demoted node is now offline.
    let offline_node = nodes.get_mut(&first_attached).unwrap();
    offline_node.set_availability(NodeAvailability::Offline);
    scheduler.node_upsert(offline_node);

    // Rescheduling should promote the still-available secondary to attached...
    shard
        .schedule(&mut scheduler, &mut context)
        .expect("active nodes are available");
    assert_eq!(shard.intent.attached.unwrap(), first_secondary);

    // ...while keeping the original attached node as a secondary.
    assert_eq!(*shard.intent.secondary.iter().last().unwrap(), first_attached);

    shard.intent.clear(&mut scheduler);

    Ok(())
}
1457 :
#[test]
fn intent_from_observed() -> anyhow::Result<()> {
    let nodes = make_test_nodes(3);
    let mut scheduler = Scheduler::new(nodes.values());

    let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));

    // Build an observed location with the given mode and generation, matching this shard's
    // identity and using the default tenant config.
    let shard = tenant_shard.shard;
    let observed_location = |mode: LocationConfigMode, generation| ObservedStateLocation {
        conf: Some(LocationConfig {
            mode,
            generation: Some(generation),
            secondary_conf: None,
            shard_number: shard.number.0,
            shard_count: shard.count.literal(),
            shard_stripe_size: shard.stripe_size.0,
            tenant_conf: TenantConfig::default(),
        }),
    };

    // Node 3 holds a generation-2 attachment; node 2 holds a stale generation-1 attachment.
    tenant_shard.observed.locations.insert(
        NodeId(3),
        observed_location(LocationConfigMode::AttachedMulti, 2),
    );
    tenant_shard.observed.locations.insert(
        NodeId(2),
        observed_location(LocationConfigMode::AttachedStale, 1),
    );

    tenant_shard.intent_from_observed(&mut scheduler);

    // The attached location with the highest generation becomes the attachment...
    assert_eq!(tenant_shard.intent.attached, Some(NodeId(3)));
    // ...and the remaining locations become secondaries.
    assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]);

    scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?;

    tenant_shard.intent.clear(&mut scheduler);
    Ok(())
}
1507 :
#[test]
fn scheduling_mode() -> anyhow::Result<()> {
    let nodes = make_test_nodes(3);
    let mut scheduler = Scheduler::new(nodes.values());

    let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));

    // Each case: (policy, whether schedule() should place any locations).
    // In Pause mode schedule() succeeds but does nothing; in Active mode it places locations.
    let cases = [
        (ShardSchedulingPolicy::Pause, false),
        (ShardSchedulingPolicy::Active, true),
    ];
    for (policy, expect_locations) in cases {
        tenant_shard.scheduling_policy = policy;
        assert!(tenant_shard
            .schedule(&mut scheduler, &mut ScheduleContext::default())
            .is_ok());
        assert_eq!(
            !tenant_shard.intent.all_pageservers().is_empty(),
            expect_locations
        );
    }

    tenant_shard.intent.clear(&mut scheduler);
    Ok(())
}
1532 :
#[test]
fn optimize_attachment() -> anyhow::Result<()> {
    let nodes = make_test_nodes(3);
    let mut scheduler = Scheduler::new(nodes.values());

    let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
    let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));

    // Initially both shards are attached on node 1. shard_a's secondary is on node 2 and
    // shard_b's secondary is on node 3.
    shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    shard_a.intent.push_secondary(&mut scheduler, NodeId(2));
    shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    shard_b.intent.push_secondary(&mut scheduler, NodeId(3));

    // Both shards are accounted for in a single shared ScheduleContext.
    let mut schedule_context = ScheduleContext::default();
    for shard in [&shard_a, &shard_b] {
        schedule_context.avoid(&shard.intent.all_pageservers());
        schedule_context.push_attached(shard.intent.get_attached().unwrap());
    }

    // Each shard should notice it can move its attachment off the shared node 1, onto its own
    // secondary location, and request to do so.
    let optimization_a = shard_a.optimize_attachment(&nodes, &schedule_context);
    assert_eq!(
        optimization_a,
        Some(ScheduleOptimization {
            sequence: shard_a.sequence,
            action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
                old_attached_node_id: NodeId(1),
                new_attached_node_id: NodeId(2)
            })
        })
    );

    // Optimizing two shards against the same ScheduleContext is not valid in general:
    // applying one optimization invalidates the context's statistics for the other. It is
    // the responsibility of [`Service::optimize_all`] not to do this. Both optimizations are
    // generated here only for test purposes.
    let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context);
    assert_eq!(
        optimization_b,
        Some(ScheduleOptimization {
            sequence: shard_b.sequence,
            action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
                old_attached_node_id: NodeId(1),
                new_attached_node_id: NodeId(3)
            })
        })
    );

    // Applying the optimizations swaps each shard's attached and secondary locations.
    shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
    assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(2)));
    assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
    shard_b.apply_optimization(&mut scheduler, optimization_b.unwrap());
    assert_eq!(shard_b.intent.get_attached(), &Some(NodeId(3)));
    assert_eq!(shard_b.intent.get_secondary(), &vec![NodeId(1)]);

    shard_a.intent.clear(&mut scheduler);
    shard_b.intent.clear(&mut scheduler);

    Ok(())
}
1599 :
#[test]
fn optimize_secondary() -> anyhow::Result<()> {
    let nodes = make_test_nodes(4);
    let mut scheduler = Scheduler::new(nodes.values());

    let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
    let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));

    // Initially shard_a is attached on node 1 and shard_b on node 2, and both have their
    // secondary location on node 3. Node 4 holds nothing.
    shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
    shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
    shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
    shard_b.intent.push_secondary(&mut scheduler, NodeId(3));

    // Both shards are accounted for in a single shared ScheduleContext.
    let mut schedule_context = ScheduleContext::default();
    for shard in [&shard_a, &shard_b] {
        schedule_context.avoid(&shard.intent.all_pageservers());
        schedule_context.push_attached(shard.intent.get_attached().unwrap());
    }

    let optimization_a = shard_a.optimize_secondary(&mut scheduler, &schedule_context);

    // Since an empty node is available, the node holding two locations from the shared
    // context should yield an optimization moving one of them away.
    assert_eq!(
        optimization_a,
        Some(ScheduleOptimization {
            sequence: shard_a.sequence,
            action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
                old_node_id: NodeId(3),
                new_node_id: NodeId(4)
            })
        })
    );

    // Applying it moves only the secondary; the attachment stays put.
    shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
    assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
    assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);

    shard_a.intent.clear(&mut scheduler);
    shard_b.intent.clear(&mut scheduler);

    Ok(())
}
1645 :
1646 : // Optimize til quiescent: this emulates what Service::optimize_all does, when
1647 : // called repeatedly in the background.
1648 : // Returns the applied optimizations
1649 3 : fn optimize_til_idle(
1650 3 : nodes: &HashMap<NodeId, Node>,
1651 3 : scheduler: &mut Scheduler,
1652 3 : shards: &mut [TenantShard],
1653 3 : ) -> Vec<ScheduleOptimization> {
1654 3 : let mut loop_n = 0;
1655 3 : let mut optimizations = Vec::default();
1656 : loop {
1657 9 : let mut schedule_context = ScheduleContext::default();
1658 9 : let mut any_changed = false;
1659 :
1660 36 : for shard in shards.iter() {
1661 36 : schedule_context.avoid(&shard.intent.all_pageservers());
1662 36 : if let Some(attached) = shard.intent.get_attached() {
1663 36 : schedule_context.push_attached(*attached);
1664 36 : }
1665 : }
1666 :
1667 21 : for shard in shards.iter_mut() {
1668 21 : let optimization = shard.optimize_attachment(nodes, &schedule_context);
1669 21 : if let Some(optimization) = optimization {
1670 2 : optimizations.push(optimization.clone());
1671 2 : shard.apply_optimization(scheduler, optimization);
1672 2 : any_changed = true;
1673 2 : break;
1674 19 : }
1675 19 :
1676 19 : let optimization = shard.optimize_secondary(scheduler, &schedule_context);
1677 19 : if let Some(optimization) = optimization {
1678 4 : optimizations.push(optimization.clone());
1679 4 : shard.apply_optimization(scheduler, optimization);
1680 4 : any_changed = true;
1681 4 : break;
1682 15 : }
1683 : }
1684 :
1685 9 : if !any_changed {
1686 3 : break;
1687 6 : }
1688 6 :
1689 6 : // Assert no infinite loop
1690 6 : loop_n += 1;
1691 6 : assert!(loop_n < 1000);
1692 : }
1693 :
1694 3 : optimizations
1695 3 : }
1696 :
1697 : /// Test the balancing behavior of shard scheduling: that it achieves a balance, and
1698 : /// that it converges.
1699 : #[test]
1700 1 : fn optimize_add_nodes() -> anyhow::Result<()> {
1701 1 : let nodes = make_test_nodes(4);
1702 1 :
1703 1 : // Only show the scheduler a couple of nodes
1704 1 : let mut scheduler = Scheduler::new([].iter());
1705 1 : scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
1706 1 : scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
1707 1 :
1708 1 : let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
1709 1 : let mut schedule_context = ScheduleContext::default();
1710 5 : for shard in &mut shards {
1711 4 : assert!(shard
1712 4 : .schedule(&mut scheduler, &mut schedule_context)
1713 4 : .is_ok());
1714 : }
1715 :
1716 : // We should see equal number of locations on the two nodes.
1717 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4);
1718 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2);
1719 :
1720 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4);
1721 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
1722 :
1723 : // Add another two nodes: we should see the shards spread out when their optimize
1724 : // methods are called
1725 1 : scheduler.node_upsert(nodes.get(&NodeId(3)).unwrap());
1726 1 : scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
1727 1 : optimize_til_idle(&nodes, &mut scheduler, &mut shards);
1728 1 :
1729 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
1730 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
1731 :
1732 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
1733 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
1734 :
1735 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2);
1736 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 1);
1737 :
1738 1 : assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2);
1739 1 : assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 1);
1740 :
1741 4 : for shard in shards.iter_mut() {
1742 4 : shard.intent.clear(&mut scheduler);
1743 4 : }
1744 :
1745 1 : Ok(())
1746 1 : }
1747 :
1748 : /// Test that initial shard scheduling is optimal. By optimal we mean
1749 : /// that the optimizer cannot find a way to improve it.
1750 : ///
1751 : /// This test is an example of the scheduling issue described in
1752 : /// https://github.com/neondatabase/neon/issues/8969
1753 : #[test]
1754 1 : fn initial_scheduling_is_optimal() -> anyhow::Result<()> {
1755 : use itertools::Itertools;
1756 :
1757 1 : let nodes = make_test_nodes(2);
1758 1 :
1759 1 : let mut scheduler = Scheduler::new([].iter());
1760 1 : scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
1761 1 : scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
1762 1 :
1763 1 : let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
1764 1 : let a_context = Rc::new(RefCell::new(ScheduleContext::default()));
1765 1 :
1766 1 : let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
1767 1 : let b_context = Rc::new(RefCell::new(ScheduleContext::default()));
1768 1 :
1769 4 : let a_shards_with_context = a.iter_mut().map(|shard| (shard, a_context.clone()));
1770 4 : let b_shards_with_context = b.iter_mut().map(|shard| (shard, b_context.clone()));
1771 1 :
1772 1 : let schedule_order = a_shards_with_context.interleave(b_shards_with_context);
1773 :
1774 9 : for (shard, context) in schedule_order {
1775 8 : let context = &mut *context.borrow_mut();
1776 8 : shard.schedule(&mut scheduler, context).unwrap();
1777 8 : }
1778 :
1779 1 : let applied_to_a = optimize_til_idle(&nodes, &mut scheduler, &mut a);
1780 1 : assert_eq!(applied_to_a, vec![]);
1781 :
1782 1 : let applied_to_b = optimize_til_idle(&nodes, &mut scheduler, &mut b);
1783 1 : assert_eq!(applied_to_b, vec![]);
1784 :
1785 8 : for shard in a.iter_mut().chain(b.iter_mut()) {
1786 8 : shard.intent.clear(&mut scheduler);
1787 8 : }
1788 :
1789 1 : Ok(())
1790 1 : }
1791 : }
|