Line data Source code
1 : use std::{collections::HashMap, sync::Arc, time::Duration};
2 :
3 : use control_plane::attachment_service::NodeAvailability;
4 : use pageserver_api::{
5 : models::{LocationConfig, LocationConfigMode, TenantConfig},
6 : shard::{ShardIdentity, TenantShardId},
7 : };
8 : use tokio::task::JoinHandle;
9 : use tokio_util::sync::CancellationToken;
10 : use utils::{
11 : generation::Generation,
12 : id::NodeId,
13 : seqwait::{SeqWait, SeqWaitError},
14 : };
15 :
16 : use crate::{
17 : compute_hook::ComputeHook,
18 : node::Node,
19 : persistence::Persistence,
20 : reconciler::{attached_location_conf, secondary_location_conf, ReconcileError, Reconciler},
21 : scheduler::{ScheduleError, Scheduler},
22 : service, PlacementPolicy, Sequence,
23 : };
24 :
25 : pub(crate) struct TenantState {
26 : pub(crate) tenant_shard_id: TenantShardId,
27 :
28 : pub(crate) shard: ShardIdentity,
29 :
30 : // Runtime only: sequence used to coordinate updates to this object while
31 : // background reconcilers may be running. A reconciler runs to a particular
32 : // sequence.
33 : pub(crate) sequence: Sequence,
34 :
35 : // Latest generation number: next time we attach, increment this
36 : // and use the incremented number when attaching
37 : pub(crate) generation: Generation,
38 :
39 : // High level description of how the tenant should be set up. Provided
40 : // externally.
41 : pub(crate) policy: PlacementPolicy,
42 :
43 : // Low level description of exactly which pageservers should fulfil
44 : // which role. Generated by `Self::schedule`.
45 : pub(crate) intent: IntentState,
46 :
47 : // Low level description of how the tenant is configured on pageservers:
48 : // if this does not match `Self::intent` then the tenant needs reconciliation
49 : // with `Self::reconcile`.
50 : pub(crate) observed: ObservedState,
51 :
52 : // Tenant configuration, passed through opaquely to the pageserver. Identical
53 : // for all shards in a tenant.
54 : pub(crate) config: TenantConfig,
55 :
56 : /// If a reconcile task is currently in flight, it may be joined here (it is
57 : /// only safe to join if either the result has been received or the reconciler's
58 : /// cancellation token has been fired)
59 : pub(crate) reconciler: Option<ReconcilerHandle>,
60 :
61 : /// Optionally wait for reconciliation to complete up to a particular
62 : /// sequence number.
63 : pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
64 :
65 : /// Indicates sequence number for which we have encountered an error reconciling. If
66 : /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
67 : /// and callers should stop waiting for `waiter` and propagate the error.
68 : pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
69 :
70 : /// The most recent error from a reconcile on this tenant
71 : /// TODO: generalize to an array of recent events
72 : /// TODO: use an ArcSwap instead of a mutex for faster reads?
73 : pub(crate) last_error: std::sync::Arc<std::sync::Mutex<String>>,
74 :
75 : /// If we have a pending compute notification that for some reason we weren't able to send,
76 : /// set this to true. If this is set, calls to [`Self::maybe_reconcile`] will run a task to retry
77 : /// sending it. This is the mechanism by which compute notifications are included in the scope
78 : /// of state that we publish externally in an eventually consistent way.
79 : pub(crate) pending_compute_notification: bool,
80 : }
81 :
82 515 : #[derive(Default, Clone, Debug)]
83 : pub(crate) struct IntentState {
84 : pub(crate) attached: Option<NodeId>,
85 : pub(crate) secondary: Vec<NodeId>,
86 : }
87 :
88 515 : #[derive(Default, Clone)]
89 : pub(crate) struct ObservedState {
90 : pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
91 : }
92 :
93 : /// Our latest knowledge of how this tenant is configured in the outside world.
94 : ///
95 : /// Meaning:
96 : /// * No instance of this type exists for a node: we are certain that we have nothing configured on that
97 : /// node for this shard.
98 : /// * Instance exists with conf==None: we *might* have some state on that node, but we don't know
99 : /// what it is (e.g. we failed partway through configuring it)
100 : /// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
101 : /// and that configuration will still be present unless something external interfered.
102 9 : #[derive(Clone)]
103 : pub(crate) struct ObservedStateLocation {
104 : /// If None, it means we do not know the status of this shard's location on this node, but
105 : /// we know that we might have some state on this node.
106 : pub(crate) conf: Option<LocationConfig>,
107 : }
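// Putting the three cases documented on [`ObservedStateLocation`] into code. This is an
// illustrative sketch only (`observed` and `node_id` are hypothetical bindings, not part of
// this module):
//
//     match observed.locations.get(&node_id) {
//         // No entry: we are certain nothing is configured for this shard on the node.
//         None => {}
//         // Entry with conf == None: the node may hold some state, but we don't know
//         // what it is (e.g. a configuration call failed partway through).
//         Some(ObservedStateLocation { conf: None }) => {}
//         // Entry with conf == Some: the last configuration we successfully applied,
//         // assumed still present unless something external interfered.
//         Some(ObservedStateLocation { conf: Some(_conf) }) => {}
//     }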
108 : pub(crate) struct ReconcilerWaiter {
109 : // For observability purposes, remember the ID of the shard we're
110 : // waiting for.
111 : pub(crate) tenant_shard_id: TenantShardId,
112 :
113 : seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
114 : error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
115 : error: std::sync::Arc<std::sync::Mutex<String>>,
116 : seq: Sequence,
117 : }
118 :
119 2 : #[derive(thiserror::Error, Debug)]
120 : pub enum ReconcileWaitError {
121 : #[error("Timeout waiting for shard {0}")]
122 : Timeout(TenantShardId),
123 : #[error("shutting down")]
124 : Shutdown,
125 : #[error("Reconcile error on shard {0}: {1}")]
126 : Failed(TenantShardId, String),
127 : }
128 :
129 : impl ReconcilerWaiter {
130 496 : pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
131 973 : tokio::select! {
132 495 : result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
133 0 : result.map_err(|e| match e {
134 0 : SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
135 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
136 0 : })?;
137 : },
138 1 : result = self.error_seq_wait.wait_for(self.seq) => {
139 0 : result.map_err(|e| match e {
140 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
141 0 : SeqWaitError::Timeout => unreachable!()
142 0 : })?;
143 :
144 : return Err(ReconcileWaitError::Failed(self.tenant_shard_id, self.error.lock().unwrap().clone()))
145 : }
146 : }
147 :
148 495 : Ok(())
149 496 : }
150 : }
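// Usage sketch for the waiter returned by [`TenantState::maybe_reconcile`] (hypothetical
// caller; the 30 second timeout is illustrative):
//
//     match waiter.wait_timeout(Duration::from_secs(30)).await {
//         Ok(()) => tracing::info!("reconcile complete for {}", waiter.tenant_shard_id),
//         Err(ReconcileWaitError::Timeout(shard_id)) => {
//             tracing::warn!("timed out waiting for shard {shard_id}");
//         }
//         Err(e) => return Err(e), // Shutdown or Failed: propagate to the caller
//     }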
151 :
152 : /// Having spawned a reconciler task, the tenant shard's state will carry enough
153 : /// information to optionally cancel & await it later.
154 : pub(crate) struct ReconcilerHandle {
155 : sequence: Sequence,
156 : handle: JoinHandle<()>,
157 : cancel: CancellationToken,
158 : }
159 :
160 : /// When a reconcile task completes, it sends this result object
161 : /// to be applied to the primary TenantState.
162 : pub(crate) struct ReconcileResult {
163 : pub(crate) sequence: Sequence,
164 : /// On errors, `observed` should be treated as an incomplete description
165 : /// of state (i.e. any nodes present in the result should override nodes
166 : /// present in the parent tenant state, but any unmentioned nodes should
167 : /// not be removed from parent tenant state)
168 : pub(crate) result: Result<(), ReconcileError>,
169 :
170 : pub(crate) tenant_shard_id: TenantShardId,
171 : pub(crate) generation: Generation,
172 : pub(crate) observed: ObservedState,
173 :
174 : /// Set [`TenantState::pending_compute_notification`] from this flag
175 : pub(crate) pending_compute_notification: bool,
176 : }
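// The doc comment on `observed` above asks for a merge, not a replacement, of observed state
// when a reconcile fails. A minimal sketch of that merge, assuming it runs wherever the
// service receives the `ReconcileResult` (not in this module):
//
//     for (node_id, loc) in result.observed.locations {
//         // Nodes mentioned in the result override the parent's view of them...
//         tenant_state.observed.locations.insert(node_id, loc);
//     }
//     // ...while nodes the result does not mention are left untouched.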
177 :
178 : impl IntentState {
179 12 : pub(crate) fn new() -> Self {
180 12 : Self {
181 12 : attached: None,
182 12 : secondary: vec![],
183 12 : }
184 12 : }
185 1841 : pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
186 1841 : let mut result = Vec::new();
187 1841 : if let Some(p) = self.attached {
188 1342 : result.push(p)
189 499 : }
190 :
191 1841 : result.extend(self.secondary.iter().copied());
192 1841 :
193 1841 : result
194 1841 : }
195 :
196 10 : pub(crate) fn single(node_id: Option<NodeId>) -> Self {
197 10 : Self {
198 10 : attached: node_id,
199 10 : secondary: vec![],
200 10 : }
201 10 : }
202 :
203 : /// When a node goes offline, we update intents to avoid using it
204 : /// as their attached pageserver.
205 : ///
206 : /// Returns true if a change was made
207 13 : pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool {
208 13 : if self.attached == Some(node_id) {
209 5 : self.attached = None;
210 5 : self.secondary.push(node_id);
211 5 : true
212 : } else {
213 8 : false
214 : }
215 13 : }
216 : }
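// Behaviour of `notify_offline` in a nutshell (illustrative values; `NodeId` is assumed to
// wrap a plain u64 here):
//
//     let mut intent = IntentState::single(Some(NodeId(1)));
//     assert!(intent.notify_offline(NodeId(1)));      // attached node went offline: change made
//     assert_eq!(intent.attached, None);
//     assert_eq!(intent.secondary, vec![NodeId(1)]);  // demoted to secondary so any local
//                                                     // content can still be reused
//     assert!(!intent.notify_offline(NodeId(2)));     // node we weren't attached to: no change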
217 :
218 : impl ObservedState {
219 12 : pub(crate) fn new() -> Self {
220 12 : Self {
221 12 : locations: HashMap::new(),
222 12 : }
223 12 : }
224 : }
225 :
226 : impl TenantState {
227 515 : pub(crate) fn new(
228 515 : tenant_shard_id: TenantShardId,
229 515 : shard: ShardIdentity,
230 515 : policy: PlacementPolicy,
231 515 : ) -> Self {
232 515 : Self {
233 515 : tenant_shard_id,
234 515 : policy,
235 515 : intent: IntentState::default(),
236 515 : generation: Generation::new(0),
237 515 : shard,
238 515 : observed: ObservedState::default(),
239 515 : config: TenantConfig::default(),
240 515 : reconciler: None,
241 515 : sequence: Sequence(1),
242 515 : waiter: Arc::new(SeqWait::new(Sequence(0))),
243 515 : error_waiter: Arc::new(SeqWait::new(Sequence(0))),
244 515 : last_error: Arc::default(),
245 515 : pending_compute_notification: false,
246 515 : }
247 515 : }
248 :
249 : /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
250 : /// [`ObservedState`], even if it violates my [`PlacementPolicy`]. Call [`Self::schedule`] next,
251 : /// to get an intent state that complies with placement policy. The overall goal is to do scheduling
252 : /// in a way that makes use of any configured locations that already exist in the outside world.
253 8 : pub(crate) fn intent_from_observed(&mut self) {
254 8 : // Choose an attached location by filtering observed locations, and then sorting to get the highest
255 8 : // generation
256 8 : let mut attached_locs = self
257 8 : .observed
258 8 : .locations
259 8 : .iter()
260 8 : .filter_map(|(node_id, l)| {
261 5 : if let Some(conf) = &l.conf {
262 5 : if conf.mode == LocationConfigMode::AttachedMulti
263 5 : || conf.mode == LocationConfigMode::AttachedSingle
264 0 : || conf.mode == LocationConfigMode::AttachedStale
265 : {
266 5 : Some((node_id, conf.generation))
267 : } else {
268 0 : None
269 : }
270 : } else {
271 0 : None
272 : }
273 8 : })
274 8 : .collect::<Vec<_>>();
275 8 :
276 8 : attached_locs.sort_by_key(|i| i.1);
277 8 : if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
278 5 : self.intent.attached = Some(*node_id);
279 5 : }
280 :
281 : // All remaining observed locations generate secondary intents. This includes None
282 : // observations, as these may well have some local content on disk that is usable (this
283 : // is an edge case that might occur if we restarted during a migration or other change)
284 8 : self.observed.locations.keys().for_each(|node_id| {
285 5 : if Some(*node_id) != self.intent.attached {
286 0 : self.intent.secondary.push(*node_id);
287 5 : }
288 8 : });
289 8 : }
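// Startup flow sketch (simplified; the real sequencing lives in the service, not here):
// observed state is learned from pageservers first, an intent is derived from it, and only
// then does `schedule` bring the intent in line with the placement policy:
//
//     tenant_state.intent_from_observed();
//     tenant_state.schedule(&mut scheduler)?;
//     // ...followed by maybe_reconcile() to converge pageservers towards the new intent.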
290 :
291 1338 : pub(crate) fn schedule(&mut self, scheduler: &mut Scheduler) -> Result<(), ScheduleError> {
292 1338 : // TODO: before scheduling new nodes, check if any existing content in
293 1338 : // self.intent refers to pageservers that are offline, and pick other
294 1338 : // pageservers if so.
295 1338 :
296 1338 : // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
297 1338 : // change their attach location.
298 1338 :
299 1338 : // Build the set of pageservers already in use by this tenant, to avoid scheduling
300 1338 : // more work on the same pageservers we're already using.
301 1338 : let mut used_pageservers = self.intent.all_pageservers();
302 1338 : let mut modified = false;
303 1338 :
304 1338 : use PlacementPolicy::*;
305 1338 : match self.policy {
306 : Single => {
307 : // Should have exactly one attached, and zero secondaries
308 1337 : if self.intent.attached.is_none() {
309 497 : let node_id = scheduler.schedule_shard(&used_pageservers)?;
310 497 : self.intent.attached = Some(node_id);
311 497 : used_pageservers.push(node_id);
312 497 : modified = true;
313 840 : }
314 1337 : if !self.intent.secondary.is_empty() {
315 5 : self.intent.secondary.clear();
316 5 : modified = true;
317 1332 : }
318 : }
319 0 : Double(secondary_count) => {
320 0 : // Should have exactly one attached, and N secondaries
321 0 : if self.intent.attached.is_none() {
322 0 : let node_id = scheduler.schedule_shard(&used_pageservers)?;
323 0 : self.intent.attached = Some(node_id);
324 0 : used_pageservers.push(node_id);
325 0 : modified = true;
326 0 : }
327 :
328 0 : while self.intent.secondary.len() < secondary_count {
329 0 : let node_id = scheduler.schedule_shard(&used_pageservers)?;
330 0 : self.intent.secondary.push(node_id);
331 0 : used_pageservers.push(node_id);
332 0 : modified = true;
333 : }
334 : }
335 : Detached => {
336 : // Should have no attached or secondary pageservers
337 1 : if self.intent.attached.is_some() {
338 0 : self.intent.attached = None;
339 0 : modified = true;
340 1 : }
341 :
342 1 : if !self.intent.secondary.is_empty() {
343 0 : self.intent.secondary.clear();
344 0 : modified = true;
345 1 : }
346 : }
347 : }
348 :
349 1338 : if modified {
350 497 : self.sequence.0 += 1;
351 841 : }
352 :
353 1338 : Ok(())
354 1338 : }
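// What `schedule` converges the intent to for each policy, assuming enough nodes are
// schedulable:
//
//     PlacementPolicy::Single    => one attached location, no secondaries
//     PlacementPolicy::Double(n) => one attached location, n secondaries
//     PlacementPolicy::Detached  => no attached location, no secondaries
//
// `sequence` is only advanced when the intent actually changed, so calling `schedule` on an
// already-satisfied intent is a no-op.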
355 :
356 : /// Query whether the tenant's observed state for attached node matches its intent state, and if so,
357 : /// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that
358 : /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
359 : ///
360 : /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
361 : /// function should not be used to decide whether to reconcile.
362 8 : pub(crate) fn stably_attached(&self) -> Option<NodeId> {
363 8 : if let Some(attach_intent) = self.intent.attached {
364 7 : match self.observed.locations.get(&attach_intent) {
365 5 : Some(loc) => match &loc.conf {
366 5 : Some(conf) => match conf.mode {
367 : LocationConfigMode::AttachedMulti
368 : | LocationConfigMode::AttachedSingle
369 : | LocationConfigMode::AttachedStale => {
370 : // Our intent and observed state agree that this node is in an attached state.
371 5 : Some(attach_intent)
372 : }
373 : // Our observed config is not an attached state
374 0 : _ => None,
375 : },
376 : // Our observed state is None, i.e. in flux
377 0 : None => None,
378 : },
379 : // We have no observed state for this node
380 2 : None => None,
381 : }
382 : } else {
383 : // Our intent is not to attach
384 1 : None
385 : }
386 8 : }
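// Intended use (hypothetical caller; `notify` stands in for whatever the compute hook
// actually exposes): only tell computes about an attachment once intent and observed state
// agree on it.
//
//     if let Some(node_id) = tenant_state.stably_attached() {
//         compute_hook.notify(tenant_state.tenant_shard_id, node_id).await?;
//     }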
387 :
388 1346 : fn dirty(&self) -> bool {
389 1346 : if let Some(node_id) = self.intent.attached {
390 1345 : let wanted_conf = attached_location_conf(self.generation, &self.shard, &self.config);
391 1345 : match self.observed.locations.get(&node_id) {
392 838 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
393 : Some(_) | None => {
394 507 : return true;
395 : }
396 : }
397 1 : }
398 :
399 839 : for node_id in &self.intent.secondary {
400 0 : let wanted_conf = secondary_location_conf(&self.shard, &self.config);
401 0 : match self.observed.locations.get(node_id) {
402 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
403 : Some(_) | None => {
404 0 : return true;
405 : }
406 : }
407 : }
408 :
409 : // Even if there is no pageserver work to be done, if we have a pending notification to computes,
410 : // wake up a reconciler to send it.
411 839 : if self.pending_compute_notification {
412 0 : return true;
413 839 : }
414 839 :
415 839 : false
416 1346 : }
417 :
418 1346 : pub(crate) fn maybe_reconcile(
419 1346 : &mut self,
420 1346 : result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
421 1346 : pageservers: &Arc<HashMap<NodeId, Node>>,
422 1346 : compute_hook: &Arc<ComputeHook>,
423 1346 : service_config: &service::Config,
424 1346 : persistence: &Arc<Persistence>,
425 1346 : ) -> Option<ReconcilerWaiter> {
426 1346 : // If there are any ambiguous observed states, and the nodes they refer to are available,
427 1346 : // we should reconcile to clean them up.
428 1346 : let mut dirty_observed = false;
429 2193 : for (node_id, observed_loc) in &self.observed.locations {
430 851 : let node = pageservers
431 851 : .get(node_id)
432 851 : .expect("Nodes may not be removed while referenced");
433 851 : if observed_loc.conf.is_none()
434 9 : && !matches!(node.availability, NodeAvailability::Offline)
435 : {
436 4 : dirty_observed = true;
437 4 : break;
438 847 : }
439 : }
440 :
441 1346 : if !self.dirty() && !dirty_observed {
442 839 : tracing::info!("Not dirty, no reconciliation needed.");
443 839 : return None;
444 507 : }
445 :
446 : // Reconcile already in flight for the current sequence?
447 507 : if let Some(handle) = &self.reconciler {
448 9 : if handle.sequence == self.sequence {
449 4 : return Some(ReconcilerWaiter {
450 4 : tenant_shard_id: self.tenant_shard_id,
451 4 : seq_wait: self.waiter.clone(),
452 4 : error_seq_wait: self.error_waiter.clone(),
453 4 : error: self.last_error.clone(),
454 4 : seq: self.sequence,
455 4 : });
456 5 : }
457 498 : }
458 :
459 : // Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
460 : // doing our sequence's work.
461 503 : let old_handle = self.reconciler.take();
462 503 :
463 503 : let cancel = CancellationToken::new();
464 503 : let mut reconciler = Reconciler {
465 503 : tenant_shard_id: self.tenant_shard_id,
466 503 : shard: self.shard,
467 503 : generation: self.generation,
468 503 : intent: self.intent.clone(),
469 503 : config: self.config.clone(),
470 503 : observed: self.observed.clone(),
471 503 : pageservers: pageservers.clone(),
472 503 : compute_hook: compute_hook.clone(),
473 503 : service_config: service_config.clone(),
474 503 : cancel: cancel.clone(),
475 503 : persistence: persistence.clone(),
476 503 : compute_notify_failure: false,
477 503 : };
478 503 :
479 503 : let reconcile_seq = self.sequence;
480 503 :
481 503 : tracing::info!("Spawning Reconciler for sequence {}", self.sequence);
482 503 : let must_notify = self.pending_compute_notification;
483 503 : let join_handle = tokio::task::spawn(async move {
484 : // Wait for any previous reconcile task to complete before we start
485 503 : if let Some(old_handle) = old_handle {
486 5 : old_handle.cancel.cancel();
487 5 : if let Err(e) = old_handle.handle.await {
488 : // We can't do much with this other than log it: the task is done, so
489 : // we may proceed with our work.
490 0 : tracing::error!("Unexpected join error waiting for reconcile task: {e}");
491 5 : }
492 498 : }
493 :
494 : // Early check for cancellation before doing any work
495 : // TODO: wrap all remote API operations in cancellation check
496 : // as well.
497 503 : if reconciler.cancel.is_cancelled() {
498 0 : return;
499 503 : }
500 :
501 : // Attempt to make observed state match intent state
502 2631 : let result = reconciler.reconcile().await;
503 :
504 : // If we know we had a pending compute notification from some previous action, send a notification irrespective
505 : // of whether the above reconcile() did any work
506 501 : if result.is_ok() && must_notify {
507 : // If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
508 0 : reconciler.compute_notify().await.ok();
509 501 : }
510 :
511 501 : result_tx
512 501 : .send(ReconcileResult {
513 501 : sequence: reconcile_seq,
514 501 : result,
515 501 : tenant_shard_id: reconciler.tenant_shard_id,
516 501 : generation: reconciler.generation,
517 501 : observed: reconciler.observed,
518 501 : pending_compute_notification: reconciler.compute_notify_failure,
519 501 : })
520 501 : .ok();
521 503 : });
522 503 :
523 503 : self.reconciler = Some(ReconcilerHandle {
524 503 : sequence: self.sequence,
525 503 : handle: join_handle,
526 503 : cancel,
527 503 : });
528 503 :
529 503 : Some(ReconcilerWaiter {
530 503 : tenant_shard_id: self.tenant_shard_id,
531 503 : seq_wait: self.waiter.clone(),
532 503 : error_seq_wait: self.error_waiter.clone(),
533 503 : error: self.last_error.clone(),
534 503 : seq: self.sequence,
535 503 : })
536 1346 : }
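// Typical call pattern from the service (simplified and illustrative): after mutating
// policy, config or intent, kick off reconciliation and optionally wait for it.
//
//     tenant_state.schedule(&mut scheduler)?;
//     if let Some(waiter) = tenant_state.maybe_reconcile(
//         result_tx.clone(),
//         &pageservers,
//         &compute_hook,
//         &service_config,
//         &persistence,
//     ) {
//         waiter.wait_timeout(Duration::from_secs(30)).await?;
//     }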
537 :
538 : // If we had any state at all referring to this node ID, drop it. Does not
539 : // attempt to reschedule.
540 3 : pub(crate) fn deref_node(&mut self, node_id: NodeId) {
541 3 : if self.intent.attached == Some(node_id) {
542 1 : self.intent.attached = None;
543 2 : }
544 :
545 3 : self.intent.secondary.retain(|n| n != &node_id);
546 3 :
547 3 : self.observed.locations.remove(&node_id);
548 :
549 3 : debug_assert!(!self.intent.all_pageservers().contains(&node_id));
550 3 : }
551 : }
|