LCOV - code coverage report
Current view: top level - storage_controller/src - leadership.rs (source / functions) Coverage Total Hit
Test: aca806cab4756d7eb6a304846130f4a73a5d5393.info Lines: 0.0 % 91 0
Test Date: 2025-04-24 20:31:15 Functions: 0.0 % 10 0

            Line data    Source code
       1              : use std::sync::Arc;
       2              : 
       3              : use hyper::Uri;
       4              : use tokio_util::sync::CancellationToken;
       5              : 
       6              : use crate::peer_client::{GlobalObservedState, PeerClient};
       7              : use crate::persistence::{ControllerPersistence, DatabaseError, DatabaseResult, Persistence};
       8              : use crate::service::Config;
       9              : 
      10              : /// Helper for storage controller leadership acquisition
      11              : pub(crate) struct Leadership {
      12              :     persistence: Arc<Persistence>,
      13              :     config: Config,
      14              :     cancel: CancellationToken,
      15              : }
      16              : 
      17              : #[derive(thiserror::Error, Debug)]
      18              : pub(crate) enum Error {
      19              :     #[error(transparent)]
      20              :     Database(#[from] DatabaseError),
      21              : }
      22              : 
      23              : pub(crate) type Result<T> = std::result::Result<T, Error>;
      24              : 
      25              : impl Leadership {
      26            0 :     pub(crate) fn new(
      27            0 :         persistence: Arc<Persistence>,
      28            0 :         config: Config,
      29            0 :         cancel: CancellationToken,
      30            0 :     ) -> Self {
      31            0 :         Self {
      32            0 :             persistence,
      33            0 :             config,
      34            0 :             cancel,
      35            0 :         }
      36            0 :     }
      37              : 
      38              :     /// Find the current leader in the database and request it to step down if required.
      39              :     /// Should be called early on within the start-up sequence.
      40              :     ///
      41              :     /// Returns a tuple of two optionals: the current leader and its observed state
      42            0 :     pub(crate) async fn step_down_current_leader(
      43            0 :         &self,
      44            0 :     ) -> Result<(Option<ControllerPersistence>, Option<GlobalObservedState>)> {
      45            0 :         let leader = self.current_leader().await?;
      46              : 
      47            0 :         if leader.as_ref().map(|l| &l.address)
      48            0 :             == self
      49            0 :                 .config
      50            0 :                 .address_for_peers
      51            0 :                 .as_ref()
      52            0 :                 .map(Uri::to_string)
      53            0 :                 .as_ref()
      54              :         {
      55              :             // We already are the current leader. This is a restart.
      56            0 :             return Ok((leader, None));
      57            0 :         }
      58              : 
      59            0 :         let leader_step_down_state = if let Some(ref leader) = leader {
      60            0 :             if self.config.start_as_candidate {
      61            0 :                 self.request_step_down(leader).await
      62              :             } else {
      63            0 :                 None
      64              :             }
      65              :         } else {
      66            0 :             tracing::info!("No leader found to request step down from. Will build observed state.");
      67            0 :             None
      68              :         };
      69              : 
      70            0 :         Ok((leader, leader_step_down_state))
      71            0 :     }
      72              : 
      73              :     /// Mark the current storage controller instance as the leader in the database
      74            0 :     pub(crate) async fn become_leader(
      75            0 :         &self,
      76            0 :         current_leader: Option<ControllerPersistence>,
      77            0 :     ) -> Result<()> {
      78            0 :         if let Some(address_for_peers) = &self.config.address_for_peers {
      79              :             // TODO: `address-for-peers` can become a mandatory cli arg
      80              :             // after we update the k8s setup
      81            0 :             let proposed_leader = ControllerPersistence {
      82            0 :                 address: address_for_peers.to_string(),
      83            0 :                 started_at: chrono::Utc::now(),
      84            0 :             };
      85            0 : 
      86            0 :             self.persistence
      87            0 :                 .update_leader(current_leader, proposed_leader)
      88            0 :                 .await
      89            0 :                 .map_err(Error::Database)
      90              :         } else {
      91            0 :             tracing::info!("No address-for-peers provided. Skipping leader persistence.");
      92            0 :             Ok(())
      93              :         }
      94            0 :     }
      95              : 
      96            0 :     async fn current_leader(&self) -> DatabaseResult<Option<ControllerPersistence>> {
      97            0 :         let res = self.persistence.get_leader().await;
      98            0 :         if let Err(DatabaseError::Query(diesel::result::Error::DatabaseError(_kind, ref err))) = res
      99              :         {
     100              :             const REL_NOT_FOUND_MSG: &str = "relation \"controllers\" does not exist";
     101            0 :             if err.message().trim() == REL_NOT_FOUND_MSG {
     102              :                 // Special case: if this is a brand new storage controller, migrations will not
     103              :                 // have run at this point yet, and, hence, the controllers table does not exist.
     104              :                 // Detect this case via the error string (diesel doesn't type it) and allow it.
     105            0 :                 tracing::info!(
     106            0 :                     "Detected first storage controller start-up. Allowing missing controllers table ..."
     107              :                 );
     108            0 :                 return Ok(None);
     109            0 :             }
     110            0 :         }
     111              : 
     112            0 :         res
     113            0 :     }
     114              : 
     115              :     /// Request step down from the currently registered leader in the database
     116              :     ///
     117              :     /// On success, returns the observed state reported by the leader.
     118              :     /// Returns None if the HTTP client cannot be built or if the
     119              :     /// step-down request to the leader fails.
     120            0 :     async fn request_step_down(
     121            0 :         &self,
     122            0 :         leader: &ControllerPersistence,
     123            0 :     ) -> Option<GlobalObservedState> {
     124            0 :         tracing::info!("Sending step down request to {leader:?}");
     125              : 
     126            0 :         let mut http_client = reqwest::Client::builder();
     127            0 :         for cert in &self.config.ssl_ca_certs {
     128            0 :             http_client = http_client.add_root_certificate(cert.clone());
     129            0 :         }
     130            0 :         let http_client = match http_client.build() {
     131            0 :             Ok(http_client) => http_client,
     132            0 :             Err(err) => {
     133            0 :                 tracing::error!("Failed to build client for leader step-down request: {err}");
     134            0 :                 return None;
     135              :             }
     136              :         };
     137              : 
     138            0 :         let client = PeerClient::new(
     139            0 :             http_client,
     140            0 :             Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"),
     141            0 :             self.config.peer_jwt_token.clone(),
     142            0 :         );
     143            0 :         let state = client.step_down(&self.cancel).await;
     144            0 :         match state {
     145            0 :             Ok(state) => Some(state),
     146            0 :             Err(err) => {
     147            0 :                 // TODO: Make leaders periodically update a timestamp field in the
     148            0 :                 // database and, if the leader is not reachable from the current instance,
     149            0 :                 // but inferred as alive from the timestamp, abort start-up. This avoids
     150            0 :                 // a potential scenario in which we have two controllers acting as leaders.
     151            0 :                 tracing::error!(
     152            0 :                     "Leader ({}) did not respond to step-down request: {}",
     153              :                     leader.address,
     154              :                     err
     155              :                 );
     156              : 
     157            0 :                 None
     158              :             }
     159              :         }
     160            0 :     }
     161              : }
        

Generated by: LCOV version 2.1-beta