Line data Source code
1 : use std::{collections::HashMap, sync::Arc, time::Duration};
2 :
3 : use crate::{metrics, persistence::TenantShardPersistence};
4 : use pageserver_api::controller_api::NodeAvailability;
5 : use pageserver_api::{
6 : models::{LocationConfig, LocationConfigMode, TenantConfig},
7 : shard::{ShardIdentity, TenantShardId},
8 : };
9 : use serde::Serialize;
10 : use tokio::task::JoinHandle;
11 : use tokio_util::sync::CancellationToken;
12 : use tracing::{instrument, Instrument};
13 : use utils::{
14 : generation::Generation,
15 : id::NodeId,
16 : seqwait::{SeqWait, SeqWaitError},
17 : sync::gate::Gate,
18 : };
19 :
20 : use crate::{
21 : compute_hook::ComputeHook,
22 : node::Node,
23 : persistence::{split_state::SplitState, Persistence},
24 : reconciler::{
25 : attached_location_conf, secondary_location_conf, ReconcileError, Reconciler, TargetState,
26 : },
27 : scheduler::{ScheduleError, Scheduler},
28 : service, PlacementPolicy, Sequence,
29 : };
30 :
31 : /// Serialization helper
32 0 : fn read_mutex_content<S, T>(v: &std::sync::Mutex<T>, serializer: S) -> Result<S::Ok, S::Error>
33 0 : where
34 0 : S: serde::ser::Serializer,
35 0 : T: Clone + std::fmt::Display,
36 0 : {
37 0 : serializer.collect_str(&v.lock().unwrap())
38 0 : }
39 :
40 : /// In-memory state for a particular tenant shard.
41 : ///
42 : /// This struct implements Serialize for debugging purposes, but is _not_ persisted
43 : /// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
44 0 : #[derive(Serialize)]
45 : pub(crate) struct TenantState {
46 : pub(crate) tenant_shard_id: TenantShardId,
47 :
48 : pub(crate) shard: ShardIdentity,
49 :
50 : // Runtime only: sequence used to coordinate updates to this object while
51 : // background reconcilers may be running. A reconciler runs to a particular
52 : // sequence.
53 : pub(crate) sequence: Sequence,
54 :
55 : // Latest generation number: next time we attach, increment this
56 : // and use the incremented number when attaching
57 : pub(crate) generation: Generation,
58 :
59 : // High level description of how the tenant should be set up. Provided
60 : // externally.
61 : pub(crate) policy: PlacementPolicy,
62 :
63 : // Low level description of exactly which pageservers should fulfil
64 : // which role. Generated by `Self::schedule`.
65 : pub(crate) intent: IntentState,
66 :
67 : // Low level description of how the tenant is configured on pageservers:
68 : // if this does not match `Self::intent` then the tenant needs reconciliation
69 : // with `Self::reconcile`.
70 : pub(crate) observed: ObservedState,
71 :
72 : // Tenant configuration, passed through opaquely to the pageserver. Identical
73 : // for all shards in a tenant.
74 : pub(crate) config: TenantConfig,
75 :
76 : /// If a reconcile task is currently in flight, it may be joined here (it is
77 : /// only safe to join if either the result has been received or the reconciler's
78 : /// cancellation token has been fired)
79 : #[serde(skip)]
80 : pub(crate) reconciler: Option<ReconcilerHandle>,
81 :
82 : /// If a tenant is being split, then all shards with that TenantId will have a
83 : /// SplitState set; this acts as a guard against other operations such as background
84 : /// reconciliation and timeline creation.
85 : pub(crate) splitting: SplitState,
86 :
87 : /// Optionally wait for reconciliation to complete up to a particular
88 : /// sequence number.
89 : #[serde(skip)]
90 : pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
91 :
92 : /// Indicates the sequence number for which we have encountered an error reconciling. If
93 : /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
94 : /// and callers should stop waiting for `waiter` and propagate the error.
95 : #[serde(skip)]
96 : pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
97 :
98 : /// The most recent error from a reconcile on this tenant
99 : /// TODO: generalize to an array of recent events
100 : /// TODO: use an ArcSwap instead of a mutex for faster reads?
101 : #[serde(serialize_with = "read_mutex_content")]
102 : pub(crate) last_error: std::sync::Arc<std::sync::Mutex<String>>,
103 :
104 : /// If we have a pending compute notification that for some reason we weren't able to send,
105 : /// set this to true. If this is set, calls to [`Self::maybe_reconcile`] will run a task to retry
106 : /// sending it. This is the mechanism by which compute notifications are included in the scope
107 : /// of state that we publish externally in an eventually consistent way.
108 : pub(crate) pending_compute_notification: bool,
109 : }
110 :
111 2 : #[derive(Default, Clone, Debug, Serialize)]
112 : pub(crate) struct IntentState {
113 : attached: Option<NodeId>,
114 : secondary: Vec<NodeId>,
115 : }
116 :
117 : impl IntentState {
118 4 : pub(crate) fn new() -> Self {
119 4 : Self {
120 4 : attached: None,
121 4 : secondary: vec![],
122 4 : }
123 4 : }
124 0 : pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option<NodeId>) -> Self {
125 0 : if let Some(node_id) = node_id {
126 0 : scheduler.node_inc_ref(node_id);
127 0 : }
128 0 : Self {
129 0 : attached: node_id,
130 0 : secondary: vec![],
131 0 : }
132 0 : }
133 :
134 6 : pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
135 6 : if self.attached != new_attached {
136 6 : if let Some(old_attached) = self.attached.take() {
137 0 : scheduler.node_dec_ref(old_attached);
138 6 : }
139 6 : if let Some(new_attached) = &new_attached {
140 6 : scheduler.node_inc_ref(*new_attached);
141 6 : }
142 6 : self.attached = new_attached;
143 0 : }
144 6 : }
145 :
146 : /// Like set_attached, but the node is from [`Self::secondary`]. This swaps the node from
147 : /// secondary to attached while maintaining the scheduler's reference counts.
148 2 : pub(crate) fn promote_attached(
149 2 : &mut self,
150 2 : _scheduler: &mut Scheduler,
151 2 : promote_secondary: NodeId,
152 2 : ) {
153 : // If we call this with a node that isn't in secondary, it would cause incorrect
154 : // scheduler reference counting, since we assume the node is already referenced as a secondary.
155 2 : debug_assert!(self.secondary.contains(&promote_secondary));
156 :
157 : // TODO: when scheduler starts tracking attached + secondary counts separately, we will
158 : // need to call into it here.
159 4 : self.secondary.retain(|n| n != &promote_secondary);
160 2 : self.attached = Some(promote_secondary);
161 2 : }
162 :
163 4 : pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
164 4 : debug_assert!(!self.secondary.contains(&new_secondary));
165 4 : scheduler.node_inc_ref(new_secondary);
166 4 : self.secondary.push(new_secondary);
167 4 : }
168 :
169 : /// It is legal to call this with a node that is not currently a secondary: that is a no-op
170 0 : pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
171 0 : let index = self.secondary.iter().position(|n| *n == node_id);
172 0 : if let Some(index) = index {
173 0 : scheduler.node_dec_ref(node_id);
174 0 : self.secondary.remove(index);
175 0 : }
176 0 : }
177 :
178 4 : pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
179 4 : for secondary in self.secondary.drain(..) {
180 4 : scheduler.node_dec_ref(secondary);
181 4 : }
182 4 : }
183 :
184 4 : pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
185 4 : if let Some(old_attached) = self.attached.take() {
186 4 : scheduler.node_dec_ref(old_attached);
187 4 : }
188 :
189 4 : self.clear_secondary(scheduler);
190 4 : }
191 :
192 2 : pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
193 2 : let mut result = Vec::new();
194 2 : if let Some(p) = self.attached {
195 2 : result.push(p)
196 0 : }
197 :
198 2 : result.extend(self.secondary.iter().copied());
199 2 :
200 2 : result
201 2 : }
202 :
203 0 : pub(crate) fn get_attached(&self) -> &Option<NodeId> {
204 0 : &self.attached
205 0 : }
206 :
207 0 : pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
208 0 : &self.secondary
209 0 : }
210 :
211 : /// When a node goes offline, we update intents to avoid using it
212 : /// as their attached pageserver.
213 : ///
214 : /// Returns true if a change was made
215 2 : pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool {
216 2 : if self.attached == Some(node_id) {
217 : // TODO: when scheduler starts tracking attached + secondary counts separately, we will
218 : // need to call into it here.
219 2 : self.attached = None;
220 2 : self.secondary.push(node_id);
221 2 : true
222 : } else {
223 0 : false
224 : }
225 2 : }
226 : }
227 :
228 : impl Drop for IntentState {
229 6 : fn drop(&mut self) {
230 : // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler
231 6 : debug_assert!(self.attached.is_none() && self.secondary.is_empty());
232 4 : }
233 : }
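// Illustrative sketch of the refcount discipline that the assertion above enforces; `node_a`
// and `node_b` stand for hypothetical NodeIds already registered with the scheduler:
//
//     let mut intent = IntentState::new();
//     intent.set_attached(&mut scheduler, Some(node_a)); // scheduler refcount for node_a += 1
//     intent.push_secondary(&mut scheduler, node_b);     // scheduler refcount for node_b += 1
//     // ... use the intent ...
//     intent.clear(&mut scheduler);                      // both refcounts released; safe to drop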
234 :
235 2 : #[derive(Default, Clone, Serialize)]
236 : pub(crate) struct ObservedState {
237 : pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
238 : }
239 :
240 : /// Our latest knowledge of how this tenant is configured in the outside world.
241 : ///
242 : /// Meaning:
243 : /// * No instance of this type exists for a node: we are certain that we have nothing configured on that
244 : /// node for this shard.
245 : /// * Instance exists with conf==None: we *might* have some state on that node, but we don't know
246 : /// what it is (e.g. we failed partway through configuring it)
247 : /// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
248 : /// and that configuration will still be present unless something external interfered.
249 0 : #[derive(Clone, Serialize)]
250 : pub(crate) struct ObservedStateLocation {
251 : /// If None, it means we do not know the status of this shard's location on this node, but
252 : /// we know that we might have some state on this node.
253 : pub(crate) conf: Option<LocationConfig>,
254 : }
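// Illustrative sketch of how the three cases described above are read back, assuming a
// hypothetical `node_id: NodeId` and an `observed: ObservedState` for this shard:
//
//     match observed.locations.get(&node_id) {
//         None => { /* we are certain nothing is configured on this node */ }
//         Some(ObservedStateLocation { conf: None }) => {
//             /* unknown: the node may hold state, e.g. a configuration call failed partway */
//         }
//         Some(ObservedStateLocation { conf: Some(conf) }) => {
//             /* `conf` is what we last successfully configured on this node */
//         }
//     }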
255 : pub(crate) struct ReconcilerWaiter {
256 : // For observability purposes, remember the ID of the shard we're
257 : // waiting for.
258 : pub(crate) tenant_shard_id: TenantShardId,
259 :
260 : seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
261 : error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
262 : error: std::sync::Arc<std::sync::Mutex<String>>,
263 : seq: Sequence,
264 : }
265 :
266 0 : #[derive(thiserror::Error, Debug)]
267 : pub enum ReconcileWaitError {
268 : #[error("Timeout waiting for shard {0}")]
269 : Timeout(TenantShardId),
270 : #[error("shutting down")]
271 : Shutdown,
272 : #[error("Reconcile error on shard {0}: {1}")]
273 : Failed(TenantShardId, String),
274 : }
275 :
276 : impl ReconcilerWaiter {
277 0 : pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
278 0 : tokio::select! {
279 0 : result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
280 0 : result.map_err(|e| match e {
281 0 : SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
282 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
283 0 : })?;
284 : },
285 0 : result = self.error_seq_wait.wait_for(self.seq) => {
286 0 : result.map_err(|e| match e {
287 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
288 0 : SeqWaitError::Timeout => unreachable!()
289 0 : })?;
290 :
291 : return Err(ReconcileWaitError::Failed(self.tenant_shard_id, self.error.lock().unwrap().clone()))
292 : }
293 : }
294 :
295 0 : Ok(())
296 0 : }
297 : }
298 :
299 : /// Having spawned a reconciler task, the tenant shard's state will carry enough
300 : /// information to optionally cancel & await it later.
301 : pub(crate) struct ReconcilerHandle {
302 : sequence: Sequence,
303 : handle: JoinHandle<()>,
304 : cancel: CancellationToken,
305 : }
306 :
307 : /// When a reconcile task completes, it sends this result object
308 : /// to be applied to the primary TenantState.
309 : pub(crate) struct ReconcileResult {
310 : pub(crate) sequence: Sequence,
311 : /// On errors, `observed` should be treated as an incomplete description
312 : /// of state (i.e. any nodes present in the result should override nodes
313 : /// present in the parent tenant state, but any unmentioned nodes should
314 : /// not be removed from parent tenant state)
315 : pub(crate) result: Result<(), ReconcileError>,
316 :
317 : pub(crate) tenant_shard_id: TenantShardId,
318 : pub(crate) generation: Generation,
319 : pub(crate) observed: ObservedState,
320 :
321 : /// Set [`TenantState::pending_compute_notification`] from this flag
322 : pub(crate) pending_compute_notification: bool,
323 : }
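// Illustrative sketch of the merge implied by the comment on `observed` above, assuming a
// hypothetical caller holding the parent `tenant_state` and a received `result`:
//
//     for (node_id, loc) in result.observed.locations {
//         // Nodes reported by the reconciler overwrite the parent's entries; nodes the
//         // result does not mention are deliberately left untouched rather than removed.
//         tenant_state.observed.locations.insert(node_id, loc);
//     }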
324 :
325 : impl ObservedState {
326 0 : pub(crate) fn new() -> Self {
327 0 : Self {
328 0 : locations: HashMap::new(),
329 0 : }
330 0 : }
331 : }
332 :
333 : impl TenantState {
334 2 : pub(crate) fn new(
335 2 : tenant_shard_id: TenantShardId,
336 2 : shard: ShardIdentity,
337 2 : policy: PlacementPolicy,
338 2 : ) -> Self {
339 2 : Self {
340 2 : tenant_shard_id,
341 2 : policy,
342 2 : intent: IntentState::default(),
343 2 : generation: Generation::new(0),
344 2 : shard,
345 2 : observed: ObservedState::default(),
346 2 : config: TenantConfig::default(),
347 2 : reconciler: None,
348 2 : splitting: SplitState::Idle,
349 2 : sequence: Sequence(1),
350 2 : waiter: Arc::new(SeqWait::new(Sequence(0))),
351 2 : error_waiter: Arc::new(SeqWait::new(Sequence(0))),
352 2 : last_error: Arc::default(),
353 2 : pending_compute_notification: false,
354 2 : }
355 2 : }
356 :
357 : /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
358 : /// [`ObservedState`], even if it violates my [`PlacementPolicy`]. Call [`Self::schedule`] next,
359 : /// to get an intent state that complies with placement policy. The overall goal is to do scheduling
360 : /// in a way that makes use of any configured locations that already exist in the outside world.
361 0 : pub(crate) fn intent_from_observed(&mut self) {
362 0 : // Choose an attached location by filtering observed locations, and then sorting to get the highest
363 0 : // generation
364 0 : let mut attached_locs = self
365 0 : .observed
366 0 : .locations
367 0 : .iter()
368 0 : .filter_map(|(node_id, l)| {
369 0 : if let Some(conf) = &l.conf {
370 0 : if conf.mode == LocationConfigMode::AttachedMulti
371 0 : || conf.mode == LocationConfigMode::AttachedSingle
372 0 : || conf.mode == LocationConfigMode::AttachedStale
373 : {
374 0 : Some((node_id, conf.generation))
375 : } else {
376 0 : None
377 : }
378 : } else {
379 0 : None
380 : }
381 0 : })
382 0 : .collect::<Vec<_>>();
383 0 :
384 0 : attached_locs.sort_by_key(|i| i.1);
385 0 : if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
386 0 : self.intent.attached = Some(*node_id);
387 0 : }
388 :
389 : // All remaining observed locations generate secondary intents. This includes None
390 : // observations, as these may well have some local content on disk that is usable (this
391 : // is an edge case that might occur if we restarted during a migration or other change)
392 : //
393 : // We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
394 : // will take care of promoting one of these secondaries to be attached.
395 0 : self.observed.locations.keys().for_each(|node_id| {
396 0 : if Some(*node_id) != self.intent.attached {
397 0 : self.intent.secondary.push(*node_id);
398 0 : }
399 0 : });
400 0 : }
401 :
402 : /// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
403 : /// attached pageserver for a shard.
404 : ///
405 : /// Returns whether we modified it, and the NodeId selected.
406 4 : fn schedule_attached(
407 4 : &mut self,
408 4 : scheduler: &mut Scheduler,
409 4 : ) -> Result<(bool, NodeId), ScheduleError> {
410 : // No work to do if we already have an attached tenant
411 4 : if let Some(node_id) = self.intent.attached {
412 0 : return Ok((false, node_id));
413 4 : }
414 :
415 4 : if let Some(promote_secondary) = scheduler.node_preferred(&self.intent.secondary) {
416 : // Promote a secondary
417 2 : tracing::debug!("Promoted secondary {} to attached", promote_secondary);
418 2 : self.intent.promote_attached(scheduler, promote_secondary);
419 2 : Ok((true, promote_secondary))
420 : } else {
421 : // Pick a fresh node: either we had no secondaries or none were schedulable
422 2 : let node_id = scheduler.schedule_shard(&self.intent.secondary)?;
423 2 : tracing::debug!("Selected {} as attached", node_id);
424 2 : self.intent.set_attached(scheduler, Some(node_id));
425 2 : Ok((true, node_id))
426 : }
427 4 : }
428 :
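/// Ensure that [`Self::intent`] complies with [`Self::policy`]: choose an attached
/// pageserver if one is required and missing, add or drop secondaries to match the
/// policy, and advance [`Self::sequence`] if anything was modified.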
429 4 : pub(crate) fn schedule(&mut self, scheduler: &mut Scheduler) -> Result<(), ScheduleError> {
430 4 : // TODO: before scheduling new nodes, check if any existing content in
431 4 : // self.intent refers to pageservers that are offline, and pick other
432 4 : // pageservers if so.
433 4 :
434 4 : // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
435 4 : // change their attach location.
436 4 :
437 4 : // Build the set of pageservers already in use by this tenant, to avoid scheduling
438 4 : // more work on the same pageservers we're already using.
439 4 : let mut modified = false;
440 4 :
441 4 : use PlacementPolicy::*;
442 4 : match self.policy {
443 : Single => {
444 : // Should have exactly one attached, and zero secondaries
445 0 : let (modified_attached, _attached_node_id) = self.schedule_attached(scheduler)?;
446 0 : modified |= modified_attached;
447 0 :
448 0 : if !self.intent.secondary.is_empty() {
449 0 : self.intent.clear_secondary(scheduler);
450 0 : modified = true;
451 0 : }
452 : }
453 4 : Double(secondary_count) => {
454 : // Should have exactly one attached, and N secondaries
455 4 : let (modified_attached, attached_node_id) = self.schedule_attached(scheduler)?;
456 4 : modified |= modified_attached;
457 4 :
458 4 : let mut used_pageservers = vec![attached_node_id];
459 6 : while self.intent.secondary.len() < secondary_count {
460 2 : let node_id = scheduler.schedule_shard(&used_pageservers)?;
461 2 : self.intent.push_secondary(scheduler, node_id);
462 2 : used_pageservers.push(node_id);
463 2 : modified = true;
464 : }
465 : }
466 : Detached => {
467 : // Should have no attached or secondary pageservers
468 0 : if self.intent.attached.is_some() {
469 0 : self.intent.set_attached(scheduler, None);
470 0 : modified = true;
471 0 : }
472 :
473 0 : if !self.intent.secondary.is_empty() {
474 0 : self.intent.clear_secondary(scheduler);
475 0 : modified = true;
476 0 : }
477 : }
478 : }
479 :
480 4 : if modified {
481 4 : self.sequence.0 += 1;
482 4 : }
483 :
484 4 : Ok(())
485 4 : }
486 :
487 : /// Query whether the tenant's observed state for the attached node matches its intent state, and if so,
488 : /// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that
489 : /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
490 : ///
491 : /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
492 : /// function should not be used to decide whether to reconcile.
493 0 : pub(crate) fn stably_attached(&self) -> Option<NodeId> {
494 0 : if let Some(attach_intent) = self.intent.attached {
495 0 : match self.observed.locations.get(&attach_intent) {
496 0 : Some(loc) => match &loc.conf {
497 0 : Some(conf) => match conf.mode {
498 : LocationConfigMode::AttachedMulti
499 : | LocationConfigMode::AttachedSingle
500 : | LocationConfigMode::AttachedStale => {
501 : // Our intent and observed state agree that this node is in an attached state.
502 0 : Some(attach_intent)
503 : }
504 : // Our observed config is not an attached state
505 0 : _ => None,
506 : },
507 : // Our observed state is None, i.e. in flux
508 0 : None => None,
509 : },
510 : // We have no observed state for this node
511 0 : None => None,
512 : }
513 : } else {
514 : // Our intent is not to attach
515 0 : None
516 : }
517 0 : }
518 :
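/// Returns true if reconciliation is needed: the observed locations do not match the
/// intended attached/secondary configuration, observed state exists for nodes that are
/// not part of the intent, or a compute notification is still pending.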
519 0 : fn dirty(&self) -> bool {
520 0 : if let Some(node_id) = self.intent.attached {
521 0 : let wanted_conf = attached_location_conf(self.generation, &self.shard, &self.config);
522 0 : match self.observed.locations.get(&node_id) {
523 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
524 : Some(_) | None => {
525 0 : return true;
526 : }
527 : }
528 0 : }
529 :
530 0 : for node_id in &self.intent.secondary {
531 0 : let wanted_conf = secondary_location_conf(&self.shard, &self.config);
532 0 : match self.observed.locations.get(node_id) {
533 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
534 : Some(_) | None => {
535 0 : return true;
536 : }
537 : }
538 : }
539 :
540 0 : for node_id in self.observed.locations.keys() {
541 0 : if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
542 : // We have observed state that isn't part of our intent: need to clean it up.
543 0 : return true;
544 0 : }
545 : }
546 :
547 : // Even if there is no pageserver work to be done, if we have a pending notification to computes,
548 : // wake up a reconciler to send it.
549 0 : if self.pending_compute_notification {
550 0 : return true;
551 0 : }
552 0 :
553 0 : false
554 0 : }
555 :
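/// Spawn a background [`Reconciler`] if this shard needs one, returning a waiter for its
/// completion. Returns None when the shard is clean, when a split is in progress, or when
/// we are shutting down; if a reconciler for the current sequence is already running, a
/// waiter for that existing task is returned instead.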
556 : #[allow(clippy::too_many_arguments)]
557 0 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
558 : pub(crate) fn maybe_reconcile(
559 : &mut self,
560 : result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
561 : pageservers: &Arc<HashMap<NodeId, Node>>,
562 : compute_hook: &Arc<ComputeHook>,
563 : service_config: &service::Config,
564 : persistence: &Arc<Persistence>,
565 : gate: &Gate,
566 : cancel: &CancellationToken,
567 : ) -> Option<ReconcilerWaiter> {
568 : // If there are any ambiguous observed states, and the nodes they refer to are available,
569 : // we should reconcile to clean them up.
570 : let mut dirty_observed = false;
571 : for (node_id, observed_loc) in &self.observed.locations {
572 : let node = pageservers
573 : .get(node_id)
574 : .expect("Nodes may not be removed while referenced");
575 : if observed_loc.conf.is_none()
576 : && !matches!(node.availability, NodeAvailability::Offline)
577 : {
578 : dirty_observed = true;
579 : break;
580 : }
581 : }
582 :
583 : if !self.dirty() && !dirty_observed {
584 0 : tracing::info!("Not dirty, no reconciliation needed.");
585 : return None;
586 : }
587 :
588 : // If we are currently splitting, then never start a reconciler task: the splitting logic
589 : // requires that shards are not interfered with while it runs. Do this check here rather than
590 : // up top, so that we only log this message if we would otherwise have done a reconciliation.
591 : if !matches!(self.splitting, SplitState::Idle) {
592 0 : tracing::info!("Refusing to reconcile, splitting in progress");
593 : return None;
594 : }
595 :
596 : // Reconcile already in flight for the current sequence?
597 : if let Some(handle) = &self.reconciler {
598 : if handle.sequence == self.sequence {
599 : return Some(ReconcilerWaiter {
600 : tenant_shard_id: self.tenant_shard_id,
601 : seq_wait: self.waiter.clone(),
602 : error_seq_wait: self.error_waiter.clone(),
603 : error: self.last_error.clone(),
604 : seq: self.sequence,
605 : });
606 : }
607 : }
608 :
609 : // Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
610 : // doing our sequence's work.
611 : let old_handle = self.reconciler.take();
612 :
613 : let Ok(gate_guard) = gate.enter() else {
614 : // Shutting down, don't start a reconciler
615 : return None;
616 : };
617 :
618 : let reconciler_cancel = cancel.child_token();
619 : let mut reconciler = Reconciler {
620 : tenant_shard_id: self.tenant_shard_id,
621 : shard: self.shard,
622 : generation: self.generation,
623 : intent: TargetState::from_intent(&self.intent),
624 : config: self.config.clone(),
625 : observed: self.observed.clone(),
626 : pageservers: pageservers.clone(),
627 : compute_hook: compute_hook.clone(),
628 : service_config: service_config.clone(),
629 : _gate_guard: gate_guard,
630 : cancel: reconciler_cancel.clone(),
631 : persistence: persistence.clone(),
632 : compute_notify_failure: false,
633 : };
634 :
635 : let reconcile_seq = self.sequence;
636 :
637 0 : tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
638 : let must_notify = self.pending_compute_notification;
639 : let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
640 : tenant_id=%reconciler.tenant_shard_id.tenant_id,
641 : shard_id=%reconciler.tenant_shard_id.shard_slug());
642 : metrics::RECONCILER.spawned.inc();
643 : let join_handle = tokio::task::spawn(
644 0 : async move {
645 : // Wait for any previous reconcile task to complete before we start
646 0 : if let Some(old_handle) = old_handle {
647 0 : old_handle.cancel.cancel();
648 0 : if let Err(e) = old_handle.handle.await {
649 : // We can't do much with this other than log it: the task is done, so
650 : // we may proceed with our work.
651 0 : tracing::error!("Unexpected join error waiting for reconcile task: {e}");
652 0 : }
653 0 : }
654 :
655 : // Early check for cancellation before doing any work
656 : // TODO: wrap all remote API operations in cancellation check
657 : // as well.
658 0 : if reconciler.cancel.is_cancelled() {
659 0 : metrics::RECONCILER
660 0 : .complete
661 0 : .with_label_values(&[metrics::ReconcilerMetrics::CANCEL])
662 0 : .inc();
663 0 : return;
664 0 : }
665 :
666 : // Attempt to make observed state match intent state
667 0 : let result = reconciler.reconcile().await;
668 :
669 : // If we know we had a pending compute notification from some previous action, send a notification irrespective
670 : // of whether the above reconcile() did any work
671 0 : if result.is_ok() && must_notify {
672 0 : // If this fails, we will signal the need to retry via [`ReconcileResult::pending_compute_notification`]
673 0 : reconciler.compute_notify().await.ok();
674 0 : }
675 :
676 : // Update result counter
677 0 : match &result {
678 0 : Ok(_) => metrics::RECONCILER
679 0 : .complete
680 0 : .with_label_values(&[metrics::ReconcilerMetrics::SUCCESS]),
681 0 : Err(ReconcileError::Cancel) => metrics::RECONCILER
682 0 : .complete
683 0 : .with_label_values(&[metrics::ReconcilerMetrics::CANCEL]),
684 0 : Err(_) => metrics::RECONCILER
685 0 : .complete
686 0 : .with_label_values(&[metrics::ReconcilerMetrics::ERROR]),
687 : }
688 0 : .inc();
689 0 :
690 0 : result_tx
691 0 : .send(ReconcileResult {
692 0 : sequence: reconcile_seq,
693 0 : result,
694 0 : tenant_shard_id: reconciler.tenant_shard_id,
695 0 : generation: reconciler.generation,
696 0 : observed: reconciler.observed,
697 0 : pending_compute_notification: reconciler.compute_notify_failure,
698 0 : })
699 0 : .ok();
700 0 : }
701 : .instrument(reconciler_span),
702 : );
703 :
704 : self.reconciler = Some(ReconcilerHandle {
705 : sequence: self.sequence,
706 : handle: join_handle,
707 : cancel: reconciler_cancel,
708 : });
709 :
710 : Some(ReconcilerWaiter {
711 : tenant_shard_id: self.tenant_shard_id,
712 : seq_wait: self.waiter.clone(),
713 : error_seq_wait: self.error_waiter.clone(),
714 : error: self.last_error.clone(),
715 : seq: self.sequence,
716 : })
717 : }
718 :
719 : // If we had any state at all referring to this node ID, drop it. Does not
720 : // attempt to reschedule.
721 0 : pub(crate) fn deref_node(&mut self, node_id: NodeId) {
722 0 : if self.intent.attached == Some(node_id) {
723 0 : self.intent.attached = None;
724 0 : }
725 :
726 0 : self.intent.secondary.retain(|n| n != &node_id);
727 0 :
728 0 : self.observed.locations.remove(&node_id);
729 :
730 0 : debug_assert!(!self.intent.all_pageservers().contains(&node_id));
731 0 : }
732 :
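/// Build the database representation of this shard: identity, generation, intended
/// attachment, placement policy and tenant config. Runtime-only state such as observed
/// locations and the reconciler handle is not persisted (see [`crate::persistence`]).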
733 0 : pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
734 0 : TenantShardPersistence {
735 0 : tenant_id: self.tenant_shard_id.tenant_id.to_string(),
736 0 : shard_number: self.tenant_shard_id.shard_number.0 as i32,
737 0 : shard_count: self.tenant_shard_id.shard_count.literal() as i32,
738 0 : shard_stripe_size: self.shard.stripe_size.0 as i32,
739 0 : generation: self.generation.into().unwrap_or(0) as i32,
740 0 : generation_pageserver: self
741 0 : .intent
742 0 : .get_attached()
743 0 : .map(|n| n.0 as i64)
744 0 : .unwrap_or(i64::MAX),
745 0 :
746 0 : placement_policy: serde_json::to_string(&self.policy).unwrap(),
747 0 : config: serde_json::to_string(&self.config).unwrap(),
748 0 : splitting: SplitState::default(),
749 0 : }
750 0 : }
751 : }
752 :
753 : #[cfg(test)]
754 : pub(crate) mod tests {
755 : use pageserver_api::shard::{ShardCount, ShardNumber};
756 : use utils::id::TenantId;
757 :
758 : use crate::scheduler::test_utils::make_test_nodes;
759 :
760 : use super::*;
761 :
762 2 : fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantState {
763 2 : let tenant_id = TenantId::generate();
764 2 : let shard_number = ShardNumber(0);
765 2 : let shard_count = ShardCount::new(1);
766 2 :
767 2 : let tenant_shard_id = TenantShardId {
768 2 : tenant_id,
769 2 : shard_number,
770 2 : shard_count,
771 2 : };
772 2 : TenantState::new(
773 2 : tenant_shard_id,
774 2 : ShardIdentity::new(
775 2 : shard_number,
776 2 : shard_count,
777 2 : pageserver_api::shard::ShardStripeSize(32768),
778 2 : )
779 2 : .unwrap(),
780 2 : policy,
781 2 : )
782 2 : }
783 :
784 : /// Test the scheduling behaviors used when a tenant configured for HA is subject
785 : /// to nodes being marked offline.
786 2 : #[test]
787 2 : fn tenant_ha_scheduling() -> anyhow::Result<()> {
788 2 : // Start with three nodes. Our tenant will only use two. The third one is
789 2 : // expected to remain unused.
790 2 : let mut nodes = make_test_nodes(3);
791 2 :
792 2 : let mut scheduler = Scheduler::new(nodes.values());
793 2 :
794 2 : let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Double(1));
795 2 : tenant_state
796 2 : .schedule(&mut scheduler)
797 2 : .expect("we have enough nodes, scheduling should work");
798 2 :
799 2 : // Expect to initially be scheduled onto different nodes
800 2 : assert_eq!(tenant_state.intent.secondary.len(), 1);
801 2 : assert!(tenant_state.intent.attached.is_some());
802 :
803 2 : let attached_node_id = tenant_state.intent.attached.unwrap();
804 2 : let secondary_node_id = *tenant_state.intent.secondary.iter().last().unwrap();
805 2 : assert_ne!(attached_node_id, secondary_node_id);
806 :
807 : // Notifying that the attached node is offline should demote it to a secondary
808 2 : let changed = tenant_state.intent.notify_offline(attached_node_id);
809 2 : assert!(changed);
810 :
811 : // Update the scheduler state to indicate the node is offline
812 2 : nodes.get_mut(&attached_node_id).unwrap().availability = NodeAvailability::Offline;
813 2 : scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
814 2 :
815 2 : // Re-running scheduling should promote the still-available secondary node to attached
816 2 : tenant_state
817 2 : .schedule(&mut scheduler)
818 2 : .expect("active nodes are available");
819 2 : assert_eq!(tenant_state.intent.attached.unwrap(), secondary_node_id);
820 :
821 : // The original attached node should have been retained as a secondary
822 2 : assert_eq!(
823 2 : *tenant_state.intent.secondary.iter().last().unwrap(),
824 2 : attached_node_id
825 2 : );
826 :
827 2 : tenant_state.intent.clear(&mut scheduler);
828 2 :
829 2 : Ok(())
830 2 : }
831 : }
|