Line data Source code
1 : use std::{
2 : collections::{HashMap, HashSet},
3 : sync::Arc,
4 : time::Duration,
5 : };
6 :
7 : use crate::{
8 : metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
9 : persistence::TenantShardPersistence,
10 : scheduler::{AffinityScore, MaySchedule, ScheduleContext},
11 : };
12 : use pageserver_api::controller_api::{PlacementPolicy, ShardSchedulingPolicy};
13 : use pageserver_api::{
14 : models::{LocationConfig, LocationConfigMode, TenantConfig},
15 : shard::{ShardIdentity, TenantShardId},
16 : };
17 : use serde::Serialize;
18 : use tokio::task::JoinHandle;
19 : use tokio_util::sync::CancellationToken;
20 : use tracing::{instrument, Instrument};
21 : use utils::{
22 : generation::Generation,
23 : id::NodeId,
24 : seqwait::{SeqWait, SeqWaitError},
25 : sync::gate::Gate,
26 : };
27 :
28 : use crate::{
29 : compute_hook::ComputeHook,
30 : node::Node,
31 : persistence::{split_state::SplitState, Persistence},
32 : reconciler::{
33 : attached_location_conf, secondary_location_conf, ReconcileError, Reconciler, TargetState,
34 : },
35 : scheduler::{ScheduleError, Scheduler},
36 : service, Sequence,
37 : };
38 :
39 : /// Serialization helper
40 0 : fn read_mutex_content<S, T>(v: &std::sync::Mutex<T>, serializer: S) -> Result<S::Ok, S::Error>
41 0 : where
42 0 : S: serde::ser::Serializer,
43 0 : T: Clone + std::fmt::Display,
44 0 : {
45 0 : serializer.collect_str(&v.lock().unwrap())
46 0 : }
47 :
/// In-memory state for a particular tenant shard.
///
/// This struct implements Serialize for debugging purposes, but is _not_ persisted
/// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
#[derive(Serialize)]
pub(crate) struct TenantShard {
    pub(crate) tenant_shard_id: TenantShardId,

    pub(crate) shard: ShardIdentity,

    // Runtime only: sequence used to coordinate updates to this object while
    // background reconcilers may be running. A reconciler runs to a particular
    // sequence.
    pub(crate) sequence: Sequence,

    // Latest generation number: next time we attach, increment this
    // and use the incremented number when attaching.
    //
    // None represents an incompletely onboarded tenant via the [`Service::location_config`]
    // API, where this tenant may only run in PlacementPolicy::Secondary.
    pub(crate) generation: Option<Generation>,

    // High level description of how the tenant should be set up. Provided
    // externally.
    pub(crate) policy: PlacementPolicy,

    // Low level description of exactly which pageservers should fulfil
    // which role. Generated by `Self::schedule`.
    pub(crate) intent: IntentState,

    // Low level description of how the tenant is configured on pageservers:
    // if this does not match `Self::intent` then the tenant needs reconciliation
    // with `Self::reconcile`.
    pub(crate) observed: ObservedState,

    // Tenant configuration, passed through opaquely to the pageserver. Identical
    // for all shards in a tenant.
    pub(crate) config: TenantConfig,

    /// If a reconcile task is currently in flight, it may be joined here (it is
    /// only safe to join if either the result has been received or the reconciler's
    /// cancellation token has been fired)
    #[serde(skip)]
    pub(crate) reconciler: Option<ReconcilerHandle>,

    /// If a tenant is being split, then all shards with that TenantId will have a
    /// SplitState set. This acts as a guard against other operations such as background
    /// reconciliation and timeline creation.
    pub(crate) splitting: SplitState,

    /// Optionally wait for reconciliation to complete up to a particular
    /// sequence number.
    #[serde(skip)]
    pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,

    /// Indicates sequence number for which we have encountered an error reconciling. If
    /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
    /// and callers should stop waiting for `waiter` and propagate the error.
    #[serde(skip)]
    pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,

    /// The most recent error from a reconcile on this tenant. Serialized as a string
    /// via `read_mutex_content`.
    /// TODO: generalize to an array of recent events
    /// TODO: use an ArcSwap instead of a mutex for faster reads?
    #[serde(serialize_with = "read_mutex_content")]
    pub(crate) last_error: std::sync::Arc<std::sync::Mutex<String>>,

    /// If we have a pending compute notification that for some reason we weren't able to send,
    /// set this to true. If this is set, calls to [`Self::maybe_reconcile`] will run a task to retry
    /// sending it. This is the mechanism by which compute notifications are included in the scope
    /// of state that we publish externally in an eventually consistent way.
    pub(crate) pending_compute_notification: bool,

    // Support/debug tool: if something is going wrong or flapping with scheduling, this may
    // be set to a non-active state to avoid making changes while the issue is fixed.
    scheduling_policy: ShardSchedulingPolicy,
}
125 :
/// The set of nodes on which we intend a shard to have locations.
///
/// Fields are private: the mutating methods on this type keep the scheduler's
/// per-node reference counts in step with the contents.
#[derive(Default, Clone, Debug, Serialize)]
pub(crate) struct IntentState {
    // Node intended to hold the attached location, if any.
    attached: Option<NodeId>,
    // Nodes intended to hold secondary locations.
    secondary: Vec<NodeId>,
}
131 :
132 : impl IntentState {
133 4 : pub(crate) fn new() -> Self {
134 4 : Self {
135 4 : attached: None,
136 4 : secondary: vec![],
137 4 : }
138 4 : }
139 0 : pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option<NodeId>) -> Self {
140 0 : if let Some(node_id) = node_id {
141 0 : scheduler.node_inc_ref(node_id);
142 0 : }
143 0 : Self {
144 0 : attached: node_id,
145 0 : secondary: vec![],
146 0 : }
147 0 : }
148 :
149 26 : pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
150 26 : if self.attached != new_attached {
151 26 : if let Some(old_attached) = self.attached.take() {
152 0 : scheduler.node_dec_ref(old_attached);
153 26 : }
154 26 : if let Some(new_attached) = &new_attached {
155 26 : scheduler.node_inc_ref(*new_attached);
156 26 : }
157 26 : self.attached = new_attached;
158 0 : }
159 26 : }
160 :
161 : /// Like set_attached, but the node is from [`Self::secondary`]. This swaps the node from
162 : /// secondary to attached while maintaining the scheduler's reference counts.
163 14 : pub(crate) fn promote_attached(
164 14 : &mut self,
165 14 : _scheduler: &mut Scheduler,
166 14 : promote_secondary: NodeId,
167 14 : ) {
168 14 : // If we call this with a node that isn't in secondary, it would cause incorrect
169 14 : // scheduler reference counting, since we assume the node is already referenced as a secondary.
170 14 : debug_assert!(self.secondary.contains(&promote_secondary));
171 :
172 : // TODO: when scheduler starts tracking attached + secondary counts separately, we will
173 : // need to call into it here.
174 28 : self.secondary.retain(|n| n != &promote_secondary);
175 14 : self.attached = Some(promote_secondary);
176 14 : }
177 :
178 34 : pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
179 34 : debug_assert!(!self.secondary.contains(&new_secondary));
180 34 : scheduler.node_inc_ref(new_secondary);
181 34 : self.secondary.push(new_secondary);
182 34 : }
183 :
184 : /// It is legal to call this with a node that is not currently a secondary: that is a no-op
185 10 : pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
186 10 : let index = self.secondary.iter().position(|n| *n == node_id);
187 10 : if let Some(index) = index {
188 10 : scheduler.node_dec_ref(node_id);
189 10 : self.secondary.remove(index);
190 10 : }
191 10 : }
192 :
193 24 : pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
194 24 : for secondary in self.secondary.drain(..) {
195 24 : scheduler.node_dec_ref(secondary);
196 24 : }
197 24 : }
198 :
199 : /// Remove the last secondary node from the list of secondaries
200 0 : pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
201 0 : if let Some(node_id) = self.secondary.pop() {
202 0 : scheduler.node_dec_ref(node_id);
203 0 : }
204 0 : }
205 :
206 24 : pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
207 24 : if let Some(old_attached) = self.attached.take() {
208 24 : scheduler.node_dec_ref(old_attached);
209 24 : }
210 :
211 24 : self.clear_secondary(scheduler);
212 24 : }
213 :
214 172 : pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
215 172 : let mut result = Vec::new();
216 172 : if let Some(p) = self.attached {
217 168 : result.push(p)
218 4 : }
219 :
220 172 : result.extend(self.secondary.iter().copied());
221 172 :
222 172 : result
223 172 : }
224 :
225 144 : pub(crate) fn get_attached(&self) -> &Option<NodeId> {
226 144 : &self.attached
227 144 : }
228 :
229 38 : pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
230 38 : &self.secondary
231 38 : }
232 :
233 : /// If the node is in use as the attached location, demote it into
234 : /// the list of secondary locations. This is used when a node goes offline,
235 : /// and we want to use a different node for attachment, but not permanently
236 : /// forget the location on the offline node.
237 : ///
238 : /// Returns true if a change was made
239 14 : pub(crate) fn demote_attached(&mut self, node_id: NodeId) -> bool {
240 14 : if self.attached == Some(node_id) {
241 : // TODO: when scheduler starts tracking attached + secondary counts separately, we will
242 : // need to call into it here.
243 14 : self.attached = None;
244 14 : self.secondary.push(node_id);
245 14 : true
246 : } else {
247 0 : false
248 : }
249 14 : }
250 : }
251 :
252 : impl Drop for IntentState {
253 26 : fn drop(&mut self) {
254 26 : // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
255 26 : // We do not check this while panicking, to avoid polluting unit test failures or
256 26 : // other assertions with this assertion's output. It's still wrong to leak these,
257 26 : // but if we already have a panic then we don't need to independently flag this case.
258 26 : if !(std::thread::panicking()) {
259 26 : debug_assert!(self.attached.is_none() && self.secondary.is_empty());
260 0 : }
261 24 : }
262 : }
263 :
/// What we last learned about this shard's locations on pageservers, keyed by node.
/// See [`ObservedStateLocation`] for how to interpret each entry (and its absence).
#[derive(Default, Clone, Serialize)]
pub(crate) struct ObservedState {
    pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
}
268 :
/// Our latest knowledge of how this tenant is configured in the outside world.
///
/// Meaning:
///     * No instance of this type exists for a node: we are certain that we have nothing configured on that
///       node for this shard.
///     * Instance exists with conf==None: we *might* have some state on that node, but we don't know
///       what it is (e.g. we failed partway through configuring it)
///     * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
///       and that configuration will still be present unless something external interfered.
#[derive(Clone, Serialize)]
pub(crate) struct ObservedStateLocation {
    /// If None, it means we do not know the status of this shard's location on this node, but
    /// we know that we might have some state on this node.
    pub(crate) conf: Option<LocationConfig>,
}
/// Handle for waiting until a shard's reconciliation reaches a particular sequence
/// number, or fails. See [`ReconcilerWaiter::wait_timeout`].
pub(crate) struct ReconcilerWaiter {
    // For observability purposes, remember the ID of the shard we're
    // waiting for.
    pub(crate) tenant_shard_id: TenantShardId,

    // Waited on (with a timeout) for `seq`: completion here means success.
    seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
    // Waited on for `seq`: completion here means the reconcile failed.
    error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
    // Error text that is reported when `error_seq_wait` completes.
    error: std::sync::Arc<std::sync::Mutex<String>>,
    // The sequence number being waited for.
    seq: Sequence,
}
294 :
/// Reasons that waiting for a reconcile (see [`ReconcilerWaiter::wait_timeout`]) can fail.
#[derive(thiserror::Error, Debug)]
pub enum ReconcileWaitError {
    /// The wait on the success sequence timed out.
    #[error("Timeout waiting for shard {0}")]
    Timeout(TenantShardId),
    /// One of the sequence waiters reported shutdown.
    #[error("shutting down")]
    Shutdown,
    /// The reconcile failed; carries the shard ID and the recorded error text.
    #[error("Reconcile error on shard {0}: {1}")]
    Failed(TenantShardId, String),
}
304 :
/// Optimization payload: swap one secondary location for another node.
#[derive(Eq, PartialEq, Debug)]
pub(crate) struct ReplaceSecondary {
    // Secondary location to remove.
    old_node_id: NodeId,
    // Node to add as a secondary location in its place.
    new_node_id: NodeId,
}
310 :
/// Optimization payload: move the attachment between two nodes of the same intent.
#[derive(Eq, PartialEq, Debug)]
pub(crate) struct MigrateAttachment {
    // Currently attached node; it is demoted to a secondary when the optimization is applied.
    old_attached_node_id: NodeId,
    // Existing secondary that is promoted to attached when the optimization is applied.
    new_attached_node_id: NodeId,
}
316 :
/// A change to a shard's intent proposed by `TenantShard::optimize_attachment` or
/// `TenantShard::optimize_secondary`, and applied with `TenantShard::apply_optimization`.
#[derive(Eq, PartialEq, Debug)]
pub(crate) enum ScheduleOptimization {
    // Replace one of our secondary locations with a different node
    ReplaceSecondary(ReplaceSecondary),
    // Migrate attachment to an existing secondary location
    MigrateAttachment(MigrateAttachment),
}
324 :
325 : impl ReconcilerWaiter {
326 0 : pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
327 0 : tokio::select! {
328 0 : result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
329 0 : result.map_err(|e| match e {
330 0 : SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
331 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
332 0 : })?;
333 : },
334 0 : result = self.error_seq_wait.wait_for(self.seq) => {
335 0 : result.map_err(|e| match e {
336 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
337 0 : SeqWaitError::Timeout => unreachable!()
338 0 : })?;
339 :
340 : return Err(ReconcileWaitError::Failed(self.tenant_shard_id, self.error.lock().unwrap().clone()))
341 : }
342 : }
343 :
344 0 : Ok(())
345 0 : }
346 : }
347 :
/// Having spawned a reconciler task, the tenant shard's state will carry enough
/// information to optionally cancel & await it later.
pub(crate) struct ReconcilerHandle {
    // NOTE(review): presumably the sequence number the reconciler was spawned for — confirm at the spawn site.
    sequence: Sequence,
    // Join handle for the spawned task.
    handle: JoinHandle<()>,
    // Token used to cancel the task.
    cancel: CancellationToken,
}
355 :
/// When a reconcile task completes, it sends this result object
/// to be applied to the primary TenantShard.
pub(crate) struct ReconcileResult {
    pub(crate) sequence: Sequence,
    /// On errors, `observed` should be treated as an incomplete description
    /// of state (i.e. any nodes present in the result should override nodes
    /// present in the parent tenant state, but any unmentioned nodes should
    /// not be removed from parent tenant state)
    pub(crate) result: Result<(), ReconcileError>,

    pub(crate) tenant_shard_id: TenantShardId,
    pub(crate) generation: Option<Generation>,
    pub(crate) observed: ObservedState,

    /// Set [`TenantShard::pending_compute_notification`] from this flag
    pub(crate) pending_compute_notification: bool,
}
373 :
374 : impl ObservedState {
375 0 : pub(crate) fn new() -> Self {
376 0 : Self {
377 0 : locations: HashMap::new(),
378 0 : }
379 0 : }
380 : }
381 :
382 : impl TenantShard {
383 22 : pub(crate) fn new(
384 22 : tenant_shard_id: TenantShardId,
385 22 : shard: ShardIdentity,
386 22 : policy: PlacementPolicy,
387 22 : ) -> Self {
388 22 : Self {
389 22 : tenant_shard_id,
390 22 : policy,
391 22 : intent: IntentState::default(),
392 22 : generation: Some(Generation::new(0)),
393 22 : shard,
394 22 : observed: ObservedState::default(),
395 22 : config: TenantConfig::default(),
396 22 : reconciler: None,
397 22 : splitting: SplitState::Idle,
398 22 : sequence: Sequence(1),
399 22 : waiter: Arc::new(SeqWait::new(Sequence(0))),
400 22 : error_waiter: Arc::new(SeqWait::new(Sequence(0))),
401 22 : last_error: Arc::default(),
402 22 : pending_compute_notification: false,
403 22 : scheduling_policy: ShardSchedulingPolicy::default(),
404 22 : }
405 22 : }
406 :
407 : /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
408 : /// [`ObservedState`], even if it violates my [`PlacementPolicy`]. Call [`Self::schedule`] next,
409 : /// to get an intent state that complies with placement policy. The overall goal is to do scheduling
410 : /// in a way that makes use of any configured locations that already exist in the outside world.
411 2 : pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
412 2 : // Choose an attached location by filtering observed locations, and then sorting to get the highest
413 2 : // generation
414 2 : let mut attached_locs = self
415 2 : .observed
416 2 : .locations
417 2 : .iter()
418 4 : .filter_map(|(node_id, l)| {
419 4 : if let Some(conf) = &l.conf {
420 4 : if conf.mode == LocationConfigMode::AttachedMulti
421 2 : || conf.mode == LocationConfigMode::AttachedSingle
422 2 : || conf.mode == LocationConfigMode::AttachedStale
423 : {
424 4 : Some((node_id, conf.generation))
425 : } else {
426 0 : None
427 : }
428 : } else {
429 0 : None
430 : }
431 4 : })
432 2 : .collect::<Vec<_>>();
433 2 :
434 4 : attached_locs.sort_by_key(|i| i.1);
435 2 : if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
436 2 : self.intent.set_attached(scheduler, Some(*node_id));
437 2 : }
438 :
439 : // All remaining observed locations generate secondary intents. This includes None
440 : // observations, as these may well have some local content on disk that is usable (this
441 : // is an edge case that might occur if we restarted during a migration or other change)
442 : //
443 : // We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
444 : // will take care of promoting one of these secondaries to be attached.
445 4 : self.observed.locations.keys().for_each(|node_id| {
446 4 : if Some(*node_id) != self.intent.attached {
447 2 : self.intent.push_secondary(scheduler, *node_id);
448 2 : }
449 4 : });
450 2 : }
451 :
452 : /// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
453 : /// attached pageserver for a shard.
454 : ///
455 : /// Returns whether we modified it, and the NodeId selected.
456 14 : fn schedule_attached(
457 14 : &mut self,
458 14 : scheduler: &mut Scheduler,
459 14 : context: &ScheduleContext,
460 14 : ) -> Result<(bool, NodeId), ScheduleError> {
461 : // No work to do if we already have an attached tenant
462 14 : if let Some(node_id) = self.intent.attached {
463 0 : return Ok((false, node_id));
464 14 : }
465 :
466 14 : if let Some(promote_secondary) = scheduler.node_preferred(&self.intent.secondary) {
467 : // Promote a secondary
468 2 : tracing::debug!("Promoted secondary {} to attached", promote_secondary);
469 2 : self.intent.promote_attached(scheduler, promote_secondary);
470 2 : Ok((true, promote_secondary))
471 : } else {
472 : // Pick a fresh node: either we had no secondaries or none were schedulable
473 12 : let node_id = scheduler.schedule_shard(&self.intent.secondary, context)?;
474 12 : tracing::debug!("Selected {} as attached", node_id);
475 12 : self.intent.set_attached(scheduler, Some(node_id));
476 12 : Ok((true, node_id))
477 : }
478 14 : }
479 :
480 16 : pub(crate) fn schedule(
481 16 : &mut self,
482 16 : scheduler: &mut Scheduler,
483 16 : context: &mut ScheduleContext,
484 16 : ) -> Result<(), ScheduleError> {
485 16 : let r = self.do_schedule(scheduler, context);
486 16 :
487 16 : context.avoid(&self.intent.all_pageservers());
488 16 : if let Some(attached) = self.intent.get_attached() {
489 14 : context.push_attached(*attached);
490 14 : }
491 :
492 16 : r
493 16 : }
494 :
495 16 : pub(crate) fn do_schedule(
496 16 : &mut self,
497 16 : scheduler: &mut Scheduler,
498 16 : context: &ScheduleContext,
499 16 : ) -> Result<(), ScheduleError> {
500 16 : // TODO: before scheduling new nodes, check if any existing content in
501 16 : // self.intent refers to pageservers that are offline, and pick other
502 16 : // pageservers if so.
503 16 :
504 16 : // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
505 16 : // change their attach location.
506 16 :
507 16 : match self.scheduling_policy {
508 14 : ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
509 : ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
510 : // Warn to make it obvious why other things aren't happening/working, if we skip scheduling
511 2 : tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
512 0 : "Scheduling is disabled by policy {:?}", self.scheduling_policy);
513 2 : return Ok(());
514 : }
515 : }
516 :
517 : // Build the set of pageservers already in use by this tenant, to avoid scheduling
518 : // more work on the same pageservers we're already using.
519 14 : let mut modified = false;
520 14 :
521 14 : // Add/remove nodes to fulfil policy
522 14 : use PlacementPolicy::*;
523 14 : match self.policy {
524 14 : Attached(secondary_count) => {
525 14 : let retain_secondaries = if self.intent.attached.is_none()
526 14 : && scheduler.node_preferred(&self.intent.secondary).is_some()
527 : {
528 : // If we have no attached, and one of the secondaries is elegible to be promoted, retain
529 : // one more secondary than we usually would, as one of them will become attached futher down this function.
530 2 : secondary_count + 1
531 : } else {
532 12 : secondary_count
533 : };
534 :
535 14 : while self.intent.secondary.len() > retain_secondaries {
536 0 : // We have no particular preference for one secondary location over another: just
537 0 : // arbitrarily drop from the end
538 0 : self.intent.pop_secondary(scheduler);
539 0 : modified = true;
540 0 : }
541 :
542 : // Should have exactly one attached, and N secondaries
543 14 : let (modified_attached, attached_node_id) =
544 14 : self.schedule_attached(scheduler, context)?;
545 14 : modified |= modified_attached;
546 14 :
547 14 : let mut used_pageservers = vec![attached_node_id];
548 26 : while self.intent.secondary.len() < secondary_count {
549 12 : let node_id = scheduler.schedule_shard(&used_pageservers, context)?;
550 12 : self.intent.push_secondary(scheduler, node_id);
551 12 : used_pageservers.push(node_id);
552 12 : modified = true;
553 : }
554 : }
555 : Secondary => {
556 0 : if let Some(node_id) = self.intent.get_attached() {
557 0 : // Populate secondary by demoting the attached node
558 0 : self.intent.demote_attached(*node_id);
559 0 : modified = true;
560 0 : } else if self.intent.secondary.is_empty() {
561 0 : // Populate secondary by scheduling a fresh node
562 0 : let node_id = scheduler.schedule_shard(&[], context)?;
563 0 : self.intent.push_secondary(scheduler, node_id);
564 0 : modified = true;
565 0 : }
566 0 : while self.intent.secondary.len() > 1 {
567 0 : // We have no particular preference for one secondary location over another: just
568 0 : // arbitrarily drop from the end
569 0 : self.intent.pop_secondary(scheduler);
570 0 : modified = true;
571 0 : }
572 : }
573 : Detached => {
574 : // Never add locations in this mode
575 0 : if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
576 0 : self.intent.clear(scheduler);
577 0 : modified = true;
578 0 : }
579 : }
580 : }
581 :
582 14 : if modified {
583 14 : self.sequence.0 += 1;
584 14 : }
585 :
586 14 : Ok(())
587 16 : }
588 :
589 : /// Optimize attachments: if a shard has a secondary location that is preferable to
590 : /// its primary location based on soft constraints, switch that secondary location
591 : /// to be attached.
592 80 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
593 : pub(crate) fn optimize_attachment(
594 : &self,
595 : nodes: &HashMap<NodeId, Node>,
596 : schedule_context: &ScheduleContext,
597 : ) -> Option<ScheduleOptimization> {
598 : let attached = (*self.intent.get_attached())?;
599 : if self.intent.secondary.is_empty() {
600 : // We can only do useful work if we have both attached and secondary locations: this
601 : // function doesn't schedule new locations, only swaps between attached and secondaries.
602 : return None;
603 : }
604 :
605 : let current_affinity_score = schedule_context.get_node_affinity(attached);
606 : let current_attachment_count = schedule_context.get_node_attachments(attached);
607 :
608 : // Generate score for each node, dropping any un-schedulable nodes.
609 : let all_pageservers = self.intent.all_pageservers();
610 : let mut scores = all_pageservers
611 : .iter()
612 80 : .flat_map(|node_id| {
613 80 : if matches!(
614 80 : nodes
615 80 : .get(node_id)
616 80 : .map(|n| n.may_schedule())
617 80 : .unwrap_or(MaySchedule::No),
618 : MaySchedule::No
619 : ) {
620 0 : None
621 : } else {
622 80 : let affinity_score = schedule_context.get_node_affinity(*node_id);
623 80 : let attachment_count = schedule_context.get_node_attachments(*node_id);
624 80 : Some((*node_id, affinity_score, attachment_count))
625 : }
626 80 : })
627 : .collect::<Vec<_>>();
628 :
629 : // Sort precedence:
630 : // 1st - prefer nodes with the lowest total affinity score
631 : // 2nd - prefer nodes with the lowest number of attachments in this context
632 : // 3rd - if all else is equal, sort by node ID for determinism in tests.
633 80 : scores.sort_by_key(|i| (i.1, i.2, i.0));
634 :
635 : if let Some((preferred_node, preferred_affinity_score, preferred_attachment_count)) =
636 : scores.first()
637 : {
638 : if attached != *preferred_node {
639 : // The best alternative must be more than 1 better than us, otherwise we could end
640 : // up flapping back next time we're called (e.g. there's no point migrating from
641 : // a location with score 1 to a score zero, because on next location the situation
642 : // would be the same, but in reverse).
643 : if current_affinity_score > *preferred_affinity_score + AffinityScore(1)
644 : || current_attachment_count > *preferred_attachment_count + 1
645 : {
646 0 : tracing::info!(
647 0 : "Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})",
648 0 : self.intent.get_secondary()
649 0 : );
650 : return Some(ScheduleOptimization::MigrateAttachment(MigrateAttachment {
651 : old_attached_node_id: attached,
652 : new_attached_node_id: *preferred_node,
653 : }));
654 : }
655 : } else {
656 0 : tracing::debug!(
657 0 : "Node {} is already preferred (score {:?})",
658 0 : preferred_node,
659 0 : preferred_affinity_score
660 0 : );
661 : }
662 : }
663 :
664 : // Fall-through: we didn't find an optimization
665 : None
666 : }
667 :
668 60 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
669 : pub(crate) fn optimize_secondary(
670 : &self,
671 : scheduler: &Scheduler,
672 : schedule_context: &ScheduleContext,
673 : ) -> Option<ScheduleOptimization> {
674 : if self.intent.secondary.is_empty() {
675 : // We can only do useful work if we have both attached and secondary locations: this
676 : // function doesn't schedule new locations, only swaps between attached and secondaries.
677 : return None;
678 : }
679 :
680 : for secondary in self.intent.get_secondary() {
681 : let Some(affinity_score) = schedule_context.nodes.get(secondary) else {
682 : // We're already on a node unaffected any affinity constraints,
683 : // so we won't change it.
684 : continue;
685 : };
686 :
687 : // Let the scheduler suggest a node, where it would put us if we were scheduling afresh
688 : // This implicitly limits the choice to nodes that are available, and prefers nodes
689 : // with lower utilization.
690 : let Ok(candidate_node) =
691 : scheduler.schedule_shard(&self.intent.all_pageservers(), schedule_context)
692 : else {
693 : // A scheduling error means we have no possible candidate replacements
694 : continue;
695 : };
696 :
697 : let candidate_affinity_score = schedule_context
698 : .nodes
699 : .get(&candidate_node)
700 : .unwrap_or(&AffinityScore::FREE);
701 :
702 : // The best alternative must be more than 1 better than us, otherwise we could end
703 : // up flapping back next time we're called.
704 : if *candidate_affinity_score + AffinityScore(1) < *affinity_score {
705 : // If some other node is available and has a lower score than this node, then
706 : // that other node is a good place to migrate to.
707 0 : tracing::info!(
708 0 : "Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})",
709 0 : self.intent.get_secondary()
710 0 : );
711 : return Some(ScheduleOptimization::ReplaceSecondary(ReplaceSecondary {
712 : old_node_id: *secondary,
713 : new_node_id: candidate_node,
714 : }));
715 : }
716 : }
717 :
718 : None
719 : }
720 :
721 22 : pub(crate) fn apply_optimization(
722 22 : &mut self,
723 22 : scheduler: &mut Scheduler,
724 22 : optimization: ScheduleOptimization,
725 22 : ) {
726 22 : metrics::METRICS_REGISTRY
727 22 : .metrics_group
728 22 : .storage_controller_schedule_optimization
729 22 : .inc();
730 22 :
731 22 : match optimization {
732 : ScheduleOptimization::MigrateAttachment(MigrateAttachment {
733 12 : old_attached_node_id,
734 12 : new_attached_node_id,
735 12 : }) => {
736 12 : self.intent.demote_attached(old_attached_node_id);
737 12 : self.intent
738 12 : .promote_attached(scheduler, new_attached_node_id);
739 12 : }
740 : ScheduleOptimization::ReplaceSecondary(ReplaceSecondary {
741 10 : old_node_id,
742 10 : new_node_id,
743 10 : }) => {
744 10 : self.intent.remove_secondary(scheduler, old_node_id);
745 10 : self.intent.push_secondary(scheduler, new_node_id);
746 10 : }
747 : }
748 22 : }
749 :
750 : /// Query whether the tenant's observed state for attached node matches its intent state, and if so,
751 : /// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that
752 : /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
753 : ///
754 : /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
755 : /// funciton should not be used to decide whether to reconcile.
756 0 : pub(crate) fn stably_attached(&self) -> Option<NodeId> {
757 0 : if let Some(attach_intent) = self.intent.attached {
758 0 : match self.observed.locations.get(&attach_intent) {
759 0 : Some(loc) => match &loc.conf {
760 0 : Some(conf) => match conf.mode {
761 : LocationConfigMode::AttachedMulti
762 : | LocationConfigMode::AttachedSingle
763 : | LocationConfigMode::AttachedStale => {
764 : // Our intent and observed state agree that this node is in an attached state.
765 0 : Some(attach_intent)
766 : }
767 : // Our observed config is not an attached state
768 0 : _ => None,
769 : },
770 : // Our observed state is None, i.e. in flux
771 0 : None => None,
772 : },
773 : // We have no observed state for this node
774 0 : None => None,
775 : }
776 : } else {
777 : // Our intent is not to attach
778 0 : None
779 : }
780 0 : }
781 :
782 0 : fn dirty(&self, nodes: &Arc<HashMap<NodeId, Node>>) -> bool {
783 0 : let mut dirty_nodes = HashSet::new();
784 :
785 0 : if let Some(node_id) = self.intent.attached {
786 : // Maybe panic: it is a severe bug if we try to attach while generation is null.
787 0 : let generation = self
788 0 : .generation
789 0 : .expect("Attempted to enter attached state without a generation");
790 0 :
791 0 : let wanted_conf = attached_location_conf(
792 0 : generation,
793 0 : &self.shard,
794 0 : &self.config,
795 0 : !self.intent.secondary.is_empty(),
796 0 : );
797 0 : match self.observed.locations.get(&node_id) {
798 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
799 0 : Some(_) | None => {
800 0 : dirty_nodes.insert(node_id);
801 0 : }
802 : }
803 0 : }
804 :
805 0 : for node_id in &self.intent.secondary {
806 0 : let wanted_conf = secondary_location_conf(&self.shard, &self.config);
807 0 : match self.observed.locations.get(node_id) {
808 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
809 0 : Some(_) | None => {
810 0 : dirty_nodes.insert(*node_id);
811 0 : }
812 : }
813 : }
814 :
815 0 : for node_id in self.observed.locations.keys() {
816 0 : if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
817 0 : // We have observed state that isn't part of our intent: need to clean it up.
818 0 : dirty_nodes.insert(*node_id);
819 0 : }
820 : }
821 :
822 0 : dirty_nodes.retain(|node_id| {
823 0 : nodes
824 0 : .get(node_id)
825 0 : .map(|n| n.is_available())
826 0 : .unwrap_or(false)
827 0 : });
828 0 :
829 0 : !dirty_nodes.is_empty()
830 0 : }
831 :
832 : #[allow(clippy::too_many_arguments)]
833 0 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
834 : pub(crate) fn maybe_reconcile(
835 : &mut self,
836 : result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
837 : pageservers: &Arc<HashMap<NodeId, Node>>,
838 : compute_hook: &Arc<ComputeHook>,
839 : service_config: &service::Config,
840 : persistence: &Arc<Persistence>,
841 : gate: &Gate,
842 : cancel: &CancellationToken,
843 : ) -> Option<ReconcilerWaiter> {
844 : // If there are any ambiguous observed states, and the nodes they refer to are available,
845 : // we should reconcile to clean them up.
846 : let mut dirty_observed = false;
847 : for (node_id, observed_loc) in &self.observed.locations {
848 : let node = pageservers
849 : .get(node_id)
850 : .expect("Nodes may not be removed while referenced");
851 : if observed_loc.conf.is_none() && node.is_available() {
852 : dirty_observed = true;
853 : break;
854 : }
855 : }
856 :
857 : let active_nodes_dirty = self.dirty(pageservers);
858 :
859 : // Even if there is no pageserver work to be done, if we have a pending notification to computes,
860 : // wake up a reconciler to send it.
861 : let do_reconcile =
862 : active_nodes_dirty || dirty_observed || self.pending_compute_notification;
863 :
864 : if !do_reconcile {
865 0 : tracing::info!("Not dirty, no reconciliation needed.");
866 : return None;
867 : }
868 :
869 : // If we are currently splitting, then never start a reconciler task: the splitting logic
870 : // requires that shards are not interfered with while it runs. Do this check here rather than
871 : // up top, so that we only log this message if we would otherwise have done a reconciliation.
872 : if !matches!(self.splitting, SplitState::Idle) {
873 0 : tracing::info!("Refusing to reconcile, splitting in progress");
874 : return None;
875 : }
876 :
877 : // Reconcile already in flight for the current sequence?
878 : if let Some(handle) = &self.reconciler {
879 : if handle.sequence == self.sequence {
880 0 : tracing::info!(
881 0 : "Reconciliation already in progress for sequence {:?}",
882 0 : self.sequence,
883 0 : );
884 : return Some(ReconcilerWaiter {
885 : tenant_shard_id: self.tenant_shard_id,
886 : seq_wait: self.waiter.clone(),
887 : error_seq_wait: self.error_waiter.clone(),
888 : error: self.last_error.clone(),
889 : seq: self.sequence,
890 : });
891 : }
892 : }
893 :
894 : // Pre-checks done: finally check whether we may actually do the work
895 : match self.scheduling_policy {
896 : ShardSchedulingPolicy::Active
897 : | ShardSchedulingPolicy::Essential
898 : | ShardSchedulingPolicy::Pause => {}
899 : ShardSchedulingPolicy::Stop => {
900 : // We only reach this point if there is work to do and we're going to skip
901 : // doing it: warn it obvious why this tenant isn't doing what it ought to.
902 0 : tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
903 : return None;
904 : }
905 : }
906 :
907 : // Build list of nodes from which the reconciler should detach
908 : let mut detach = Vec::new();
909 : for node_id in self.observed.locations.keys() {
910 : if self.intent.get_attached() != &Some(*node_id)
911 : && !self.intent.secondary.contains(node_id)
912 : {
913 : detach.push(
914 : pageservers
915 : .get(node_id)
916 : .expect("Intent references non-existent pageserver")
917 : .clone(),
918 : )
919 : }
920 : }
921 :
922 : // Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
923 : // doing our sequence's work.
924 : let old_handle = self.reconciler.take();
925 :
926 : let Ok(gate_guard) = gate.enter() else {
927 : // Shutting down, don't start a reconciler
928 : return None;
929 : };
930 :
931 : // Advance the sequence before spawning a reconciler, so that sequence waiters
932 : // can distinguish between before+after the reconcile completes.
933 : self.sequence = self.sequence.next();
934 :
935 : let reconciler_cancel = cancel.child_token();
936 : let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
937 : let mut reconciler = Reconciler {
938 : tenant_shard_id: self.tenant_shard_id,
939 : shard: self.shard,
940 : generation: self.generation,
941 : intent: reconciler_intent,
942 : detach,
943 : config: self.config.clone(),
944 : observed: self.observed.clone(),
945 : compute_hook: compute_hook.clone(),
946 : service_config: service_config.clone(),
947 : _gate_guard: gate_guard,
948 : cancel: reconciler_cancel.clone(),
949 : persistence: persistence.clone(),
950 : compute_notify_failure: false,
951 : };
952 :
953 : let reconcile_seq = self.sequence;
954 :
955 0 : tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
956 : let must_notify = self.pending_compute_notification;
957 : let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
958 : tenant_id=%reconciler.tenant_shard_id.tenant_id,
959 : shard_id=%reconciler.tenant_shard_id.shard_slug());
960 : metrics::METRICS_REGISTRY
961 : .metrics_group
962 : .storage_controller_reconcile_spawn
963 : .inc();
964 : let result_tx = result_tx.clone();
965 : let join_handle = tokio::task::spawn(
966 0 : async move {
967 : // Wait for any previous reconcile task to complete before we start
968 0 : if let Some(old_handle) = old_handle {
969 0 : old_handle.cancel.cancel();
970 0 : if let Err(e) = old_handle.handle.await {
971 : // We can't do much with this other than log it: the task is done, so
972 : // we may proceed with our work.
973 0 : tracing::error!("Unexpected join error waiting for reconcile task: {e}");
974 0 : }
975 0 : }
976 :
977 : // Early check for cancellation before doing any work
978 : // TODO: wrap all remote API operations in cancellation check
979 : // as well.
980 0 : if reconciler.cancel.is_cancelled() {
981 0 : metrics::METRICS_REGISTRY
982 0 : .metrics_group
983 0 : .storage_controller_reconcile_complete
984 0 : .inc(ReconcileCompleteLabelGroup {
985 0 : status: ReconcileOutcome::Cancel,
986 0 : });
987 0 : return;
988 0 : }
989 :
990 : // Attempt to make observed state match intent state
991 0 : let result = reconciler.reconcile().await;
992 :
993 : // If we know we had a pending compute notification from some previous action, send a notification irrespective
994 : // of whether the above reconcile() did any work
995 0 : if result.is_ok() && must_notify {
996 : // If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
997 0 : reconciler.compute_notify().await.ok();
998 0 : }
999 :
1000 : // Update result counter
1001 0 : let outcome_label = match &result {
1002 0 : Ok(_) => ReconcileOutcome::Success,
1003 0 : Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
1004 0 : Err(_) => ReconcileOutcome::Error,
1005 : };
1006 :
1007 0 : metrics::METRICS_REGISTRY
1008 0 : .metrics_group
1009 0 : .storage_controller_reconcile_complete
1010 0 : .inc(ReconcileCompleteLabelGroup {
1011 0 : status: outcome_label,
1012 0 : });
1013 0 :
1014 0 : result_tx
1015 0 : .send(ReconcileResult {
1016 0 : sequence: reconcile_seq,
1017 0 : result,
1018 0 : tenant_shard_id: reconciler.tenant_shard_id,
1019 0 : generation: reconciler.generation,
1020 0 : observed: reconciler.observed,
1021 0 : pending_compute_notification: reconciler.compute_notify_failure,
1022 0 : })
1023 0 : .ok();
1024 0 : }
1025 : .instrument(reconciler_span),
1026 : );
1027 :
1028 : self.reconciler = Some(ReconcilerHandle {
1029 : sequence: self.sequence,
1030 : handle: join_handle,
1031 : cancel: reconciler_cancel,
1032 : });
1033 :
1034 : Some(ReconcilerWaiter {
1035 : tenant_shard_id: self.tenant_shard_id,
1036 : seq_wait: self.waiter.clone(),
1037 : error_seq_wait: self.error_waiter.clone(),
1038 : error: self.last_error.clone(),
1039 : seq: self.sequence,
1040 : })
1041 : }
1042 :
1043 : /// Get a waiter for any reconciliation in flight, but do not start reconciliation
1044 : /// if it is not already running
1045 0 : pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
1046 0 : if self.reconciler.is_some() {
1047 0 : Some(ReconcilerWaiter {
1048 0 : tenant_shard_id: self.tenant_shard_id,
1049 0 : seq_wait: self.waiter.clone(),
1050 0 : error_seq_wait: self.error_waiter.clone(),
1051 0 : error: self.last_error.clone(),
1052 0 : seq: self.sequence,
1053 0 : })
1054 : } else {
1055 0 : None
1056 : }
1057 0 : }
1058 :
1059 : /// Called when a ReconcileResult has been emitted and the service is updating
1060 : /// our state: if the result is from a sequence >= my ReconcileHandle, then drop
1061 : /// the handle to indicate there is no longer a reconciliation in progress.
1062 0 : pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
1063 0 : if let Some(reconcile_handle) = &self.reconciler {
1064 0 : if reconcile_handle.sequence <= sequence {
1065 0 : self.reconciler = None;
1066 0 : }
1067 0 : }
1068 0 : }
1069 :
1070 : // If we had any state at all referring to this node ID, drop it. Does not
1071 : // attempt to reschedule.
1072 0 : pub(crate) fn deref_node(&mut self, node_id: NodeId) {
1073 0 : if self.intent.attached == Some(node_id) {
1074 0 : self.intent.attached = None;
1075 0 : }
1076 :
1077 0 : self.intent.secondary.retain(|n| n != &node_id);
1078 0 :
1079 0 : self.observed.locations.remove(&node_id);
1080 0 :
1081 0 : debug_assert!(!self.intent.all_pageservers().contains(&node_id));
1082 0 : }
1083 :
1084 0 : pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
1085 0 : self.scheduling_policy = p;
1086 0 : }
1087 :
1088 0 : pub(crate) fn get_scheduling_policy(&self) -> &ShardSchedulingPolicy {
1089 0 : &self.scheduling_policy
1090 0 : }
1091 :
1092 0 : pub(crate) fn from_persistent(
1093 0 : tsp: TenantShardPersistence,
1094 0 : intent: IntentState,
1095 0 : ) -> anyhow::Result<Self> {
1096 0 : let tenant_shard_id = tsp.get_tenant_shard_id()?;
1097 0 : let shard_identity = tsp.get_shard_identity()?;
1098 :
1099 0 : Ok(Self {
1100 0 : tenant_shard_id,
1101 0 : shard: shard_identity,
1102 0 : sequence: Sequence::initial(),
1103 0 : generation: tsp.generation.map(|g| Generation::new(g as u32)),
1104 0 : policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
1105 0 : intent,
1106 0 : observed: ObservedState::new(),
1107 0 : config: serde_json::from_str(&tsp.config).unwrap(),
1108 0 : reconciler: None,
1109 0 : splitting: tsp.splitting,
1110 0 : waiter: Arc::new(SeqWait::new(Sequence::initial())),
1111 0 : error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
1112 0 : last_error: Arc::default(),
1113 0 : pending_compute_notification: false,
1114 0 : scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
1115 0 : })
1116 0 : }
1117 :
1118 0 : pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
1119 0 : TenantShardPersistence {
1120 0 : tenant_id: self.tenant_shard_id.tenant_id.to_string(),
1121 0 : shard_number: self.tenant_shard_id.shard_number.0 as i32,
1122 0 : shard_count: self.tenant_shard_id.shard_count.literal() as i32,
1123 0 : shard_stripe_size: self.shard.stripe_size.0 as i32,
1124 0 : generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
1125 0 : generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
1126 0 : placement_policy: serde_json::to_string(&self.policy).unwrap(),
1127 0 : config: serde_json::to_string(&self.config).unwrap(),
1128 0 : splitting: SplitState::default(),
1129 0 : scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
1130 0 : }
1131 0 : }
1132 : }
1133 :
1134 : #[cfg(test)]
1135 : pub(crate) mod tests {
1136 : use pageserver_api::{
1137 : controller_api::NodeAvailability,
1138 : shard::{ShardCount, ShardNumber},
1139 : };
1140 : use utils::id::TenantId;
1141 :
1142 : use crate::scheduler::test_utils::make_test_nodes;
1143 :
1144 : use super::*;
1145 :
1146 14 : fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard {
1147 14 : let tenant_id = TenantId::generate();
1148 14 : let shard_number = ShardNumber(0);
1149 14 : let shard_count = ShardCount::new(1);
1150 14 :
1151 14 : let tenant_shard_id = TenantShardId {
1152 14 : tenant_id,
1153 14 : shard_number,
1154 14 : shard_count,
1155 14 : };
1156 14 : TenantShard::new(
1157 14 : tenant_shard_id,
1158 14 : ShardIdentity::new(
1159 14 : shard_number,
1160 14 : shard_count,
1161 14 : pageserver_api::shard::ShardStripeSize(32768),
1162 14 : )
1163 14 : .unwrap(),
1164 14 : policy,
1165 14 : )
1166 14 : }
1167 :
1168 2 : fn make_test_tenant(policy: PlacementPolicy, shard_count: ShardCount) -> Vec<TenantShard> {
1169 2 : let tenant_id = TenantId::generate();
1170 2 :
1171 2 : (0..shard_count.count())
1172 8 : .map(|i| {
1173 8 : let shard_number = ShardNumber(i);
1174 8 :
1175 8 : let tenant_shard_id = TenantShardId {
1176 8 : tenant_id,
1177 8 : shard_number,
1178 8 : shard_count,
1179 8 : };
1180 8 : TenantShard::new(
1181 8 : tenant_shard_id,
1182 8 : ShardIdentity::new(
1183 8 : shard_number,
1184 8 : shard_count,
1185 8 : pageserver_api::shard::ShardStripeSize(32768),
1186 8 : )
1187 8 : .unwrap(),
1188 8 : policy.clone(),
1189 8 : )
1190 8 : })
1191 2 : .collect()
1192 2 : }
1193 :
1194 : /// Test the scheduling behaviors used when a tenant configured for HA is subject
1195 : /// to nodes being marked offline.
1196 : #[test]
1197 2 : fn tenant_ha_scheduling() -> anyhow::Result<()> {
1198 2 : // Start with three nodes. Our tenant will only use two. The third one is
1199 2 : // expected to remain unused.
1200 2 : let mut nodes = make_test_nodes(3);
1201 2 :
1202 2 : let mut scheduler = Scheduler::new(nodes.values());
1203 2 : let mut context = ScheduleContext::default();
1204 2 :
1205 2 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
1206 2 : tenant_shard
1207 2 : .schedule(&mut scheduler, &mut context)
1208 2 : .expect("we have enough nodes, scheduling should work");
1209 2 :
1210 2 : // Expect to initially be schedule on to different nodes
1211 2 : assert_eq!(tenant_shard.intent.secondary.len(), 1);
1212 2 : assert!(tenant_shard.intent.attached.is_some());
1213 :
1214 2 : let attached_node_id = tenant_shard.intent.attached.unwrap();
1215 2 : let secondary_node_id = *tenant_shard.intent.secondary.iter().last().unwrap();
1216 2 : assert_ne!(attached_node_id, secondary_node_id);
1217 :
1218 : // Notifying the attached node is offline should demote it to a secondary
1219 2 : let changed = tenant_shard.intent.demote_attached(attached_node_id);
1220 2 : assert!(changed);
1221 2 : assert!(tenant_shard.intent.attached.is_none());
1222 2 : assert_eq!(tenant_shard.intent.secondary.len(), 2);
1223 :
1224 : // Update the scheduler state to indicate the node is offline
1225 2 : nodes
1226 2 : .get_mut(&attached_node_id)
1227 2 : .unwrap()
1228 2 : .set_availability(NodeAvailability::Offline);
1229 2 : scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
1230 2 :
1231 2 : // Scheduling the node should promote the still-available secondary node to attached
1232 2 : tenant_shard
1233 2 : .schedule(&mut scheduler, &mut context)
1234 2 : .expect("active nodes are available");
1235 2 : assert_eq!(tenant_shard.intent.attached.unwrap(), secondary_node_id);
1236 :
1237 : // The original attached node should have been retained as a secondary
1238 2 : assert_eq!(
1239 2 : *tenant_shard.intent.secondary.iter().last().unwrap(),
1240 2 : attached_node_id
1241 2 : );
1242 :
1243 2 : tenant_shard.intent.clear(&mut scheduler);
1244 2 :
1245 2 : Ok(())
1246 2 : }
1247 :
1248 : #[test]
1249 2 : fn intent_from_observed() -> anyhow::Result<()> {
1250 2 : let nodes = make_test_nodes(3);
1251 2 : let mut scheduler = Scheduler::new(nodes.values());
1252 2 :
1253 2 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
1254 2 :
1255 2 : tenant_shard.observed.locations.insert(
1256 2 : NodeId(3),
1257 2 : ObservedStateLocation {
1258 2 : conf: Some(LocationConfig {
1259 2 : mode: LocationConfigMode::AttachedMulti,
1260 2 : generation: Some(2),
1261 2 : secondary_conf: None,
1262 2 : shard_number: tenant_shard.shard.number.0,
1263 2 : shard_count: tenant_shard.shard.count.literal(),
1264 2 : shard_stripe_size: tenant_shard.shard.stripe_size.0,
1265 2 : tenant_conf: TenantConfig::default(),
1266 2 : }),
1267 2 : },
1268 2 : );
1269 2 :
1270 2 : tenant_shard.observed.locations.insert(
1271 2 : NodeId(2),
1272 2 : ObservedStateLocation {
1273 2 : conf: Some(LocationConfig {
1274 2 : mode: LocationConfigMode::AttachedStale,
1275 2 : generation: Some(1),
1276 2 : secondary_conf: None,
1277 2 : shard_number: tenant_shard.shard.number.0,
1278 2 : shard_count: tenant_shard.shard.count.literal(),
1279 2 : shard_stripe_size: tenant_shard.shard.stripe_size.0,
1280 2 : tenant_conf: TenantConfig::default(),
1281 2 : }),
1282 2 : },
1283 2 : );
1284 2 :
1285 2 : tenant_shard.intent_from_observed(&mut scheduler);
1286 2 :
1287 2 : // The highest generationed attached location gets used as attached
1288 2 : assert_eq!(tenant_shard.intent.attached, Some(NodeId(3)));
1289 : // Other locations get used as secondary
1290 2 : assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]);
1291 :
1292 2 : scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?;
1293 :
1294 2 : tenant_shard.intent.clear(&mut scheduler);
1295 2 : Ok(())
1296 2 : }
1297 :
1298 : #[test]
1299 2 : fn scheduling_mode() -> anyhow::Result<()> {
1300 2 : let nodes = make_test_nodes(3);
1301 2 : let mut scheduler = Scheduler::new(nodes.values());
1302 2 :
1303 2 : let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
1304 2 :
1305 2 : // In pause mode, schedule() shouldn't do anything
1306 2 : tenant_shard.scheduling_policy = ShardSchedulingPolicy::Pause;
1307 2 : assert!(tenant_shard
1308 2 : .schedule(&mut scheduler, &mut ScheduleContext::default())
1309 2 : .is_ok());
1310 2 : assert!(tenant_shard.intent.all_pageservers().is_empty());
1311 :
1312 : // In active mode, schedule() works
1313 2 : tenant_shard.scheduling_policy = ShardSchedulingPolicy::Active;
1314 2 : assert!(tenant_shard
1315 2 : .schedule(&mut scheduler, &mut ScheduleContext::default())
1316 2 : .is_ok());
1317 2 : assert!(!tenant_shard.intent.all_pageservers().is_empty());
1318 :
1319 2 : tenant_shard.intent.clear(&mut scheduler);
1320 2 : Ok(())
1321 2 : }
1322 :
1323 : #[test]
1324 2 : fn optimize_attachment() -> anyhow::Result<()> {
1325 2 : let nodes = make_test_nodes(3);
1326 2 : let mut scheduler = Scheduler::new(nodes.values());
1327 2 :
1328 2 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
1329 2 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
1330 2 :
1331 2 : // Initially: both nodes attached on shard 1, and both have secondary locations
1332 2 : // on different nodes.
1333 2 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
1334 2 : shard_a.intent.push_secondary(&mut scheduler, NodeId(2));
1335 2 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
1336 2 : shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
1337 2 :
1338 2 : let mut schedule_context = ScheduleContext::default();
1339 2 : schedule_context.avoid(&shard_a.intent.all_pageservers());
1340 2 : schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
1341 2 : schedule_context.avoid(&shard_b.intent.all_pageservers());
1342 2 : schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
1343 2 :
1344 2 : let optimization_a = shard_a.optimize_attachment(&nodes, &schedule_context);
1345 2 :
1346 2 : // Either shard should recognize that it has the option to switch to a secondary location where there
1347 2 : // would be no other shards from the same tenant, and request to do so.
1348 2 : assert_eq!(
1349 2 : optimization_a,
1350 2 : Some(ScheduleOptimization::MigrateAttachment(MigrateAttachment {
1351 2 : old_attached_node_id: NodeId(1),
1352 2 : new_attached_node_id: NodeId(2)
1353 2 : }))
1354 2 : );
1355 :
1356 : // Note that these optimizing two shards in the same tenant with the same ScheduleContext is
1357 : // mutually exclusive (the optimization of one invalidates the stats) -- it is the responsibility
1358 : // of [`Service::optimize_all`] to avoid trying
1359 : // to do optimizations for multiple shards in the same tenant at the same time. Generating
1360 : // both optimizations is just done for test purposes
1361 2 : let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context);
1362 2 : assert_eq!(
1363 2 : optimization_b,
1364 2 : Some(ScheduleOptimization::MigrateAttachment(MigrateAttachment {
1365 2 : old_attached_node_id: NodeId(1),
1366 2 : new_attached_node_id: NodeId(3)
1367 2 : }))
1368 2 : );
1369 :
1370 : // Applying these optimizations should result in the end state proposed
1371 2 : shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
1372 2 : assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(2)));
1373 2 : assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
1374 2 : shard_b.apply_optimization(&mut scheduler, optimization_b.unwrap());
1375 2 : assert_eq!(shard_b.intent.get_attached(), &Some(NodeId(3)));
1376 2 : assert_eq!(shard_b.intent.get_secondary(), &vec![NodeId(1)]);
1377 :
1378 2 : shard_a.intent.clear(&mut scheduler);
1379 2 : shard_b.intent.clear(&mut scheduler);
1380 2 :
1381 2 : Ok(())
1382 2 : }
1383 :
1384 : #[test]
1385 2 : fn optimize_secondary() -> anyhow::Result<()> {
1386 2 : let nodes = make_test_nodes(4);
1387 2 : let mut scheduler = Scheduler::new(nodes.values());
1388 2 :
1389 2 : let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
1390 2 : let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
1391 2 :
1392 2 : // Initially: both nodes attached on shard 1, and both have secondary locations
1393 2 : // on different nodes.
1394 2 : shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
1395 2 : shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
1396 2 : shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
1397 2 : shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
1398 2 :
1399 2 : let mut schedule_context = ScheduleContext::default();
1400 2 : schedule_context.avoid(&shard_a.intent.all_pageservers());
1401 2 : schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
1402 2 : schedule_context.avoid(&shard_b.intent.all_pageservers());
1403 2 : schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
1404 2 :
1405 2 : let optimization_a = shard_a.optimize_secondary(&scheduler, &schedule_context);
1406 2 :
1407 2 : // Since there is a node with no locations available, the node with two locations for the
1408 2 : // same tenant should generate an optimization to move one away
1409 2 : assert_eq!(
1410 2 : optimization_a,
1411 2 : Some(ScheduleOptimization::ReplaceSecondary(ReplaceSecondary {
1412 2 : old_node_id: NodeId(3),
1413 2 : new_node_id: NodeId(4)
1414 2 : }))
1415 2 : );
1416 :
1417 2 : shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
1418 2 : assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
1419 2 : assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);
1420 :
1421 2 : shard_a.intent.clear(&mut scheduler);
1422 2 : shard_b.intent.clear(&mut scheduler);
1423 2 :
1424 2 : Ok(())
1425 2 : }
1426 :
1427 : // Optimize til quiescent: this emulates what Service::optimize_all does, when
1428 : // called repeatedly in the background.
1429 2 : fn optimize_til_idle(
1430 2 : nodes: &HashMap<NodeId, Node>,
1431 2 : scheduler: &mut Scheduler,
1432 2 : shards: &mut [TenantShard],
1433 2 : ) {
1434 2 : let mut loop_n = 0;
1435 : loop {
1436 18 : let mut schedule_context = ScheduleContext::default();
1437 18 : let mut any_changed = false;
1438 :
1439 72 : for shard in shards.iter() {
1440 72 : schedule_context.avoid(&shard.intent.all_pageservers());
1441 72 : if let Some(attached) = shard.intent.get_attached() {
1442 72 : schedule_context.push_attached(*attached);
1443 72 : }
1444 : }
1445 :
1446 36 : for shard in shards.iter_mut() {
1447 36 : let optimization = shard.optimize_attachment(nodes, &schedule_context);
1448 36 : if let Some(optimization) = optimization {
1449 8 : shard.apply_optimization(scheduler, optimization);
1450 8 : any_changed = true;
1451 8 : break;
1452 28 : }
1453 28 :
1454 28 : let optimization = shard.optimize_secondary(scheduler, &schedule_context);
1455 28 : if let Some(optimization) = optimization {
1456 8 : shard.apply_optimization(scheduler, optimization);
1457 8 : any_changed = true;
1458 8 : break;
1459 20 : }
1460 : }
1461 :
1462 18 : if !any_changed {
1463 2 : break;
1464 16 : }
1465 16 :
1466 16 : // Assert no infinite loop
1467 16 : loop_n += 1;
1468 16 : assert!(loop_n < 1000);
1469 : }
1470 2 : }
1471 :
1472 : /// Test the balancing behavior of shard scheduling: that it achieves a balance, and
1473 : /// that it converges.
1474 : #[test]
1475 2 : fn optimize_add_nodes() -> anyhow::Result<()> {
1476 2 : let nodes = make_test_nodes(4);
1477 2 :
1478 2 : // Only show the scheduler a couple of nodes
1479 2 : let mut scheduler = Scheduler::new([].iter());
1480 2 : scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
1481 2 : scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
1482 2 :
1483 2 : let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
1484 2 : let mut schedule_context = ScheduleContext::default();
1485 10 : for shard in &mut shards {
1486 8 : assert!(shard
1487 8 : .schedule(&mut scheduler, &mut schedule_context)
1488 8 : .is_ok());
1489 : }
1490 :
1491 : // We should see equal number of locations on the two nodes.
1492 2 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4);
1493 2 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4);
1494 :
1495 : // Add another two nodes: we should see the shards spread out when their optimize
1496 : // methods are called
1497 2 : scheduler.node_upsert(nodes.get(&NodeId(3)).unwrap());
1498 2 : scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
1499 2 : optimize_til_idle(&nodes, &mut scheduler, &mut shards);
1500 2 :
1501 2 : assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
1502 2 : assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
1503 2 : assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2);
1504 2 : assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2);
1505 :
1506 8 : for shard in shards.iter_mut() {
1507 8 : shard.intent.clear(&mut scheduler);
1508 8 : }
1509 :
1510 2 : Ok(())
1511 2 : }
1512 : }
|