Line data Source code
1 : use std::sync::Arc;
2 :
3 : use hyper::Uri;
4 : use tokio_util::sync::CancellationToken;
5 :
6 : use crate::peer_client::{GlobalObservedState, PeerClient};
7 : use crate::persistence::{ControllerPersistence, DatabaseError, DatabaseResult, Persistence};
8 : use crate::service::Config;
9 :
10 : /// Helper for storage controller leadership acquisition
11 : pub(crate) struct Leadership {
12 : persistence: Arc<Persistence>,
13 : config: Config,
14 : cancel: CancellationToken,
15 : }
16 :
17 : #[derive(thiserror::Error, Debug)]
18 : pub(crate) enum Error {
19 : #[error(transparent)]
20 : Database(#[from] DatabaseError),
21 : }
22 :
23 : pub(crate) type Result<T> = std::result::Result<T, Error>;
24 :
25 : impl Leadership {
26 0 : pub(crate) fn new(
27 0 : persistence: Arc<Persistence>,
28 0 : config: Config,
29 0 : cancel: CancellationToken,
30 0 : ) -> Self {
31 0 : Self {
32 0 : persistence,
33 0 : config,
34 0 : cancel,
35 0 : }
36 0 : }
37 :
38 : /// Find the current leader in the database and request it to step down if required.
39 : /// Should be called early on in within the start-up sequence.
40 : ///
41 : /// Returns a tuple of two optionals: the current leader and its observed state
42 0 : pub(crate) async fn step_down_current_leader(
43 0 : &self,
44 0 : ) -> Result<(Option<ControllerPersistence>, Option<GlobalObservedState>)> {
45 0 : let leader = self.current_leader().await?;
46 0 : let leader_step_down_state = if let Some(ref leader) = leader {
47 0 : if self.config.start_as_candidate {
48 0 : self.request_step_down(leader).await
49 : } else {
50 0 : None
51 : }
52 : } else {
53 0 : tracing::info!("No leader found to request step down from. Will build observed state.");
54 0 : None
55 : };
56 :
57 0 : Ok((leader, leader_step_down_state))
58 0 : }
59 :
60 : /// Mark the current storage controller instance as the leader in the database
61 0 : pub(crate) async fn become_leader(
62 0 : &self,
63 0 : current_leader: Option<ControllerPersistence>,
64 0 : ) -> Result<()> {
65 0 : if let Some(address_for_peers) = &self.config.address_for_peers {
66 : // TODO: `address-for-peers` can become a mandatory cli arg
67 : // after we update the k8s setup
68 0 : let proposed_leader = ControllerPersistence {
69 0 : address: address_for_peers.to_string(),
70 0 : started_at: chrono::Utc::now(),
71 0 : };
72 0 :
73 0 : self.persistence
74 0 : .update_leader(current_leader, proposed_leader)
75 0 : .await
76 0 : .map_err(Error::Database)
77 : } else {
78 0 : tracing::info!("No address-for-peers provided. Skipping leader persistence.");
79 0 : Ok(())
80 : }
81 0 : }
82 :
83 0 : async fn current_leader(&self) -> DatabaseResult<Option<ControllerPersistence>> {
84 0 : let res = self.persistence.get_leader().await;
85 0 : if let Err(DatabaseError::Query(diesel::result::Error::DatabaseError(_kind, ref err))) = res
86 : {
87 : const REL_NOT_FOUND_MSG: &str = "relation \"controllers\" does not exist";
88 0 : if err.message().trim() == REL_NOT_FOUND_MSG {
89 : // Special case: if this is a brand new storage controller, migrations will not
90 : // have run at this point yet, and, hence, the controllers table does not exist.
91 : // Detect this case via the error string (diesel doesn't type it) and allow it.
92 0 : tracing::info!(
93 0 : "Detected first storage controller start-up. Allowing missing controllers table ..."
94 : );
95 0 : return Ok(None);
96 0 : }
97 0 : }
98 :
99 0 : res
100 0 : }
101 :
102 : /// Request step down from the currently registered leader in the database
103 : ///
104 : /// If such an entry is persisted, the success path returns the observed
105 : /// state and details of the leader. Otherwise, None is returned indicating
106 : /// there is no leader currently.
107 0 : async fn request_step_down(
108 0 : &self,
109 0 : leader: &ControllerPersistence,
110 0 : ) -> Option<GlobalObservedState> {
111 0 : tracing::info!("Sending step down request to {leader:?}");
112 :
113 0 : let client = PeerClient::new(
114 0 : Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"),
115 0 : self.config.peer_jwt_token.clone(),
116 0 : );
117 0 : let state = client.step_down(&self.cancel).await;
118 0 : match state {
119 0 : Ok(state) => Some(state),
120 0 : Err(err) => {
121 0 : // TODO: Make leaders periodically update a timestamp field in the
122 0 : // database and, if the leader is not reachable from the current instance,
123 0 : // but inferred as alive from the timestamp, abort start-up. This avoids
124 0 : // a potential scenario in which we have two controllers acting as leaders.
125 0 : tracing::error!(
126 0 : "Leader ({}) did not respond to step-down request: {}",
127 : leader.address,
128 : err
129 : );
130 :
131 0 : None
132 : }
133 : }
134 0 : }
135 : }
|