LCOV - code coverage report
Current view: top level - control_plane/attachment_service/src - tenant_state.rs (source / functions)
Test:         322b88762cba8ea666f63cda880cccab6936bf37.info
Test Date:    2024-02-29 11:57:12

              Coverage    Total    Hit
  Lines:        47.3 %      387    183
  Functions:    28.6 %       63     18

            Line data    Source code
       1              : use std::{collections::HashMap, sync::Arc, time::Duration};
       2              : 
       3              : use crate::{metrics, persistence::TenantShardPersistence};
       4              : use pageserver_api::controller_api::NodeAvailability;
       5              : use pageserver_api::{
       6              :     models::{LocationConfig, LocationConfigMode, TenantConfig},
       7              :     shard::{ShardIdentity, TenantShardId},
       8              : };
       9              : use serde::Serialize;
      10              : use tokio::task::JoinHandle;
      11              : use tokio_util::sync::CancellationToken;
      12              : use tracing::{instrument, Instrument};
      13              : use utils::{
      14              :     generation::Generation,
      15              :     id::NodeId,
      16              :     seqwait::{SeqWait, SeqWaitError},
      17              :     sync::gate::Gate,
      18              : };
      19              : 
      20              : use crate::{
      21              :     compute_hook::ComputeHook,
      22              :     node::Node,
      23              :     persistence::{split_state::SplitState, Persistence},
      24              :     reconciler::{
      25              :         attached_location_conf, secondary_location_conf, ReconcileError, Reconciler, TargetState,
      26              :     },
      27              :     scheduler::{ScheduleError, Scheduler},
      28              :     service, PlacementPolicy, Sequence,
      29              : };
      30              : 
      31              : /// Serialization helper
      32            0 : fn read_mutex_content<S, T>(v: &std::sync::Mutex<T>, serializer: S) -> Result<S::Ok, S::Error>
      33            0 : where
      34            0 :     S: serde::ser::Serializer,
      35            0 :     T: Clone + std::fmt::Display,
      36            0 : {
      37            0 :     serializer.collect_str(&v.lock().unwrap())
      38            0 : }
      39              : 
      40              : /// In-memory state for a particular tenant shard.
      41              : ///
      42              : /// This struct implements Serialize for debugging purposes, but is _not_ persisted
      43              : /// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
      44            0 : #[derive(Serialize)]
      45              : pub(crate) struct TenantState {
      46              :     pub(crate) tenant_shard_id: TenantShardId,
      47              : 
      48              :     pub(crate) shard: ShardIdentity,
      49              : 
      50              :     // Runtime only: sequence used to coordinate updates to this object while
      51              :     // background reconcilers may be running.  A reconciler runs to a particular
      52              :     // sequence.
      53              :     pub(crate) sequence: Sequence,
      54              : 
      55              :     // Latest generation number: next time we attach, increment this
      56              :     // and use the incremented number when attaching
      57              :     pub(crate) generation: Generation,
      58              : 
      59              :     // High level description of how the tenant should be set up.  Provided
      60              :     // externally.
      61              :     pub(crate) policy: PlacementPolicy,
      62              : 
      63              :     // Low level description of exactly which pageservers should fulfil
      64              :     // which role.  Generated by `Self::schedule`.
      65              :     pub(crate) intent: IntentState,
      66              : 
      67              :     // Low level description of how the tenant is configured on pageservers:
      68              :     // if this does not match `Self::intent` then the tenant needs reconciliation
      69              :     // with `Self::reconcile`.
      70              :     pub(crate) observed: ObservedState,
      71              : 
      72              :     // Tenant configuration, passed through opaquely to the pageserver.  Identical
      73              :     // for all shards in a tenant.
      74              :     pub(crate) config: TenantConfig,
      75              : 
      76              :     /// If a reconcile task is currently in flight, it may be joined here (it is
      77              :     /// only safe to join if either the result has been received or the reconciler's
      78              :     /// cancellation token has been fired)
      79              :     #[serde(skip)]
      80              :     pub(crate) reconciler: Option<ReconcilerHandle>,
      81              : 
      82              :     /// If a tenant is being split, then all shards with that TenantId will have a
      83              :     /// SplitState set; this acts as a guard against other operations such as background
      84              :     /// reconciliation and timeline creation.
      85              :     pub(crate) splitting: SplitState,
      86              : 
      87              :     /// Optionally wait for reconciliation to complete up to a particular
      88              :     /// sequence number.
      89              :     #[serde(skip)]
      90              :     pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
      91              : 
      92              :     /// Indicates the sequence number for which we have encountered an error reconciling.  If
      93              :     /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
      94              :     /// and callers should stop waiting for `waiter` and propagate the error.
      95              :     #[serde(skip)]
      96              :     pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
      97              : 
      98              :     /// The most recent error from a reconcile on this tenant
      99              :     /// TODO: generalize to an array of recent events
     100              :     /// TODO: use an ArcSwap instead of a mutex for faster reads?
     101              :     #[serde(serialize_with = "read_mutex_content")]
     102              :     pub(crate) last_error: std::sync::Arc<std::sync::Mutex<String>>,
     103              : 
     104              :     /// If we have a pending compute notification that for some reason we weren't able to send,
     105              :     /// set this to true. If this is set, calls to [`Self::maybe_reconcile`] will run a task to retry
     106              :     /// sending it.  This is the mechanism by which compute notifications are included in the scope
     107              :     /// of state that we publish externally in an eventually consistent way.
     108              :     pub(crate) pending_compute_notification: bool,
     109              : }
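
// Illustrative sketch (hypothetical helper, not part of the measured source above):
// because TenantState derives Serialize for debugging rather than persistence, it can
// be rendered as JSON, e.g. for a debug dump; fields marked #[serde(skip)] are omitted.
fn dump_for_debug(state: &TenantState) -> Result<String, serde_json::Error> {
    serde_json::to_string_pretty(state)
}
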
     110              : 
     111            2 : #[derive(Default, Clone, Debug, Serialize)]
     112              : pub(crate) struct IntentState {
     113              :     attached: Option<NodeId>,
     114              :     secondary: Vec<NodeId>,
     115              : }
     116              : 
     117              : impl IntentState {
     118            4 :     pub(crate) fn new() -> Self {
     119            4 :         Self {
     120            4 :             attached: None,
     121            4 :             secondary: vec![],
     122            4 :         }
     123            4 :     }
     124            0 :     pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option<NodeId>) -> Self {
     125            0 :         if let Some(node_id) = node_id {
     126            0 :             scheduler.node_inc_ref(node_id);
     127            0 :         }
     128            0 :         Self {
     129            0 :             attached: node_id,
     130            0 :             secondary: vec![],
     131            0 :         }
     132            0 :     }
     133              : 
     134            6 :     pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
     135            6 :         if self.attached != new_attached {
     136            6 :             if let Some(old_attached) = self.attached.take() {
     137            0 :                 scheduler.node_dec_ref(old_attached);
     138            6 :             }
     139            6 :             if let Some(new_attached) = &new_attached {
     140            6 :                 scheduler.node_inc_ref(*new_attached);
     141            6 :             }
     142            6 :             self.attached = new_attached;
     143            0 :         }
     144            6 :     }
     145              : 
     146              :     /// Like set_attached, but the node is from [`Self::secondary`].  This swaps the node from
     147              :     /// secondary to attached while maintaining the scheduler's reference counts.
     148            2 :     pub(crate) fn promote_attached(
     149            2 :         &mut self,
     150            2 :         _scheduler: &mut Scheduler,
     151            2 :         promote_secondary: NodeId,
     152            2 :     ) {
     153              :         // If we call this with a node that isn't in secondary, it would cause incorrect
     154              :         // scheduler reference counting, since we assume the node is already referenced as a secondary.
     155            2 :         debug_assert!(self.secondary.contains(&promote_secondary));
     156              : 
     157              :         // TODO: when scheduler starts tracking attached + secondary counts separately, we will
     158              :         // need to call into it here.
     159            4 :         self.secondary.retain(|n| n != &promote_secondary);
     160            2 :         self.attached = Some(promote_secondary);
     161            2 :     }
     162              : 
     163            4 :     pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
     164            4 :         debug_assert!(!self.secondary.contains(&new_secondary));
     165            4 :         scheduler.node_inc_ref(new_secondary);
     166            4 :         self.secondary.push(new_secondary);
     167            4 :     }
     168              : 
     169              :     /// It is legal to call this with a node that is not currently a secondary: that is a no-op
     170            0 :     pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
     171            0 :         let index = self.secondary.iter().position(|n| *n == node_id);
     172            0 :         if let Some(index) = index {
     173            0 :             scheduler.node_dec_ref(node_id);
     174            0 :             self.secondary.remove(index);
     175            0 :         }
     176            0 :     }
     177              : 
     178            4 :     pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
     179            4 :         for secondary in self.secondary.drain(..) {
     180            4 :             scheduler.node_dec_ref(secondary);
     181            4 :         }
     182            4 :     }
     183              : 
     184            4 :     pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
     185            4 :         if let Some(old_attached) = self.attached.take() {
     186            4 :             scheduler.node_dec_ref(old_attached);
     187            4 :         }
     188              : 
     189            4 :         self.clear_secondary(scheduler);
     190            4 :     }
     191              : 
     192            2 :     pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
     193            2 :         let mut result = Vec::new();
     194            2 :         if let Some(p) = self.attached {
     195            2 :             result.push(p)
     196            0 :         }
     197              : 
     198            2 :         result.extend(self.secondary.iter().copied());
     199            2 : 
     200            2 :         result
     201            2 :     }
     202              : 
     203            0 :     pub(crate) fn get_attached(&self) -> &Option<NodeId> {
     204            0 :         &self.attached
     205            0 :     }
     206              : 
     207            0 :     pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
     208            0 :         &self.secondary
     209            0 :     }
     210              : 
     211              :     /// When a node goes offline, we update intents to avoid using it
     212              :     /// as their attached pageserver.
     213              :     ///
     214              :     /// Returns true if a change was made
     215            2 :     pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool {
     216            2 :         if self.attached == Some(node_id) {
     217              :             // TODO: when scheduler starts tracking attached + secondary counts separately, we will
     218              :             // need to call into it here.
     219            2 :             self.attached = None;
     220            2 :             self.secondary.push(node_id);
     221            2 :             true
     222              :         } else {
     223            0 :             false
     224              :         }
     225            2 :     }
     226              : }
     227              : 
     228              : impl Drop for IntentState {
     229            6 :     fn drop(&mut self) {
     230              :         // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler
     231            6 :         debug_assert!(self.attached.is_none() && self.secondary.is_empty());
     232            4 :     }
     233              : }
     234              : 
     235            2 : #[derive(Default, Clone, Serialize)]
     236              : pub(crate) struct ObservedState {
     237              :     pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
     238              : }
     239              : 
     240              : /// Our latest knowledge of how this tenant is configured in the outside world.
     241              : ///
     242              : /// Meaning:
     243              : ///     * No instance of this type exists for a node: we are certain that we have nothing configured on that
     244              : ///       node for this shard.
     245              : ///     * Instance exists with conf==None: we *might* have some state on that node, but we don't know
     246              : ///       what it is (e.g. we failed partway through configuring it)
     247              : ///     * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
     248              : ///       and that configuration will still be present unless something external interfered.
     249            0 : #[derive(Clone, Serialize)]
     250              : pub(crate) struct ObservedStateLocation {
     251              :     /// If None, it means we do not know the status of this shard's location on this node, but
     252              :     /// we know that we might have some state on this node.
     253              :     pub(crate) conf: Option<LocationConfig>,
     254              : }
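
// Illustrative sketch (hypothetical, not part of the measured source): the three cases
// described above for a node's observed location, spelled out as a match.
fn describe_observed(loc: Option<&ObservedStateLocation>) -> &'static str {
    match loc {
        // No entry for this node: we are certain nothing is configured there.
        None => "nothing configured on this node",
        // Entry with conf == None: there may be some state on the node, but we don't know what.
        Some(ObservedStateLocation { conf: None }) => "state unknown (e.g. interrupted configuration)",
        // Entry with conf == Some: the configuration we last successfully applied.
        Some(ObservedStateLocation { conf: Some(_) }) => "last successfully applied configuration",
    }
}
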
     255              : pub(crate) struct ReconcilerWaiter {
     256              :     // For observability purposes, remember the ID of the shard we're
     257              :     // waiting for.
     258              :     pub(crate) tenant_shard_id: TenantShardId,
     259              : 
     260              :     seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     261              :     error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     262              :     error: std::sync::Arc<std::sync::Mutex<String>>,
     263              :     seq: Sequence,
     264              : }
     265              : 
     266            0 : #[derive(thiserror::Error, Debug)]
     267              : pub enum ReconcileWaitError {
     268              :     #[error("Timeout waiting for shard {0}")]
     269              :     Timeout(TenantShardId),
     270              :     #[error("shutting down")]
     271              :     Shutdown,
     272              :     #[error("Reconcile error on shard {0}: {1}")]
     273              :     Failed(TenantShardId, String),
     274              : }
     275              : 
     276              : impl ReconcilerWaiter {
     277            0 :     pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
     278            0 :         tokio::select! {
     279            0 :             result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
     280            0 :                 result.map_err(|e| match e {
     281            0 :                     SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
     282            0 :                     SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
     283            0 :                 })?;
     284              :             },
     285            0 :             result = self.error_seq_wait.wait_for(self.seq) => {
     286            0 :                 result.map_err(|e| match e {
     287            0 :                     SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
     288            0 :                     SeqWaitError::Timeout => unreachable!()
     289            0 :                 })?;
     290              : 
     291              :                 return Err(ReconcileWaitError::Failed(self.tenant_shard_id, self.error.lock().unwrap().clone()))
     292              :             }
     293              :         }
     294              : 
     295            0 :         Ok(())
     296            0 :     }
     297              : }
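
// Illustrative sketch (not part of the measured source) of the intended calling pattern:
// await a waiter returned by maybe_reconcile; the 30-second timeout is an arbitrary example.
async fn await_reconcile(waiter: ReconcilerWaiter) -> Result<(), ReconcileWaitError> {
    // Resolves once the shard reaches the waiter's sequence, or returns the shard's most
    // recent reconcile error (or a timeout) as a ReconcileWaitError.
    waiter.wait_timeout(Duration::from_secs(30)).await
}
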
     298              : 
     299              : /// When a reconciler task has been spawned, the tenant shard's state carries enough
     300              : /// information to optionally cancel & await it later.
     301              : pub(crate) struct ReconcilerHandle {
     302              :     sequence: Sequence,
     303              :     handle: JoinHandle<()>,
     304              :     cancel: CancellationToken,
     305              : }
     306              : 
     307              : /// When a reconcile task completes, it sends this result object
     308              : /// to be applied to the primary TenantState.
     309              : pub(crate) struct ReconcileResult {
     310              :     pub(crate) sequence: Sequence,
     311              :     /// On errors, `observed` should be treated as an incomplete description
     312              :     /// of state (i.e. any nodes present in the result should override nodes
     313              :     /// present in the parent tenant state, but any unmentioned nodes should
     314              :     /// not be removed from parent tenant state)
     315              :     pub(crate) result: Result<(), ReconcileError>,
     316              : 
     317              :     pub(crate) tenant_shard_id: TenantShardId,
     318              :     pub(crate) generation: Generation,
     319              :     pub(crate) observed: ObservedState,
     320              : 
     321              :     /// Set [`TenantState::pending_compute_notification`] from this flag
     322              :     pub(crate) pending_compute_notification: bool,
     323              : }
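
// Illustrative sketch (hypothetical, not part of the measured source) of the merge rule
// described for `observed` above when a reconcile ends in error: nodes reported by the
// reconciler override the parent's entries, while unreported nodes are left untouched.
fn merge_incomplete_observed(parent: &mut ObservedState, result: &ReconcileResult) {
    for (node_id, loc) in &result.observed.locations {
        parent.locations.insert(*node_id, loc.clone());
    }
}
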
     324              : 
     325              : impl ObservedState {
     326            0 :     pub(crate) fn new() -> Self {
     327            0 :         Self {
     328            0 :             locations: HashMap::new(),
     329            0 :         }
     330            0 :     }
     331              : }
     332              : 
     333              : impl TenantState {
     334            2 :     pub(crate) fn new(
     335            2 :         tenant_shard_id: TenantShardId,
     336            2 :         shard: ShardIdentity,
     337            2 :         policy: PlacementPolicy,
     338            2 :     ) -> Self {
     339            2 :         Self {
     340            2 :             tenant_shard_id,
     341            2 :             policy,
     342            2 :             intent: IntentState::default(),
     343            2 :             generation: Generation::new(0),
     344            2 :             shard,
     345            2 :             observed: ObservedState::default(),
     346            2 :             config: TenantConfig::default(),
     347            2 :             reconciler: None,
     348            2 :             splitting: SplitState::Idle,
     349            2 :             sequence: Sequence(1),
     350            2 :             waiter: Arc::new(SeqWait::new(Sequence(0))),
     351            2 :             error_waiter: Arc::new(SeqWait::new(Sequence(0))),
     352            2 :             last_error: Arc::default(),
     353            2 :             pending_compute_notification: false,
     354            2 :         }
     355            2 :     }
     356              : 
     357              :     /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
     358              :     /// [`ObservedState`], even if it violates my [`PlacementPolicy`].  Call [`Self::schedule`] next,
     359              :     /// to get an intent state that complies with placement policy.  The overall goal is to do scheduling
     360              :     /// in a way that makes use of any configured locations that already exist in the outside world.
     361            0 :     pub(crate) fn intent_from_observed(&mut self) {
     362            0 :         // Choose an attached location by filtering observed locations, and then sorting to get the highest
     363            0 :         // generation
     364            0 :         let mut attached_locs = self
     365            0 :             .observed
     366            0 :             .locations
     367            0 :             .iter()
     368            0 :             .filter_map(|(node_id, l)| {
     369            0 :                 if let Some(conf) = &l.conf {
     370            0 :                     if conf.mode == LocationConfigMode::AttachedMulti
     371            0 :                         || conf.mode == LocationConfigMode::AttachedSingle
     372            0 :                         || conf.mode == LocationConfigMode::AttachedStale
     373              :                     {
     374            0 :                         Some((node_id, conf.generation))
     375              :                     } else {
     376            0 :                         None
     377              :                     }
     378              :                 } else {
     379            0 :                     None
     380              :                 }
     381            0 :             })
     382            0 :             .collect::<Vec<_>>();
     383            0 : 
     384            0 :         attached_locs.sort_by_key(|i| i.1);
     385            0 :         if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
     386            0 :             self.intent.attached = Some(*node_id);
     387            0 :         }
     388              : 
     389              :         // All remaining observed locations generate secondary intents.  This includes None
     390              :         // observations, as these may well have some local content on disk that is usable (this
     391              :         // is an edge case that might occur if we restarted during a migration or other change)
     392              :         //
     393              :         // We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
     394              :         // will take care of promoting one of these secondaries to be attached.
     395            0 :         self.observed.locations.keys().for_each(|node_id| {
     396            0 :             if Some(*node_id) != self.intent.attached {
     397            0 :                 self.intent.secondary.push(*node_id);
     398            0 :             }
     399            0 :         });
     400            0 :     }
     401              : 
     402              :     /// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
     403              :     /// attached pageserver for a shard.
     404              :     ///
     405              :     /// Returns whether the intent was modified, and the NodeId selected.
     406            4 :     fn schedule_attached(
     407            4 :         &mut self,
     408            4 :         scheduler: &mut Scheduler,
     409            4 :     ) -> Result<(bool, NodeId), ScheduleError> {
     410              :         // No work to do if we already have an attached tenant
     411            4 :         if let Some(node_id) = self.intent.attached {
     412            0 :             return Ok((false, node_id));
     413            4 :         }
     414              : 
     415            4 :         if let Some(promote_secondary) = scheduler.node_preferred(&self.intent.secondary) {
     416              :             // Promote a secondary
     417            2 :             tracing::debug!("Promoted secondary {} to attached", promote_secondary);
     418            2 :             self.intent.promote_attached(scheduler, promote_secondary);
     419            2 :             Ok((true, promote_secondary))
     420              :         } else {
     421              :             // Pick a fresh node: either we had no secondaries or none were schedulable
     422            2 :             let node_id = scheduler.schedule_shard(&self.intent.secondary)?;
     423            2 :             tracing::debug!("Selected {} as attached", node_id);
     424            2 :             self.intent.set_attached(scheduler, Some(node_id));
     425            2 :             Ok((true, node_id))
     426              :         }
     427            4 :     }
     428              : 
     429            4 :     pub(crate) fn schedule(&mut self, scheduler: &mut Scheduler) -> Result<(), ScheduleError> {
     430            4 :         // TODO: before scheduling new nodes, check if any existing content in
     431            4 :         // self.intent refers to pageservers that are offline, and pick other
     432            4 :         // pageservers if so.
     433            4 : 
     434            4 :         // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
     435            4 :         // change their attach location.
     436            4 : 
     437            4 :         // Build the set of pageservers already in use by this tenant, to avoid scheduling
     438            4 :         // more work on the same pageservers we're already using.
     439            4 :         let mut modified = false;
     440            4 : 
     441            4 :         use PlacementPolicy::*;
     442            4 :         match self.policy {
     443              :             Single => {
     444              :                 // Should have exactly one attached, and zero secondaries
     445            0 :                 let (modified_attached, _attached_node_id) = self.schedule_attached(scheduler)?;
     446            0 :                 modified |= modified_attached;
     447            0 : 
     448            0 :                 if !self.intent.secondary.is_empty() {
     449            0 :                     self.intent.clear_secondary(scheduler);
     450            0 :                     modified = true;
     451            0 :                 }
     452              :             }
     453            4 :             Double(secondary_count) => {
     454              :                 // Should have exactly one attached, and N secondaries
     455            4 :                 let (modified_attached, attached_node_id) = self.schedule_attached(scheduler)?;
     456            4 :                 modified |= modified_attached;
     457            4 : 
     458            4 :                 let mut used_pageservers = vec![attached_node_id];
     459            6 :                 while self.intent.secondary.len() < secondary_count {
     460            2 :                     let node_id = scheduler.schedule_shard(&used_pageservers)?;
     461            2 :                     self.intent.push_secondary(scheduler, node_id);
     462            2 :                     used_pageservers.push(node_id);
     463            2 :                     modified = true;
     464              :                 }
     465              :             }
     466              :             Detached => {
     467              :                 // Should have no attached or secondary pageservers
     468            0 :                 if self.intent.attached.is_some() {
     469            0 :                     self.intent.set_attached(scheduler, None);
     470            0 :                     modified = true;
     471            0 :                 }
     472              : 
     473            0 :                 if !self.intent.secondary.is_empty() {
     474            0 :                     self.intent.clear_secondary(scheduler);
     475            0 :                     modified = true;
     476            0 :                 }
     477              :             }
     478              :         }
     479              : 
     480            4 :         if modified {
     481            4 :             self.sequence.0 += 1;
     482            4 :         }
     483              : 
     484            4 :         Ok(())
     485            4 :     }
     486              : 
     487              :     /// Query whether the tenant's observed state for the attached node matches its intent state, and if so,
     488              :     /// yield the node ID.  This is appropriate for emitting compute hook notifications: we are checking that
     489              :     /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
     490              :     ///
     491              :     /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
     492              :     /// function should not be used to decide whether to reconcile.
     493            0 :     pub(crate) fn stably_attached(&self) -> Option<NodeId> {
     494            0 :         if let Some(attach_intent) = self.intent.attached {
     495            0 :             match self.observed.locations.get(&attach_intent) {
     496            0 :                 Some(loc) => match &loc.conf {
     497            0 :                     Some(conf) => match conf.mode {
     498              :                         LocationConfigMode::AttachedMulti
     499              :                         | LocationConfigMode::AttachedSingle
     500              :                         | LocationConfigMode::AttachedStale => {
     501              :                             // Our intent and observed state agree that this node is in an attached state.
     502            0 :                             Some(attach_intent)
     503              :                         }
     504              :                         // Our observed config is not an attached state
     505            0 :                         _ => None,
     506              :                     },
     507              :                     // Our observed state is None, i.e. in flux
     508            0 :                     None => None,
     509              :                 },
     510              :                 // We have no observed state for this node
     511            0 :                 None => None,
     512              :             }
     513              :         } else {
     514              :             // Our intent is not to attach
     515            0 :             None
     516              :         }
     517            0 :     }
     518              : 
     519            0 :     fn dirty(&self) -> bool {
     520            0 :         if let Some(node_id) = self.intent.attached {
     521            0 :             let wanted_conf = attached_location_conf(self.generation, &self.shard, &self.config);
     522            0 :             match self.observed.locations.get(&node_id) {
     523            0 :                 Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
     524              :                 Some(_) | None => {
     525            0 :                     return true;
     526              :                 }
     527              :             }
     528            0 :         }
     529              : 
     530            0 :         for node_id in &self.intent.secondary {
     531            0 :             let wanted_conf = secondary_location_conf(&self.shard, &self.config);
     532            0 :             match self.observed.locations.get(node_id) {
     533            0 :                 Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
     534              :                 Some(_) | None => {
     535            0 :                     return true;
     536              :                 }
     537              :             }
     538              :         }
     539              : 
     540            0 :         for node_id in self.observed.locations.keys() {
     541            0 :             if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
     542              :                 // We have observed state that isn't part of our intent: need to clean it up.
     543            0 :                 return true;
     544            0 :             }
     545              :         }
     546              : 
     547              :         // Even if there is no pageserver work to be done, if we have a pending notification to computes,
     548              :         // wake up a reconciler to send it.
     549            0 :         if self.pending_compute_notification {
     550            0 :             return true;
     551            0 :         }
     552            0 : 
     553            0 :         false
     554            0 :     }
     555              : 
     556              :     #[allow(clippy::too_many_arguments)]
     557            0 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
     558              :     pub(crate) fn maybe_reconcile(
     559              :         &mut self,
     560              :         result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
     561              :         pageservers: &Arc<HashMap<NodeId, Node>>,
     562              :         compute_hook: &Arc<ComputeHook>,
     563              :         service_config: &service::Config,
     564              :         persistence: &Arc<Persistence>,
     565              :         gate: &Gate,
     566              :         cancel: &CancellationToken,
     567              :     ) -> Option<ReconcilerWaiter> {
     568              :         // If there are any ambiguous observed states, and the nodes they refer to are available,
     569              :         // we should reconcile to clean them up.
     570              :         let mut dirty_observed = false;
     571              :         for (node_id, observed_loc) in &self.observed.locations {
     572              :             let node = pageservers
     573              :                 .get(node_id)
     574              :                 .expect("Nodes may not be removed while referenced");
     575              :             if observed_loc.conf.is_none()
     576              :                 && !matches!(node.availability, NodeAvailability::Offline)
     577              :             {
     578              :                 dirty_observed = true;
     579              :                 break;
     580              :             }
     581              :         }
     582              : 
     583              :         if !self.dirty() && !dirty_observed {
     584            0 :             tracing::info!("Not dirty, no reconciliation needed.");
     585              :             return None;
     586              :         }
     587              : 
     588              :         // If we are currently splitting, then never start a reconciler task: the splitting logic
     589              :         // requires that shards are not interfered with while it runs. Do this check here rather than
     590              :         // up top, so that we only log this message if we would otherwise have done a reconciliation.
     591              :         if !matches!(self.splitting, SplitState::Idle) {
     592            0 :             tracing::info!("Refusing to reconcile, splitting in progress");
     593              :             return None;
     594              :         }
     595              : 
     596              :         // Reconcile already in flight for the current sequence?
     597              :         if let Some(handle) = &self.reconciler {
     598              :             if handle.sequence == self.sequence {
     599              :                 return Some(ReconcilerWaiter {
     600              :                     tenant_shard_id: self.tenant_shard_id,
     601              :                     seq_wait: self.waiter.clone(),
     602              :                     error_seq_wait: self.error_waiter.clone(),
     603              :                     error: self.last_error.clone(),
     604              :                     seq: self.sequence,
     605              :                 });
     606              :             }
     607              :         }
     608              : 
     609              :         // Reconcile in flight for a stale sequence?  Our sequence's task will wait for it before
     610              :         // doing our sequence's work.
     611              :         let old_handle = self.reconciler.take();
     612              : 
     613              :         let Ok(gate_guard) = gate.enter() else {
     614              :             // Shutting down, don't start a reconciler
     615              :             return None;
     616              :         };
     617              : 
     618              :         let reconciler_cancel = cancel.child_token();
     619              :         let mut reconciler = Reconciler {
     620              :             tenant_shard_id: self.tenant_shard_id,
     621              :             shard: self.shard,
     622              :             generation: self.generation,
     623              :             intent: TargetState::from_intent(&self.intent),
     624              :             config: self.config.clone(),
     625              :             observed: self.observed.clone(),
     626              :             pageservers: pageservers.clone(),
     627              :             compute_hook: compute_hook.clone(),
     628              :             service_config: service_config.clone(),
     629              :             _gate_guard: gate_guard,
     630              :             cancel: reconciler_cancel.clone(),
     631              :             persistence: persistence.clone(),
     632              :             compute_notify_failure: false,
     633              :         };
     634              : 
     635              :         let reconcile_seq = self.sequence;
     636              : 
     637            0 :         tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
     638              :         let must_notify = self.pending_compute_notification;
     639              :         let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
     640              :                                                         tenant_id=%reconciler.tenant_shard_id.tenant_id,
     641              :                                                         shard_id=%reconciler.tenant_shard_id.shard_slug());
     642              :         metrics::RECONCILER.spawned.inc();
     643              :         let join_handle = tokio::task::spawn(
     644            0 :             async move {
     645              :                 // Wait for any previous reconcile task to complete before we start
     646            0 :                 if let Some(old_handle) = old_handle {
     647            0 :                     old_handle.cancel.cancel();
     648            0 :                     if let Err(e) = old_handle.handle.await {
     649              :                         // We can't do much with this other than log it: the task is done, so
     650              :                         // we may proceed with our work.
     651            0 :                         tracing::error!("Unexpected join error waiting for reconcile task: {e}");
     652            0 :                     }
     653            0 :                 }
     654              : 
     655              :                 // Early check for cancellation before doing any work
     656              :                 // TODO: wrap all remote API operations in cancellation check
     657              :                 // as well.
     658            0 :                 if reconciler.cancel.is_cancelled() {
     659            0 :                     metrics::RECONCILER
     660            0 :                         .complete
     661            0 :                         .with_label_values(&[metrics::ReconcilerMetrics::CANCEL])
     662            0 :                         .inc();
     663            0 :                     return;
     664            0 :                 }
     665              : 
     666              :                 // Attempt to make observed state match intent state
     667            0 :                 let result = reconciler.reconcile().await;
     668              : 
     669              :                 // If we know we had a pending compute notification from some previous action, send a notification irrespective
     670              :                 // of whether the above reconcile() did any work
     671            0 :                 if result.is_ok() && must_notify {
     672              :                     // If this fails, we signal the need to retry via [`ReconcileResult::pending_compute_notification`]
     673            0 :                     reconciler.compute_notify().await.ok();
     674            0 :                 }
     675              : 
     676              :                 // Update result counter
     677            0 :                 match &result {
     678            0 :                     Ok(_) => metrics::RECONCILER
     679            0 :                         .complete
     680            0 :                         .with_label_values(&[metrics::ReconcilerMetrics::SUCCESS]),
     681            0 :                     Err(ReconcileError::Cancel) => metrics::RECONCILER
     682            0 :                         .complete
     683            0 :                         .with_label_values(&[metrics::ReconcilerMetrics::CANCEL]),
     684            0 :                     Err(_) => metrics::RECONCILER
     685            0 :                         .complete
     686            0 :                         .with_label_values(&[metrics::ReconcilerMetrics::ERROR]),
     687              :                 }
     688            0 :                 .inc();
     689            0 : 
     690            0 :                 result_tx
     691            0 :                     .send(ReconcileResult {
     692            0 :                         sequence: reconcile_seq,
     693            0 :                         result,
     694            0 :                         tenant_shard_id: reconciler.tenant_shard_id,
     695            0 :                         generation: reconciler.generation,
     696            0 :                         observed: reconciler.observed,
     697            0 :                         pending_compute_notification: reconciler.compute_notify_failure,
     698            0 :                     })
     699            0 :                     .ok();
     700            0 :             }
     701              :             .instrument(reconciler_span),
     702              :         );
     703              : 
     704              :         self.reconciler = Some(ReconcilerHandle {
     705              :             sequence: self.sequence,
     706              :             handle: join_handle,
     707              :             cancel: reconciler_cancel,
     708              :         });
     709              : 
     710              :         Some(ReconcilerWaiter {
     711              :             tenant_shard_id: self.tenant_shard_id,
     712              :             seq_wait: self.waiter.clone(),
     713              :             error_seq_wait: self.error_waiter.clone(),
     714              :             error: self.last_error.clone(),
     715              :             seq: self.sequence,
     716              :         })
     717              :     }
     718              : 
     719              :     // If we had any state at all referring to this node ID, drop it.  Does not
     720              :     // attempt to reschedule.
     721            0 :     pub(crate) fn deref_node(&mut self, node_id: NodeId) {
     722            0 :         if self.intent.attached == Some(node_id) {
     723            0 :             self.intent.attached = None;
     724            0 :         }
     725              : 
     726            0 :         self.intent.secondary.retain(|n| n != &node_id);
     727            0 : 
     728            0 :         self.observed.locations.remove(&node_id);
     729              : 
     730            0 :         debug_assert!(!self.intent.all_pageservers().contains(&node_id));
     731            0 :     }
     732              : 
     733            0 :     pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
     734            0 :         TenantShardPersistence {
     735            0 :             tenant_id: self.tenant_shard_id.tenant_id.to_string(),
     736            0 :             shard_number: self.tenant_shard_id.shard_number.0 as i32,
     737            0 :             shard_count: self.tenant_shard_id.shard_count.literal() as i32,
     738            0 :             shard_stripe_size: self.shard.stripe_size.0 as i32,
     739            0 :             generation: self.generation.into().unwrap_or(0) as i32,
     740            0 :             generation_pageserver: self
     741            0 :                 .intent
     742            0 :                 .get_attached()
     743            0 :                 .map(|n| n.0 as i64)
     744            0 :                 .unwrap_or(i64::MAX),
     745            0 : 
     746            0 :             placement_policy: serde_json::to_string(&self.policy).unwrap(),
     747            0 :             config: serde_json::to_string(&self.config).unwrap(),
     748            0 :             splitting: SplitState::default(),
     749            0 :         }
     750            0 :     }
     751              : }
     752              : 
     753              : #[cfg(test)]
     754              : pub(crate) mod tests {
     755              :     use pageserver_api::shard::{ShardCount, ShardNumber};
     756              :     use utils::id::TenantId;
     757              : 
     758              :     use crate::scheduler::test_utils::make_test_nodes;
     759              : 
     760              :     use super::*;
     761              : 
     762            2 :     fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantState {
     763            2 :         let tenant_id = TenantId::generate();
     764            2 :         let shard_number = ShardNumber(0);
     765            2 :         let shard_count = ShardCount::new(1);
     766            2 : 
     767            2 :         let tenant_shard_id = TenantShardId {
     768            2 :             tenant_id,
     769            2 :             shard_number,
     770            2 :             shard_count,
     771            2 :         };
     772            2 :         TenantState::new(
     773            2 :             tenant_shard_id,
     774            2 :             ShardIdentity::new(
     775            2 :                 shard_number,
     776            2 :                 shard_count,
     777            2 :                 pageserver_api::shard::ShardStripeSize(32768),
     778            2 :             )
     779            2 :             .unwrap(),
     780            2 :             policy,
     781            2 :         )
     782            2 :     }
     783              : 
     784              :     /// Test the scheduling behaviors used when a tenant configured for HA is subject
     785              :     /// to nodes being marked offline.
     786            2 :     #[test]
     787            2 :     fn tenant_ha_scheduling() -> anyhow::Result<()> {
     788            2 :         // Start with three nodes.  Our tenant will only use two.  The third one is
     789            2 :         // expected to remain unused.
     790            2 :         let mut nodes = make_test_nodes(3);
     791            2 : 
     792            2 :         let mut scheduler = Scheduler::new(nodes.values());
     793            2 : 
     794            2 :         let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Double(1));
     795            2 :         tenant_state
     796            2 :             .schedule(&mut scheduler)
     797            2 :             .expect("we have enough nodes, scheduling should work");
     798            2 : 
     799            2 :         // Expect to initially be scheduled onto different nodes
     800            2 :         assert_eq!(tenant_state.intent.secondary.len(), 1);
     801            2 :         assert!(tenant_state.intent.attached.is_some());
     802              : 
     803            2 :         let attached_node_id = tenant_state.intent.attached.unwrap();
     804            2 :         let secondary_node_id = *tenant_state.intent.secondary.iter().last().unwrap();
     805            2 :         assert_ne!(attached_node_id, secondary_node_id);
     806              : 
     807              :         // Notifying that the attached node is offline should demote it to a secondary
     808            2 :         let changed = tenant_state.intent.notify_offline(attached_node_id);
     809            2 :         assert!(changed);
     810              : 
     811              :         // Update the scheduler state to indicate the node is offline
     812            2 :         nodes.get_mut(&attached_node_id).unwrap().availability = NodeAvailability::Offline;
     813            2 :         scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
     814            2 : 
     815            2 :         // Scheduling the tenant shard should promote the still-available secondary node to attached
     816            2 :         tenant_state
     817            2 :             .schedule(&mut scheduler)
     818            2 :             .expect("active nodes are available");
     819            2 :         assert_eq!(tenant_state.intent.attached.unwrap(), secondary_node_id);
     820              : 
     821              :         // The original attached node should have been retained as a secondary
     822            2 :         assert_eq!(
     823            2 :             *tenant_state.intent.secondary.iter().last().unwrap(),
     824            2 :             attached_node_id
     825            2 :         );
     826              : 
     827            2 :         tenant_state.intent.clear(&mut scheduler);
     828            2 : 
     829            2 :         Ok(())
     830            2 :     }
     831              : }
        

Generated by: LCOV version 2.1-beta