Line data Source code
1 : use std::{collections::HashMap, sync::Arc, time::Duration};
2 :
3 : use control_plane::attachment_service::NodeAvailability;
4 : use pageserver_api::{
5 : models::{LocationConfig, LocationConfigMode, TenantConfig},
6 : shard::{ShardIdentity, TenantShardId},
7 : };
8 : use tokio::task::JoinHandle;
9 : use tokio_util::sync::CancellationToken;
10 : use utils::{
11 : generation::Generation,
12 : id::NodeId,
13 : seqwait::{SeqWait, SeqWaitError},
14 : };
15 :
16 : use crate::{
17 : compute_hook::ComputeHook,
18 : node::Node,
19 : persistence::Persistence,
20 : reconciler::{attached_location_conf, secondary_location_conf, ReconcileError, Reconciler},
21 : scheduler::{ScheduleError, Scheduler},
22 : service, PlacementPolicy, Sequence,
23 : };
24 :
25 : pub(crate) struct TenantState {
26 : pub(crate) tenant_shard_id: TenantShardId,
27 :
28 : pub(crate) shard: ShardIdentity,
29 :
30 : // Runtime only: a sequence number used to coordinate updates to this object
31 : // while background reconcilers may be running. A reconciler runs to a
32 : // particular sequence.
33 : pub(crate) sequence: Sequence,
34 :
35 : // Latest generation number: next time we attach, increment this
36 : // and use the incremented number when attaching
37 : pub(crate) generation: Generation,
38 :
39 : // High level description of how the tenant should be set up. Provided
40 : // externally.
41 : pub(crate) policy: PlacementPolicy,
42 :
43 : // Low level description of exactly which pageservers should fulfil
44 : // which role. Generated by `Self::schedule`.
45 : pub(crate) intent: IntentState,
46 :
47 : // Low level description of how the tenant is configured on pageservers:
48 : // if this does not match `Self::intent` then the tenant needs reconciliation
49 : // with `Self::reconcile`.
50 : pub(crate) observed: ObservedState,
51 :
52 : // Tenant configuration, passed through opaquely to the pageserver. Identical
53 : // for all shards in a tenant.
54 : pub(crate) config: TenantConfig,
55 :
56 : /// If a reconcile task is currently in flight, it may be joined here (it is
57 : /// only safe to join if either the result has been received or the reconciler's
58 : /// cancellation token has been fired)
59 : pub(crate) reconciler: Option<ReconcilerHandle>,
60 :
61 : /// Optionally wait for reconciliation to complete up to a particular
62 : /// sequence number.
63 : pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
64 :
65 : /// Indicates the sequence number for which we have encountered an error while reconciling. If
66 : /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
67 : /// and callers should stop waiting for `waiter` and propagate the error.
68 : pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
69 :
70 : /// The most recent error from a reconcile on this tenant
71 : /// TODO: generalize to an array of recent events
72 : /// TOOD: use a ArcSwap instead of mutex for faster reads?
73 : pub(crate) last_error: std::sync::Arc<std::sync::Mutex<String>>,
74 :
75 : /// If we have a pending compute notification that for some reason we weren't able to send,
76 : /// set this to true. If this is set, calls to [`Self::maybe_reconcile`] will run a task to retry
77 : /// sending it. This is the mechanism by which compute notifications are included in the scope
78 : /// of state that we publish externally in an eventually consistent way.
79 : pub(crate) pending_compute_notification: bool,
80 : }
81 :
82 497 : #[derive(Default, Clone, Debug)]
83 : pub(crate) struct IntentState {
84 : pub(crate) attached: Option<NodeId>,
85 : pub(crate) secondary: Vec<NodeId>,
86 : }
87 :
88 497 : #[derive(Default, Clone)]
89 : pub(crate) struct ObservedState {
90 : pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
91 : }
92 :
93 : /// Our latest knowledge of how this tenant is configured in the outside world.
94 : ///
95 : /// Meaning:
96 : /// * No instance of this type exists for a node: we are certain that we have nothing configured on that
97 : /// node for this shard.
98 : /// * Instance exists with conf==None: we *might* have some state on that node, but we don't know
99 : /// what it is (e.g. we failed partway through configuring it)
100 : /// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
101 : /// and that configuration will still be present unless something external interfered.
102 5 : #[derive(Clone)]
103 : pub(crate) struct ObservedStateLocation {
104 : /// If None, it means we do not know the status of this shard's location on this node, but
105 : /// we know that we might have some state on this node.
106 : pub(crate) conf: Option<LocationConfig>,
107 : }
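
// Illustrative sketch (not part of the original source): the three-way meaning
// described in the comment above, using the types defined in this file. An absent
// map entry means "definitely nothing configured there"; an entry with `conf: None`
// means "possibly something there, contents unknown".
#[cfg(test)]
mod observed_state_meaning_example {
    use super::*;

    #[test]
    fn absent_vs_unknown_location() {
        let mut observed = ObservedState::new();

        // No entry for the node: we are certain nothing is configured there.
        assert!(!observed.locations.contains_key(&NodeId(1)));

        // An entry with `conf: None`: the node may hold some state, but we do
        // not know what it is (e.g. a configuration call failed partway through).
        observed
            .locations
            .insert(NodeId(1), ObservedStateLocation { conf: None });
        assert!(observed.locations.get(&NodeId(1)).unwrap().conf.is_none());
    }
}
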
108 : pub(crate) struct ReconcilerWaiter {
109 : // For observability purposes, remember the ID of the shard we're
110 : // waiting for.
111 : pub(crate) tenant_shard_id: TenantShardId,
112 :
113 : seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
114 : error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
115 : error: std::sync::Arc<std::sync::Mutex<String>>,
116 : seq: Sequence,
117 : }
118 :
119 2 : #[derive(thiserror::Error, Debug)]
120 : pub enum ReconcileWaitError {
121 : #[error("Timeout waiting for shard {0}")]
122 : Timeout(TenantShardId),
123 : #[error("shutting down")]
124 : Shutdown,
125 : #[error("Reconcile error on shard {0}: {1}")]
126 : Failed(TenantShardId, String),
127 : }
128 :
129 : impl ReconcilerWaiter {
130 484 : pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
131 953 : tokio::select! {
132 483 : result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
133 0 : result.map_err(|e| match e {
134 0 : SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
135 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
136 0 : })?;
137 : },
138 1 : result = self.error_seq_wait.wait_for(self.seq) => {
139 0 : result.map_err(|e| match e {
140 0 : SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
141 0 : SeqWaitError::Timeout => unreachable!()
142 0 : })?;
143 :
144 : return Err(ReconcileWaitError::Failed(self.tenant_shard_id, self.error.lock().unwrap().clone()))
145 : }
146 : }
147 :
148 483 : Ok(())
149 484 : }
150 : }
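
// Illustrative sketch (not part of the original source): how the paired
// `SeqWait`s used by `ReconcilerWaiter` propagate failures. Advancing the error
// waiter to the awaited sequence unblocks the error branch of the
// `tokio::select!` above, while the success waiter would simply time out.
// Assumes `SeqWait::advance` and `#[tokio::test]` are available with this
// crate's dependencies.
#[cfg(test)]
mod reconcile_waiter_example {
    use super::*;

    #[tokio::test]
    async fn error_sequence_unblocks_waiters() {
        let ok_wait: Arc<SeqWait<Sequence, Sequence>> = Arc::new(SeqWait::new(Sequence(0)));
        let err_wait: Arc<SeqWait<Sequence, Sequence>> = Arc::new(SeqWait::new(Sequence(0)));

        // Simulate a failed reconcile for sequence 1: only the error channel advances.
        err_wait.advance(Sequence(1));

        // The error waiter completes immediately...
        assert!(err_wait.wait_for(Sequence(1)).await.is_ok());
        // ...while the success waiter for the same sequence times out.
        assert!(ok_wait
            .wait_for_timeout(Sequence(1), Duration::from_millis(10))
            .await
            .is_err());
    }
}
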
151 :
152 : /// Once a reconciler task has been spawned, the tenant shard's state carries enough
153 : /// information to optionally cancel & await it later.
154 : pub(crate) struct ReconcilerHandle {
155 : sequence: Sequence,
156 : handle: JoinHandle<()>,
157 : cancel: CancellationToken,
158 : }
159 :
160 : /// When a reconcile task completes, it sends this result object
161 : /// to be applied to the primary TenantState.
162 : pub(crate) struct ReconcileResult {
163 : pub(crate) sequence: Sequence,
164 : /// On errors, `observed` should be treated as an incomplete description
165 : /// of state (i.e. any nodes present in the result should override nodes
166 : /// present in the parent tenant state, but any unmentioned nodes should
167 : /// not be removed from parent tenant state)
168 : pub(crate) result: Result<(), ReconcileError>,
169 :
170 : pub(crate) tenant_shard_id: TenantShardId,
171 : pub(crate) generation: Generation,
172 : pub(crate) observed: ObservedState,
173 :
174 : /// Set [`TenantState::pending_compute_notification`] from this flag
175 : pub(crate) pending_compute_notification: bool,
176 : }
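
// Illustrative sketch (not part of the original source) of the merge semantics
// described above for `observed`: entries reported by the reconciler overwrite
// the parent's entries for the same nodes, while unmentioned nodes are left
// untouched. `merge_observed` is a hypothetical helper, for illustration only;
// the real merging happens where `ReconcileResult` is consumed.
#[cfg(test)]
mod reconcile_result_merge_example {
    use super::*;

    fn merge_observed(parent: &mut ObservedState, result: ObservedState) {
        for (node_id, loc) in result.locations {
            parent.locations.insert(node_id, loc);
        }
    }

    #[test]
    fn merge_preserves_unmentioned_nodes() {
        let mut parent = ObservedState::new();
        parent
            .locations
            .insert(NodeId(1), ObservedStateLocation { conf: None });
        parent
            .locations
            .insert(NodeId(2), ObservedStateLocation { conf: None });

        // The reconciler only reports on NodeId(1).
        let mut from_reconciler = ObservedState::new();
        from_reconciler
            .locations
            .insert(NodeId(1), ObservedStateLocation { conf: None });

        merge_observed(&mut parent, from_reconciler);

        // NodeId(2) was not mentioned, so it must not be removed.
        assert!(parent.locations.contains_key(&NodeId(2)));
        assert_eq!(parent.locations.len(), 2);
    }
}
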
177 :
178 : impl IntentState {
179 9 : pub(crate) fn new() -> Self {
180 9 : Self {
181 9 : attached: None,
182 9 : secondary: vec![],
183 9 : }
184 9 : }
185 1816 : pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
186 1816 : let mut result = Vec::new();
187 1816 : if let Some(p) = self.attached {
188 1326 : result.push(p)
189 490 : }
190 :
191 1816 : result.extend(self.secondary.iter().copied());
192 1816 :
193 1816 : result
194 1816 : }
195 :
196 : /// When a node goes offline, we update the intent to avoid using it
197 : /// as the attached pageserver.
198 : ///
199 : /// Returns true if a change was made
200 13 : pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool {
201 13 : if self.attached == Some(node_id) {
202 5 : self.attached = None;
203 5 : self.secondary.push(node_id);
204 5 : true
205 : } else {
206 8 : false
207 : }
208 13 : }
209 : }
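
// Illustrative sketch (not part of the original source): `notify_offline`
// demotes the offline node from attached to secondary and reports whether the
// intent changed, so that scheduling can later pick a new attached location.
#[cfg(test)]
mod intent_offline_example {
    use super::*;

    #[test]
    fn offline_attached_node_becomes_secondary() {
        let mut intent = IntentState {
            attached: Some(NodeId(1)),
            secondary: Vec::new(),
        };

        // The attached node goes offline: the intent changes.
        assert!(intent.notify_offline(NodeId(1)));
        assert_eq!(intent.attached, None);
        assert_eq!(intent.secondary, vec![NodeId(1)]);

        // A node we were not attached to is a no-op.
        assert!(!intent.notify_offline(NodeId(2)));
    }
}
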
210 :
211 : impl ObservedState {
212 9 : pub(crate) fn new() -> Self {
213 9 : Self {
214 9 : locations: HashMap::new(),
215 9 : }
216 9 : }
217 : }
218 :
219 : impl TenantState {
220 497 : pub(crate) fn new(
221 497 : tenant_shard_id: TenantShardId,
222 497 : shard: ShardIdentity,
223 497 : policy: PlacementPolicy,
224 497 : ) -> Self {
225 497 : Self {
226 497 : tenant_shard_id,
227 497 : policy,
228 497 : intent: IntentState::default(),
229 497 : generation: Generation::new(0),
230 497 : shard,
231 497 : observed: ObservedState::default(),
232 497 : config: TenantConfig::default(),
233 497 : reconciler: None,
234 497 : sequence: Sequence(1),
235 497 : waiter: Arc::new(SeqWait::new(Sequence(0))),
236 497 : error_waiter: Arc::new(SeqWait::new(Sequence(0))),
237 497 : last_error: Arc::default(),
238 497 : pending_compute_notification: false,
239 497 : }
240 497 : }
241 :
242 : /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
243 : /// [`ObservedState`], even if it violates my [`PlacementPolicy`]. Call [`Self::schedule`] next,
244 : /// to get an intent state that complies with placement policy. The overall goal is to do scheduling
245 : /// in a way that makes use of any configured locations that already exist in the outside world.
246 9 : pub(crate) fn intent_from_observed(&mut self) {
247 9 : // Choose an attached location by filtering observed locations, and then sorting to get the highest
248 9 : // generation
249 9 : let mut attached_locs = self
250 9 : .observed
251 9 : .locations
252 9 : .iter()
253 9 : .filter_map(|(node_id, l)| {
254 4 : if let Some(conf) = &l.conf {
255 4 : if conf.mode == LocationConfigMode::AttachedMulti
256 4 : || conf.mode == LocationConfigMode::AttachedSingle
257 0 : || conf.mode == LocationConfigMode::AttachedStale
258 : {
259 4 : Some((node_id, conf.generation))
260 : } else {
261 0 : None
262 : }
263 : } else {
264 0 : None
265 : }
266 9 : })
267 9 : .collect::<Vec<_>>();
268 9 :
269 9 : attached_locs.sort_by_key(|i| i.1);
270 9 : if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
271 4 : self.intent.attached = Some(*node_id);
272 5 : }
273 :
274 : // All remaining observed locations generate secondary intents. This includes None
275 : // observations, as these may well have some local content on disk that is usable (this
276 : // is an edge case that might occur if we restarted during a migration or other change)
277 9 : self.observed.locations.keys().for_each(|node_id| {
278 4 : if Some(*node_id) != self.intent.attached {
279 0 : self.intent.secondary.push(*node_id);
280 4 : }
281 9 : });
282 9 : }
283 :
284 1328 : pub(crate) fn schedule(&mut self, scheduler: &mut Scheduler) -> Result<(), ScheduleError> {
285 1328 : // TODO: before scheduling new nodes, check if any existing content in
286 1328 : // self.intent refers to pageservers that are offline, and pick other
287 1328 : // pageservers if so.
288 1328 :
289 1328 : // Build the set of pageservers already in use by this tenant, to avoid scheduling
290 1328 : // more work on the same pageservers we're already using.
291 1328 : let mut used_pageservers = self.intent.all_pageservers();
292 1328 : let mut modified = false;
293 1328 :
294 1328 : use PlacementPolicy::*;
295 1328 : match self.policy {
296 : Single => {
297 : // Should have exactly one attached, and zero secondaries
298 1327 : if self.intent.attached.is_none() {
299 489 : let node_id = scheduler.schedule_shard(&used_pageservers)?;
300 489 : self.intent.attached = Some(node_id);
301 489 : used_pageservers.push(node_id);
302 489 : modified = true;
303 838 : }
304 1327 : if !self.intent.secondary.is_empty() {
305 5 : self.intent.secondary.clear();
306 5 : modified = true;
307 1322 : }
308 : }
309 0 : Double(secondary_count) => {
310 0 : // Should have exactly one attached, and N secondaries
311 0 : if self.intent.attached.is_none() {
312 0 : let node_id = scheduler.schedule_shard(&used_pageservers)?;
313 0 : self.intent.attached = Some(node_id);
314 0 : used_pageservers.push(node_id);
315 0 : modified = true;
316 0 : }
317 :
318 0 : while self.intent.secondary.len() < secondary_count {
319 0 : let node_id = scheduler.schedule_shard(&used_pageservers)?;
320 0 : self.intent.secondary.push(node_id);
321 0 : used_pageservers.push(node_id);
322 0 : modified = true;
323 : }
324 : }
325 : Detached => {
326 : // Should have no attached or secondary pageservers
327 1 : if self.intent.attached.is_some() {
328 0 : self.intent.attached = None;
329 0 : modified = true;
330 1 : }
331 :
332 1 : if !self.intent.secondary.is_empty() {
333 0 : self.intent.secondary.clear();
334 0 : modified = true;
335 1 : }
336 : }
337 : }
338 :
339 1328 : if modified {
340 489 : self.sequence.0 += 1;
341 839 : }
342 :
343 1328 : Ok(())
344 1328 : }
345 :
346 : /// Query whether the tenant's observed state for the attached node matches its intent state, and if so,
347 : /// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that
348 : /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
349 : ///
350 : /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
351 : /// function should not be used to decide whether to reconcile.
352 9 : pub(crate) fn stably_attached(&self) -> Option<NodeId> {
353 9 : if let Some(attach_intent) = self.intent.attached {
354 8 : match self.observed.locations.get(&attach_intent) {
355 4 : Some(loc) => match &loc.conf {
356 4 : Some(conf) => match conf.mode {
357 : LocationConfigMode::AttachedMulti
358 : | LocationConfigMode::AttachedSingle
359 : | LocationConfigMode::AttachedStale => {
360 : // Our intent and observed state agree that this node is in an attached state.
361 4 : Some(attach_intent)
362 : }
363 : // Our observed config is not an attached state
364 0 : _ => None,
365 : },
366 : // Our observed state is None, i.e. in flux
367 0 : None => None,
368 : },
369 : // We have no observed state for this node
370 4 : None => None,
371 : }
372 : } else {
373 : // Our intent is not to attach
374 1 : None
375 : }
376 9 : }
377 :
378 1332 : fn dirty(&self) -> bool {
379 1332 : if let Some(node_id) = self.intent.attached {
380 1331 : let wanted_conf = attached_location_conf(self.generation, &self.shard, &self.config);
381 1331 : match self.observed.locations.get(&node_id) {
382 834 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
383 : Some(_) | None => {
384 497 : return true;
385 : }
386 : }
387 1 : }
388 :
389 835 : for node_id in &self.intent.secondary {
390 0 : let wanted_conf = secondary_location_conf(&self.shard, &self.config);
391 0 : match self.observed.locations.get(node_id) {
392 0 : Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
393 : Some(_) | None => {
394 0 : return true;
395 : }
396 : }
397 : }
398 :
399 : // Even if there is no pageserver work to be done, if we have a pending notification to computes,
400 : // wake up a reconciler to send it.
401 835 : if self.pending_compute_notification {
402 0 : return true;
403 835 : }
404 835 :
405 835 : false
406 1332 : }
407 :
408 1332 : pub(crate) fn maybe_reconcile(
409 1332 : &mut self,
410 1332 : result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
411 1332 : pageservers: &Arc<HashMap<NodeId, Node>>,
412 1332 : compute_hook: &Arc<ComputeHook>,
413 1332 : service_config: &service::Config,
414 1332 : persistence: &Arc<Persistence>,
415 1332 : ) -> Option<ReconcilerWaiter> {
416 1332 : // If there are any ambiguous observed states, and the nodes they refer to are available,
417 1332 : // we should reconcile to clean them up.
418 1332 : let mut dirty_observed = false;
419 2171 : for (node_id, observed_loc) in &self.observed.locations {
420 843 : let node = pageservers
421 843 : .get(node_id)
422 843 : .expect("Nodes may not be removed while referenced");
423 843 : if observed_loc.conf.is_none()
424 9 : && !matches!(node.availability, NodeAvailability::Offline)
425 : {
426 4 : dirty_observed = true;
427 4 : break;
428 839 : }
429 : }
430 :
431 1332 : if !self.dirty() && !dirty_observed {
432 835 : tracing::info!("Not dirty, no reconciliation needed.");
433 835 : return None;
434 497 : }
435 :
436 : // Reconcile already in flight for the current sequence?
437 497 : if let Some(handle) = &self.reconciler {
438 9 : if handle.sequence == self.sequence {
439 4 : return Some(ReconcilerWaiter {
440 4 : tenant_shard_id: self.tenant_shard_id,
441 4 : seq_wait: self.waiter.clone(),
442 4 : error_seq_wait: self.error_waiter.clone(),
443 4 : error: self.last_error.clone(),
444 4 : seq: self.sequence,
445 4 : });
446 5 : }
447 488 : }
448 :
449 : // Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
450 : // doing our sequence's work.
451 493 : let old_handle = self.reconciler.take();
452 493 :
453 493 : let cancel = CancellationToken::new();
454 493 : let mut reconciler = Reconciler {
455 493 : tenant_shard_id: self.tenant_shard_id,
456 493 : shard: self.shard,
457 493 : generation: self.generation,
458 493 : intent: self.intent.clone(),
459 493 : config: self.config.clone(),
460 493 : observed: self.observed.clone(),
461 493 : pageservers: pageservers.clone(),
462 493 : compute_hook: compute_hook.clone(),
463 493 : service_config: service_config.clone(),
464 493 : cancel: cancel.clone(),
465 493 : persistence: persistence.clone(),
466 493 : compute_notify_failure: false,
467 493 : };
468 493 :
469 493 : let reconcile_seq = self.sequence;
470 493 :
471 493 : tracing::info!("Spawning Reconciler for sequence {}", self.sequence);
472 493 : let must_notify = self.pending_compute_notification;
473 493 : let join_handle = tokio::task::spawn(async move {
474 : // Wait for any previous reconcile task to complete before we start
475 493 : if let Some(old_handle) = old_handle {
476 5 : old_handle.cancel.cancel();
477 5 : if let Err(e) = old_handle.handle.await {
478 : // We can't do much with this other than log it: the task is done, so
479 : // we may proceed with our work.
480 0 : tracing::error!("Unexpected join error waiting for reconcile task: {e}");
481 5 : }
482 488 : }
483 :
484 : // Early check for cancellation before doing any work
485 : // TODO: wrap all remote API operations in cancellation check
486 : // as well.
487 493 : if reconciler.cancel.is_cancelled() {
488 0 : return;
489 493 : }
490 :
491 : // Attempt to make observed state match intent state
492 2366 : let result = reconciler.reconcile().await;
493 :
494 : // If we know we had a pending compute notification from some previous action, send a notification irrespective
495 : // of whether the above reconcile() did any work
496 491 : if result.is_ok() && must_notify {
497 : // If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
498 0 : reconciler.compute_notify().await.ok();
499 491 : }
500 :
501 491 : result_tx
502 491 : .send(ReconcileResult {
503 491 : sequence: reconcile_seq,
504 491 : result,
505 491 : tenant_shard_id: reconciler.tenant_shard_id,
506 491 : generation: reconciler.generation,
507 491 : observed: reconciler.observed,
508 491 : pending_compute_notification: reconciler.compute_notify_failure,
509 491 : })
510 491 : .ok();
511 493 : });
512 493 :
513 493 : self.reconciler = Some(ReconcilerHandle {
514 493 : sequence: self.sequence,
515 493 : handle: join_handle,
516 493 : cancel,
517 493 : });
518 493 :
519 493 : Some(ReconcilerWaiter {
520 493 : tenant_shard_id: self.tenant_shard_id,
521 493 : seq_wait: self.waiter.clone(),
522 493 : error_seq_wait: self.error_waiter.clone(),
523 493 : error: self.last_error.clone(),
524 493 : seq: self.sequence,
525 493 : })
526 1332 : }
527 : }
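
// Illustrative sketch (not part of the original source): why
// `intent_from_observed` sorts candidate attached locations by generation and
// takes the last element. Generations are represented here as plain
// `Option<u32>` for illustration: `None` (generation unknown) sorts before any
// known generation, so the location with the highest known generation wins.
#[cfg(test)]
mod highest_generation_example {
    use super::*;

    #[test]
    fn highest_generation_is_chosen() {
        let mut attached_locs: Vec<(NodeId, Option<u32>)> = vec![
            (NodeId(1), Some(2)),
            (NodeId(2), None),
            (NodeId(3), Some(5)),
        ];

        // Ascending sort by generation, then take the last (largest) entry.
        attached_locs.sort_by_key(|i| i.1);
        assert_eq!(
            attached_locs.into_iter().last().map(|i| i.0),
            Some(NodeId(3))
        );
    }
}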