LCOV - code coverage report
Current view: top level - storage_controller/src - leadership.rs (source / functions) Coverage Total Hit
Test: 09e7485004805bd42b53a0c369170b3228136512.info Lines: 0.0 % 72 0
Test Date: 2024-11-21 18:36:18 Functions: 0.0 % 12 0

            Line data    Source code
       1              : use std::sync::Arc;
       2              : 
       3              : use hyper::Uri;
       4              : use tokio_util::sync::CancellationToken;
       5              : 
       6              : use crate::{
       7              :     peer_client::{GlobalObservedState, PeerClient},
       8              :     persistence::{ControllerPersistence, DatabaseError, DatabaseResult, Persistence},
       9              :     service::Config,
      10              : };
      11              : 
      12              : /// Helper for storage controller leadership acquisition
      13              : pub(crate) struct Leadership {
      14              :     persistence: Arc<Persistence>,
      15              :     config: Config,
      16              :     cancel: CancellationToken,
      17              : }
      18              : 
      19            0 : #[derive(thiserror::Error, Debug)]
      20              : pub(crate) enum Error {
      21              :     #[error(transparent)]
      22              :     Database(#[from] DatabaseError),
      23              : }
      24              : 
      25              : pub(crate) type Result<T> = std::result::Result<T, Error>;
      26              : 
      27              : impl Leadership {
      28            0 :     pub(crate) fn new(
      29            0 :         persistence: Arc<Persistence>,
      30            0 :         config: Config,
      31            0 :         cancel: CancellationToken,
      32            0 :     ) -> Self {
      33            0 :         Self {
      34            0 :             persistence,
      35            0 :             config,
      36            0 :             cancel,
      37            0 :         }
      38            0 :     }
      39              : 
      40              :     /// Find the current leader in the database and request it to step down if required.
      41              :     /// Should be called early on within the start-up sequence.
      42              :     ///
      43              :     /// Returns a tuple of two optionals: the current leader and its observed state
      44            0 :     pub(crate) async fn step_down_current_leader(
      45            0 :         &self,
      46            0 :     ) -> Result<(Option<ControllerPersistence>, Option<GlobalObservedState>)> {
      47            0 :         let leader = self.current_leader().await?;
      48            0 :         let leader_step_down_state = if let Some(ref leader) = leader {
      49            0 :             if self.config.start_as_candidate {
      50            0 :                 self.request_step_down(leader).await
      51              :             } else {
      52            0 :                 None
      53              :             }
      54              :         } else {
      55            0 :             tracing::info!("No leader found to request step down from. Will build observed state.");
      56            0 :             None
      57              :         };
      58              : 
      59            0 :         Ok((leader, leader_step_down_state))
      60            0 :     }
      61              : 
      62              :     /// Mark the current storage controller instance as the leader in the database
      63            0 :     pub(crate) async fn become_leader(
      64            0 :         &self,
      65            0 :         current_leader: Option<ControllerPersistence>,
      66            0 :     ) -> Result<()> {
      67            0 :         if let Some(address_for_peers) = &self.config.address_for_peers {
      68              :             // TODO: `address-for-peers` can become a mandatory cli arg
      69              :             // after we update the k8s setup
      70            0 :             let proposed_leader = ControllerPersistence {
      71            0 :                 address: address_for_peers.to_string(),
      72            0 :                 started_at: chrono::Utc::now(),
      73            0 :             };
      74            0 : 
      75            0 :             self.persistence
      76            0 :                 .update_leader(current_leader, proposed_leader)
      77            0 :                 .await
      78            0 :                 .map_err(Error::Database)
      79              :         } else {
      80            0 :             tracing::info!("No address-for-peers provided. Skipping leader persistence.");
      81            0 :             Ok(())
      82              :         }
      83            0 :     }
      84              : 
      85            0 :     async fn current_leader(&self) -> DatabaseResult<Option<ControllerPersistence>> {
      86            0 :         let res = self.persistence.get_leader().await;
      87            0 :         if let Err(DatabaseError::Query(diesel::result::Error::DatabaseError(_kind, ref err))) = res
      88              :         {
      89              :             const REL_NOT_FOUND_MSG: &str = "relation \"controllers\" does not exist";
      90            0 :             if err.message().trim() == REL_NOT_FOUND_MSG {
      91              :                 // Special case: if this is a brand new storage controller, migrations will not
      92              :                 // have run at this point yet, and, hence, the controllers table does not exist.
      93              :                 // Detect this case via the error string (diesel doesn't type it) and allow it.
      94            0 :                 tracing::info!("Detected first storage controller start-up. Allowing missing controllers table ...");
      95            0 :                 return Ok(None);
      96            0 :             }
      97            0 :         }
      98              : 
      99            0 :         res
     100            0 :     }
     101              : 
     102              :     /// Request step down from the currently registered leader in the database
     103              :     ///
     104              :     /// On success, returns the observed state obtained from the leader's
     105              :     /// step-down response. If the request fails, the error is logged and
     106              :     /// None is returned.
     107            0 :     async fn request_step_down(
     108            0 :         &self,
     109            0 :         leader: &ControllerPersistence,
     110            0 :     ) -> Option<GlobalObservedState> {
     111            0 :         tracing::info!("Sending step down request to {leader:?}");
     112              : 
     113            0 :         let client = PeerClient::new(
     114            0 :             Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"),
     115            0 :             self.config.peer_jwt_token.clone(),
     116            0 :         );
     117            0 :         let state = client.step_down(&self.cancel).await;
     118            0 :         match state {
     119            0 :             Ok(state) => Some(state),
     120            0 :             Err(err) => {
     121            0 :                 // TODO: Make leaders periodically update a timestamp field in the
     122            0 :                 // database and, if the leader is not reachable from the current instance,
     123            0 :                 // but inferred as alive from the timestamp, abort start-up. This avoids
     124            0 :                 // a potential scenario in which we have two controllers acting as leaders.
     125            0 :                 tracing::error!(
     126            0 :                     "Leader ({}) did not respond to step-down request: {}",
     127              :                     leader.address,
     128              :                     err
     129              :                 );
     130              : 
     131            0 :                 None
     132              :             }
     133              :         }
     134            0 :     }
     135              : }
        

Generated by: LCOV version 2.1-beta