LCOV - code coverage report
Current view: top level - control_plane/attachment_service/src - tenant_state.rs
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info
Test Date: 2024-02-07 07:37:29
Coverage: Lines: 85.4 % (229 of 268 hit)    Functions: 80.0 % (24 of 30 hit)

            Line data    Source code
       1              : use std::{collections::HashMap, sync::Arc, time::Duration};
       2              : 
       3              : use control_plane::attachment_service::NodeAvailability;
       4              : use pageserver_api::{
       5              :     models::{LocationConfig, LocationConfigMode, TenantConfig},
       6              :     shard::{ShardIdentity, TenantShardId},
       7              : };
       8              : use tokio::task::JoinHandle;
       9              : use tokio_util::sync::CancellationToken;
      10              : use utils::{
      11              :     generation::Generation,
      12              :     id::NodeId,
      13              :     seqwait::{SeqWait, SeqWaitError},
      14              : };
      15              : 
      16              : use crate::{
      17              :     compute_hook::ComputeHook,
      18              :     node::Node,
      19              :     persistence::Persistence,
      20              :     reconciler::{attached_location_conf, secondary_location_conf, ReconcileError, Reconciler},
      21              :     scheduler::{ScheduleError, Scheduler},
      22              :     service, PlacementPolicy, Sequence,
      23              : };
      24              : 
      25              : pub(crate) struct TenantState {
      26              :     pub(crate) tenant_shard_id: TenantShardId,
      27              : 
      28              :     pub(crate) shard: ShardIdentity,
      29              : 
      30              :     // Runtime only: sequence used to coordinate updates to this object
      31              :     // while background reconcilers may be running.  A reconciler runs to a
      32              :     // particular sequence.
      33              :     pub(crate) sequence: Sequence,
      34              : 
      35              :     // Latest generation number: next time we attach, increment this
      36              :     // and use the incremented number when attaching
      37              :     pub(crate) generation: Generation,
      38              : 
      39              :     // High level description of how the tenant should be set up.  Provided
      40              :     // externally.
      41              :     pub(crate) policy: PlacementPolicy,
      42              : 
      43              :     // Low level description of exactly which pageservers should fulfil
      44              :     // which role.  Generated by `Self::schedule`.
      45              :     pub(crate) intent: IntentState,
      46              : 
      47              :     // Low level description of how the tenant is configured on pageservers:
      48              :     // if this does not match `Self::intent` then the tenant needs reconciliation
      49              :     // with `Self::reconcile`.
      50              :     pub(crate) observed: ObservedState,
      51              : 
      52              :     // Tenant configuration, passed through opaquely to the pageserver.  Identical
      53              :     // for all shards in a tenant.
      54              :     pub(crate) config: TenantConfig,
      55              : 
      56              :     /// If a reconcile task is currently in flight, it may be joined here (it is
      57              :     /// only safe to join if either the result has been received or the reconciler's
      58              :     /// cancellation token has been fired)
      59              :     pub(crate) reconciler: Option<ReconcilerHandle>,
      60              : 
      61              :     /// Optionally wait for reconciliation to complete up to a particular
      62              :     /// sequence number.
      63              :     pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
      64              : 
      65              :     /// Indicates sequence number for which we have encountered an error reconciling.  If
      66              :     /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
      67              :     /// and callers should stop waiting for `waiter` and propagate the error.
      68              :     pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
      69              : 
      70              :     /// The most recent error from a reconcile on this tenant
      71              :     /// TODO: generalize to an array of recent events
      72              :     /// TODO: use an ArcSwap instead of a mutex for faster reads?
      73              :     pub(crate) last_error: std::sync::Arc<std::sync::Mutex<String>>,
      74              : 
      75              :     /// If we have a pending compute notification that for some reason we weren't able to send,
      76              :     /// set this to true. If this is set, calls to [`Self::maybe_reconcile`] will run a task to retry
      77              :     /// sending it.  This is the mechanism by which compute notifications are included in the scope
      78              :     /// of state that we publish externally in an eventually consistent way.
      79              :     pub(crate) pending_compute_notification: bool,
      80              : }
      81              : 
      82          497 : #[derive(Default, Clone, Debug)]
      83              : pub(crate) struct IntentState {
      84              :     pub(crate) attached: Option<NodeId>,
      85              :     pub(crate) secondary: Vec<NodeId>,
      86              : }
      87              : 
      88          497 : #[derive(Default, Clone)]
      89              : pub(crate) struct ObservedState {
      90              :     pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
      91              : }
      92              : 
      93              : /// Our latest knowledge of how this tenant is configured in the outside world.
      94              : ///
      95              : /// Meaning:
      96              : ///     * No instance of this type exists for a node: we are certain that we have nothing configured on that
      97              : ///       node for this shard.
      98              : ///     * Instance exists with conf==None: we *might* have some state on that node, but we don't know
      99              : ///       what it is (e.g. we failed partway through configuring it)
     100              : ///     * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
     101              : ///       and that configuration will still be present unless something external interfered.
     102            5 : #[derive(Clone)]
     103              : pub(crate) struct ObservedStateLocation {
     104              :     /// If None, it means we do not know the status of this shard's location on this node, but
     105              :     /// we know that we might have some state on this node.
     106              :     pub(crate) conf: Option<LocationConfig>,
     107              : }
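// Illustration (not part of the original source): a minimal sketch of how a
// caller might interpret the three cases described in the comment above.
// `describe_location` is a hypothetical helper.
fn describe_location(observed: &ObservedState, node_id: NodeId) -> &'static str {
    match observed.locations.get(&node_id) {
        // No instance of ObservedStateLocation: certainly nothing configured here.
        None => "nothing configured on this node",
        // conf == None: there may be some state on the node, but we don't know what it is.
        Some(ObservedStateLocation { conf: None }) => "unknown state, needs reconciliation",
        // conf == Some: the configuration we last applied successfully.
        Some(ObservedStateLocation { conf: Some(_) }) => "last successfully applied configuration",
    }
}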
     108              : pub(crate) struct ReconcilerWaiter {
     109              :     // For observability purposes, remember the ID of the shard we're
     110              :     // waiting for.
     111              :     pub(crate) tenant_shard_id: TenantShardId,
     112              : 
     113              :     seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     114              :     error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     115              :     error: std::sync::Arc<std::sync::Mutex<String>>,
     116              :     seq: Sequence,
     117              : }
     118              : 
     119            2 : #[derive(thiserror::Error, Debug)]
     120              : pub enum ReconcileWaitError {
     121              :     #[error("Timeout waiting for shard {0}")]
     122              :     Timeout(TenantShardId),
     123              :     #[error("shutting down")]
     124              :     Shutdown,
     125              :     #[error("Reconcile error on shard {0}: {1}")]
     126              :     Failed(TenantShardId, String),
     127              : }
     128              : 
     129              : impl ReconcilerWaiter {
     130          484 :     pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
     131          953 :         tokio::select! {
     132          483 :             result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
     133            0 :                 result.map_err(|e| match e {
     134            0 :                     SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
     135            0 :                     SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
     136            0 :                 })?;
     137              :             },
     138            1 :             result = self.error_seq_wait.wait_for(self.seq) => {
     139            0 :                 result.map_err(|e| match e {
     140            0 :                     SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
     141            0 :                     SeqWaitError::Timeout => unreachable!()
     142            0 :                 })?;
     143              : 
     144              :                 return Err(ReconcileWaitError::Failed(self.tenant_shard_id, self.error.lock().unwrap().clone()))
     145              :             }
     146              :         }
     147              : 
     148          483 :         Ok(())
     149          484 :     }
     150              : }
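// Illustration (not part of the original source): a minimal sketch of the
// caller side of `wait_timeout`; `waiter` would come from
// `TenantState::maybe_reconcile` below, and the 30s timeout is arbitrary.
async fn await_reconcile(waiter: ReconcilerWaiter) {
    match waiter.wait_timeout(Duration::from_secs(30)).await {
        Ok(()) => tracing::info!("reconcile complete for shard {}", waiter.tenant_shard_id),
        Err(ReconcileWaitError::Timeout(id)) => tracing::warn!("timed out waiting for shard {id}"),
        Err(ReconcileWaitError::Shutdown) => { /* service is shutting down: stop waiting */ }
        Err(ReconcileWaitError::Failed(id, err)) => tracing::error!("reconcile of shard {id} failed: {err}"),
    }
}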
     151              : 
     152              : /// Having spawned a reconciler task, the tenant shard's state will carry enough
     153              : /// information to optionally cancel & await it later.
     154              : pub(crate) struct ReconcilerHandle {
     155              :     sequence: Sequence,
     156              :     handle: JoinHandle<()>,
     157              :     cancel: CancellationToken,
     158              : }
     159              : 
     160              : /// When a reconcile task completes, it sends this result object
     161              : /// to be applied to the primary TenantState.
     162              : pub(crate) struct ReconcileResult {
     163              :     pub(crate) sequence: Sequence,
      164              :     /// On errors, `observed` should be treated as an incomplete description
      165              :     /// of state (i.e. any nodes present in the result should override nodes
      166              :     /// present in the parent tenant state, but any unmentioned nodes should
      167              :     /// not be removed from the parent tenant state)
     168              :     pub(crate) result: Result<(), ReconcileError>,
     169              : 
     170              :     pub(crate) tenant_shard_id: TenantShardId,
     171              :     pub(crate) generation: Generation,
     172              :     pub(crate) observed: ObservedState,
     173              : 
     174              :     /// Set [`TenantState::pending_compute_notification`] from this flag
     175              :     pub(crate) pending_compute_notification: bool,
     176              : }
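// Illustration (not part of the original source): a sketch of applying a
// ReconcileResult to the parent TenantState, following the merge rule in the
// comment on `result` above: nodes present in `observed` override the
// parent's entries, and unmentioned nodes are left alone. Assumes a
// `SeqWait::advance` method that wakes waiters up to the given sequence.
fn apply_result(tenant: &mut TenantState, result: ReconcileResult) {
    tenant.generation = result.generation;
    tenant.pending_compute_notification = result.pending_compute_notification;
    // Merge, rather than replace, the observed locations.
    for (node_id, loc) in result.observed.locations {
        tenant.observed.locations.insert(node_id, loc);
    }
    match result.result {
        Ok(()) => {
            tenant.waiter.advance(result.sequence);
        }
        Err(e) => {
            // Record the error and advance error_waiter so that
            // ReconcilerWaiter::wait_timeout stops waiting and propagates it.
            *tenant.last_error.lock().unwrap() = e.to_string();
            tenant.error_waiter.advance(result.sequence);
        }
    }
}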
     177              : 
     178              : impl IntentState {
     179            9 :     pub(crate) fn new() -> Self {
     180            9 :         Self {
     181            9 :             attached: None,
     182            9 :             secondary: vec![],
     183            9 :         }
     184            9 :     }
     185         1816 :     pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
     186         1816 :         let mut result = Vec::new();
     187         1816 :         if let Some(p) = self.attached {
     188         1326 :             result.push(p)
     189          490 :         }
     190              : 
     191         1816 :         result.extend(self.secondary.iter().copied());
     192         1816 : 
     193         1816 :         result
     194         1816 :     }
     195              : 
     196              :     /// When a node goes offline, we update intents to avoid using it
     197              :     /// as their attached pageserver.
     198              :     ///
     199              :     /// Returns true if a change was made
     200           13 :     pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool {
     201           13 :         if self.attached == Some(node_id) {
     202            5 :             self.attached = None;
     203            5 :             self.secondary.push(node_id);
     204            5 :             true
     205              :         } else {
     206            8 :             false
     207              :         }
     208           13 :     }
     209              : }
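// Illustration (not part of the original source): a sketch of the caller side
// of `notify_offline`, re-scheduling only the shards whose intent changed.
fn handle_node_offline(
    tenants: &mut HashMap<TenantShardId, TenantState>,
    scheduler: &mut Scheduler,
    offline_node: NodeId,
) -> Result<(), ScheduleError> {
    for tenant in tenants.values_mut() {
        if tenant.intent.notify_offline(offline_node) {
            // Intent changed: pick a replacement attached location.
            tenant.schedule(scheduler)?;
        }
    }
    Ok(())
}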
     210              : 
     211              : impl ObservedState {
     212            9 :     pub(crate) fn new() -> Self {
     213            9 :         Self {
     214            9 :             locations: HashMap::new(),
     215            9 :         }
     216            9 :     }
     217              : }
     218              : 
     219              : impl TenantState {
     220          497 :     pub(crate) fn new(
     221          497 :         tenant_shard_id: TenantShardId,
     222          497 :         shard: ShardIdentity,
     223          497 :         policy: PlacementPolicy,
     224          497 :     ) -> Self {
     225          497 :         Self {
     226          497 :             tenant_shard_id,
     227          497 :             policy,
     228          497 :             intent: IntentState::default(),
     229          497 :             generation: Generation::new(0),
     230          497 :             shard,
     231          497 :             observed: ObservedState::default(),
     232          497 :             config: TenantConfig::default(),
     233          497 :             reconciler: None,
     234          497 :             sequence: Sequence(1),
     235          497 :             waiter: Arc::new(SeqWait::new(Sequence(0))),
     236          497 :             error_waiter: Arc::new(SeqWait::new(Sequence(0))),
     237          497 :             last_error: Arc::default(),
     238          497 :             pending_compute_notification: false,
     239          497 :         }
     240          497 :     }
     241              : 
     242              :     /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
     243              :     /// [`ObservedState`], even if it violates my [`PlacementPolicy`].  Call [`Self::schedule`] next,
     244              :     /// to get an intent state that complies with placement policy.  The overall goal is to do scheduling
     245              :     /// in a way that makes use of any configured locations that already exist in the outside world.
     246            9 :     pub(crate) fn intent_from_observed(&mut self) {
     247            9 :         // Choose an attached location by filtering observed locations, and then sorting to get the highest
     248            9 :         // generation
     249            9 :         let mut attached_locs = self
     250            9 :             .observed
     251            9 :             .locations
     252            9 :             .iter()
     253            9 :             .filter_map(|(node_id, l)| {
     254            4 :                 if let Some(conf) = &l.conf {
     255            4 :                     if conf.mode == LocationConfigMode::AttachedMulti
     256            4 :                         || conf.mode == LocationConfigMode::AttachedSingle
     257            0 :                         || conf.mode == LocationConfigMode::AttachedStale
     258              :                     {
     259            4 :                         Some((node_id, conf.generation))
     260              :                     } else {
     261            0 :                         None
     262              :                     }
     263              :                 } else {
     264            0 :                     None
     265              :                 }
     266            9 :             })
     267            9 :             .collect::<Vec<_>>();
     268            9 : 
     269            9 :         attached_locs.sort_by_key(|i| i.1);
     270            9 :         if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
     271            4 :             self.intent.attached = Some(*node_id);
     272            5 :         }
     273              : 
     274              :         // All remaining observed locations generate secondary intents.  This includes None
     275              :         // observations, as these may well have some local content on disk that is usable (this
     276              :         // is an edge case that might occur if we restarted during a migration or other change)
     277            9 :         self.observed.locations.keys().for_each(|node_id| {
     278            4 :             if Some(*node_id) != self.intent.attached {
     279            0 :                 self.intent.secondary.push(*node_id);
     280            4 :             }
     281            9 :         });
     282            9 :     }
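// Illustration (not part of the original source): the startup sequence the
// doc comment above describes, as a standalone sketch: seed intent from
// observed state, then schedule so the intent also satisfies the placement
// policy while reusing existing locations where possible.
fn startup_schedule(tenant: &mut TenantState, scheduler: &mut Scheduler) -> Result<(), ScheduleError> {
    tenant.intent_from_observed();
    tenant.schedule(scheduler)
}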
     283              : 
     284         1328 :     pub(crate) fn schedule(&mut self, scheduler: &mut Scheduler) -> Result<(), ScheduleError> {
     285         1328 :         // TODO: before scheduling new nodes, check if any existing content in
     286         1328 :         // self.intent refers to pageservers that are offline, and pick other
     287         1328 :         // pageservers if so.
     288         1328 : 
     289         1328 :         // Build the set of pageservers already in use by this tenant, to avoid scheduling
     290         1328 :         // more work on the same pageservers we're already using.
     291         1328 :         let mut used_pageservers = self.intent.all_pageservers();
     292         1328 :         let mut modified = false;
     293         1328 : 
     294         1328 :         use PlacementPolicy::*;
     295         1328 :         match self.policy {
     296              :             Single => {
     297              :                 // Should have exactly one attached, and zero secondaries
     298         1327 :                 if self.intent.attached.is_none() {
     299          489 :                     let node_id = scheduler.schedule_shard(&used_pageservers)?;
     300          489 :                     self.intent.attached = Some(node_id);
     301          489 :                     used_pageservers.push(node_id);
     302          489 :                     modified = true;
     303          838 :                 }
     304         1327 :                 if !self.intent.secondary.is_empty() {
     305            5 :                     self.intent.secondary.clear();
     306            5 :                     modified = true;
     307         1322 :                 }
     308              :             }
     309            0 :             Double(secondary_count) => {
     310            0 :                 // Should have exactly one attached, and N secondaries
     311            0 :                 if self.intent.attached.is_none() {
     312            0 :                     let node_id = scheduler.schedule_shard(&used_pageservers)?;
     313            0 :                     self.intent.attached = Some(node_id);
     314            0 :                     used_pageservers.push(node_id);
     315            0 :                     modified = true;
     316            0 :                 }
     317              : 
     318            0 :                 while self.intent.secondary.len() < secondary_count {
     319            0 :                     let node_id = scheduler.schedule_shard(&used_pageservers)?;
     320            0 :                     self.intent.secondary.push(node_id);
     321            0 :                     used_pageservers.push(node_id);
     322            0 :                     modified = true;
     323              :                 }
     324              :             }
     325              :             Detached => {
     326              :                 // Should have no attached or secondary pageservers
     327            1 :                 if self.intent.attached.is_some() {
     328            0 :                     self.intent.attached = None;
     329            0 :                     modified = true;
     330            1 :                 }
     331              : 
     332            1 :                 if !self.intent.secondary.is_empty() {
     333            0 :                     self.intent.secondary.clear();
     334            0 :                     modified = true;
     335            1 :                 }
     336              :             }
     337              :         }
     338              : 
     339         1328 :         if modified {
     340          489 :             self.sequence.0 += 1;
     341          839 :         }
     342              : 
     343         1328 :         Ok(())
     344         1328 :     }
     345              : 
      346              :     /// Query whether the tenant's observed state for the attached node matches its intent state, and if so,
     347              :     /// yield the node ID.  This is appropriate for emitting compute hook notifications: we are checking that
     348              :     /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
     349              :     ///
     350              :     /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
      351              :     /// function should not be used to decide whether to reconcile.
     352            9 :     pub(crate) fn stably_attached(&self) -> Option<NodeId> {
     353            9 :         if let Some(attach_intent) = self.intent.attached {
     354            8 :             match self.observed.locations.get(&attach_intent) {
     355            4 :                 Some(loc) => match &loc.conf {
     356            4 :                     Some(conf) => match conf.mode {
     357              :                         LocationConfigMode::AttachedMulti
     358              :                         | LocationConfigMode::AttachedSingle
     359              :                         | LocationConfigMode::AttachedStale => {
     360              :                             // Our intent and observed state agree that this node is in an attached state.
     361            4 :                             Some(attach_intent)
     362              :                         }
     363              :                         // Our observed config is not an attached state
     364            0 :                         _ => None,
     365              :                     },
     366              :                     // Our observed state is None, i.e. in flux
     367            0 :                     None => None,
     368              :                 },
     369              :                 // We have no observed state for this node
     370            4 :                 None => None,
     371              :             }
     372              :         } else {
     373              :             // Our intent is not to attach
     374            1 :             None
     375              :         }
     376            9 :     }
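// Illustration (not part of the original source): `stably_attached` gates
// compute notifications, as a standalone sketch; the real notification path
// goes through ComputeHook.
fn maybe_notify_compute(tenant: &TenantState) {
    if let Some(node_id) = tenant.stably_attached() {
        // Intent and observed agree: safe to advertise this attachment to computes.
        tracing::info!("shard {} stably attached to node {node_id}", tenant.tenant_shard_id);
    }
}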
     377              : 
     378         1332 :     fn dirty(&self) -> bool {
     379         1332 :         if let Some(node_id) = self.intent.attached {
     380         1331 :             let wanted_conf = attached_location_conf(self.generation, &self.shard, &self.config);
     381         1331 :             match self.observed.locations.get(&node_id) {
     382          834 :                 Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
     383              :                 Some(_) | None => {
     384          497 :                     return true;
     385              :                 }
     386              :             }
     387            1 :         }
     388              : 
     389          835 :         for node_id in &self.intent.secondary {
     390            0 :             let wanted_conf = secondary_location_conf(&self.shard, &self.config);
     391            0 :             match self.observed.locations.get(node_id) {
     392            0 :                 Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
     393              :                 Some(_) | None => {
     394            0 :                     return true;
     395              :                 }
     396              :             }
     397              :         }
     398              : 
     399              :         // Even if there is no pageserver work to be done, if we have a pending notification to computes,
     400              :         // wake up a reconciler to send it.
     401          835 :         if self.pending_compute_notification {
     402            0 :             return true;
     403          835 :         }
     404          835 : 
     405          835 :         false
     406         1332 :     }
     407              : 
     408         1332 :     pub(crate) fn maybe_reconcile(
     409         1332 :         &mut self,
     410         1332 :         result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
     411         1332 :         pageservers: &Arc<HashMap<NodeId, Node>>,
     412         1332 :         compute_hook: &Arc<ComputeHook>,
     413         1332 :         service_config: &service::Config,
     414         1332 :         persistence: &Arc<Persistence>,
     415         1332 :     ) -> Option<ReconcilerWaiter> {
     416         1332 :         // If there are any ambiguous observed states, and the nodes they refer to are available,
     417         1332 :         // we should reconcile to clean them up.
     418         1332 :         let mut dirty_observed = false;
     419         2171 :         for (node_id, observed_loc) in &self.observed.locations {
     420          843 :             let node = pageservers
     421          843 :                 .get(node_id)
     422          843 :                 .expect("Nodes may not be removed while referenced");
     423          843 :             if observed_loc.conf.is_none()
     424            9 :                 && !matches!(node.availability, NodeAvailability::Offline)
     425              :             {
     426            4 :                 dirty_observed = true;
     427            4 :                 break;
     428          839 :             }
     429              :         }
     430              : 
     431         1332 :         if !self.dirty() && !dirty_observed {
     432          835 :             tracing::info!("Not dirty, no reconciliation needed.");
     433          835 :             return None;
     434          497 :         }
     435              : 
     436              :         // Reconcile already in flight for the current sequence?
     437          497 :         if let Some(handle) = &self.reconciler {
     438            9 :             if handle.sequence == self.sequence {
     439            4 :                 return Some(ReconcilerWaiter {
     440            4 :                     tenant_shard_id: self.tenant_shard_id,
     441            4 :                     seq_wait: self.waiter.clone(),
     442            4 :                     error_seq_wait: self.error_waiter.clone(),
     443            4 :                     error: self.last_error.clone(),
     444            4 :                     seq: self.sequence,
     445            4 :                 });
     446            5 :             }
     447          488 :         }
     448              : 
     449              :         // Reconcile in flight for a stale sequence?  Our sequence's task will wait for it before
     450              :         // doing our sequence's work.
     451          493 :         let old_handle = self.reconciler.take();
     452          493 : 
     453          493 :         let cancel = CancellationToken::new();
     454          493 :         let mut reconciler = Reconciler {
     455          493 :             tenant_shard_id: self.tenant_shard_id,
     456          493 :             shard: self.shard,
     457          493 :             generation: self.generation,
     458          493 :             intent: self.intent.clone(),
     459          493 :             config: self.config.clone(),
     460          493 :             observed: self.observed.clone(),
     461          493 :             pageservers: pageservers.clone(),
     462          493 :             compute_hook: compute_hook.clone(),
     463          493 :             service_config: service_config.clone(),
     464          493 :             cancel: cancel.clone(),
     465          493 :             persistence: persistence.clone(),
     466          493 :             compute_notify_failure: false,
     467          493 :         };
     468          493 : 
     469          493 :         let reconcile_seq = self.sequence;
     470          493 : 
     471          493 :         tracing::info!("Spawning Reconciler for sequence {}", self.sequence);
     472          493 :         let must_notify = self.pending_compute_notification;
     473          493 :         let join_handle = tokio::task::spawn(async move {
     474              :             // Wait for any previous reconcile task to complete before we start
     475          493 :             if let Some(old_handle) = old_handle {
     476            5 :                 old_handle.cancel.cancel();
     477            5 :                 if let Err(e) = old_handle.handle.await {
     478              :                     // We can't do much with this other than log it: the task is done, so
     479              :                     // we may proceed with our work.
     480            0 :                     tracing::error!("Unexpected join error waiting for reconcile task: {e}");
     481            5 :                 }
     482          488 :             }
     483              : 
     484              :             // Early check for cancellation before doing any work
     485              :             // TODO: wrap all remote API operations in cancellation check
     486              :             // as well.
     487          493 :             if reconciler.cancel.is_cancelled() {
     488            0 :                 return;
     489          493 :             }
     490              : 
     491              :             // Attempt to make observed state match intent state
     492         2366 :             let result = reconciler.reconcile().await;
     493              : 
     494              :             // If we know we had a pending compute notification from some previous action, send a notification irrespective
     495              :             // of whether the above reconcile() did any work
     496          491 :             if result.is_ok() && must_notify {
      497              :                 // If this fails, the need to retry is signalled via [`ReconcileResult::pending_compute_notification`]
     498            0 :                 reconciler.compute_notify().await.ok();
     499          491 :             }
     500              : 
     501          491 :             result_tx
     502          491 :                 .send(ReconcileResult {
     503          491 :                     sequence: reconcile_seq,
     504          491 :                     result,
     505          491 :                     tenant_shard_id: reconciler.tenant_shard_id,
     506          491 :                     generation: reconciler.generation,
     507          491 :                     observed: reconciler.observed,
     508          491 :                     pending_compute_notification: reconciler.compute_notify_failure,
     509          491 :                 })
     510          491 :                 .ok();
     511          493 :         });
     512          493 : 
     513          493 :         self.reconciler = Some(ReconcilerHandle {
     514          493 :             sequence: self.sequence,
     515          493 :             handle: join_handle,
     516          493 :             cancel,
     517          493 :         });
     518          493 : 
     519          493 :         Some(ReconcilerWaiter {
     520          493 :             tenant_shard_id: self.tenant_shard_id,
     521          493 :             seq_wait: self.waiter.clone(),
     522          493 :             error_seq_wait: self.error_waiter.clone(),
     523          493 :             error: self.last_error.clone(),
     524          493 :             seq: self.sequence,
     525          493 :         })
     526         1332 :     }
     527              : }
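// Illustration (not part of the original source): a minimal driver sketch
// combining the pieces above: request reconciliation if the shard is dirty,
// then wait for the outcome. The parameters mirror `maybe_reconcile`, and the
// 30s timeout is arbitrary.
async fn reconcile_and_wait(
    tenant: &mut TenantState,
    result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
    pageservers: &Arc<HashMap<NodeId, Node>>,
    compute_hook: &Arc<ComputeHook>,
    service_config: &service::Config,
    persistence: &Arc<Persistence>,
) -> Result<(), ReconcileWaitError> {
    // None means observed state already matches intent and there is no
    // pending compute notification: nothing to do.
    let Some(waiter) =
        tenant.maybe_reconcile(result_tx, pageservers, compute_hook, service_config, persistence)
    else {
        return Ok(());
    };
    waiter.wait_timeout(Duration::from_secs(30)).await
}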

Generated by: LCOV version 2.1-beta