use std::sync::Arc;

use hyper::Uri;
use tokio_util::sync::CancellationToken;

use crate::peer_client::{GlobalObservedState, PeerClient};
use crate::persistence::{ControllerPersistence, DatabaseError, DatabaseResult, Persistence};
use crate::service::Config;

/// Helper for storage controller leadership acquisition
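///
/// A minimal sketch of the intended start-up flow, assuming the caller already
/// holds a `Persistence`, a `Config`, and a `CancellationToken` (variable names
/// below are illustrative only):
///
/// ```ignore
/// let leadership = Leadership::new(persistence.clone(), config.clone(), cancel.clone());
///
/// // Ask the previous leader (if any) to step down and hand over its observed state.
/// let (current_leader, observed_state) = leadership.step_down_current_leader().await?;
///
/// // ... build or adopt the observed state, then record ourselves as the leader.
/// leadership.become_leader(current_leader).await?;
/// ```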
pub(crate) struct Leadership {
    persistence: Arc<Persistence>,
    config: Config,
    cancel: CancellationToken,
}

#[derive(thiserror::Error, Debug)]
pub(crate) enum Error {
    #[error(transparent)]
    Database(#[from] DatabaseError),
}

pub(crate) type Result<T> = std::result::Result<T, Error>;

impl Leadership {
    pub(crate) fn new(
        persistence: Arc<Persistence>,
        config: Config,
        cancel: CancellationToken,
    ) -> Self {
        Self {
            persistence,
            config,
            cancel,
        }
    }

    /// Find the current leader in the database and request it to step down if required.
    /// Should be called early in the start-up sequence.
    ///
    /// Returns a tuple of two optionals: the current leader and its observed state
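    ///
    /// A sketch of how a caller might interpret the result (illustration only;
    /// variable names are not from this crate):
    ///
    /// ```ignore
    /// match leadership.step_down_current_leader().await? {
    ///     // No leader recorded in the database: first start-up.
    ///     (None, _) => { /* build observed state from scratch */ }
    ///     // A leader is recorded and it handed over its observed state.
    ///     (Some(leader), Some(observed)) => { /* adopt `observed` */ }
    ///     // A leader is recorded but no state was obtained: restart of the same
    ///     // instance, step-down skipped, or the request failed.
    ///     (Some(leader), None) => { /* rebuild observed state ourselves */ }
    /// }
    /// ```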
    pub(crate) async fn step_down_current_leader(
        &self,
    ) -> Result<(Option<ControllerPersistence>, Option<GlobalObservedState>)> {
        let leader = self.current_leader().await?;

        if leader.as_ref().map(|l| &l.address)
            == self
                .config
                .address_for_peers
                .as_ref()
                .map(Uri::to_string)
                .as_ref()
        {
            // We already are the current leader. This is a restart.
            return Ok((leader, None));
        }

        let leader_step_down_state = if let Some(ref leader) = leader {
            if self.config.start_as_candidate {
                self.request_step_down(leader).await
            } else {
                None
            }
        } else {
            tracing::info!("No leader found to request step down from. Will build observed state.");
            None
        };

        Ok((leader, leader_step_down_state))
    }

    /// Mark the current storage controller instance as the leader in the database
    pub(crate) async fn become_leader(
        &self,
        current_leader: Option<ControllerPersistence>,
    ) -> Result<()> {
        if let Some(address_for_peers) = &self.config.address_for_peers {
            // TODO: `address-for-peers` can become a mandatory cli arg
            // after we update the k8s setup
            let proposed_leader = ControllerPersistence {
                address: address_for_peers.to_string(),
                started_at: chrono::Utc::now(),
            };

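            // Record ourselves as the leader, handing the persistence layer the
            // previously observed leader (`current_leader`) so it can detect a
            // concurrent leadership change (assumed `update_leader` semantics).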
            self.persistence
                .update_leader(current_leader, proposed_leader)
                .await
                .map_err(Error::Database)
        } else {
            tracing::info!("No address-for-peers provided. Skipping leader persistence.");
            Ok(())
        }
    }

    async fn current_leader(&self) -> DatabaseResult<Option<ControllerPersistence>> {
        let res = self.persistence.get_leader().await;
        if let Err(DatabaseError::Query(diesel::result::Error::DatabaseError(_kind, ref err))) = res
        {
            const REL_NOT_FOUND_MSG: &str = "relation \"controllers\" does not exist";
            if err.message().trim() == REL_NOT_FOUND_MSG {
                // Special case: if this is a brand new storage controller, migrations will not
                // have run at this point yet, and, hence, the controllers table does not exist.
                // Detect this case via the error string (diesel doesn't type it) and allow it.
                tracing::info!(
                    "Detected first storage controller start-up. Allowing missing controllers table ..."
                );
                return Ok(None);
            }
        }

        res
    }

    /// Request the leader registered in the database to step down.
    ///
    /// On success, returns the observed state handed over by the leader.
    /// Returns None if the step-down request could not be made or the leader
    /// did not respond to it.
    async fn request_step_down(
        &self,
        leader: &ControllerPersistence,
    ) -> Option<GlobalObservedState> {
        tracing::info!("Sending step down request to {leader:?}");

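        // Build an HTTP client for the step-down request, trusting any extra CA
        // certificates from the controller config (e.g. when peers present
        // certificates issued by a private CA).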
        let mut http_client = reqwest::Client::builder();
        for cert in &self.config.ssl_ca_certs {
            http_client = http_client.add_root_certificate(cert.clone());
        }
        let http_client = match http_client.build() {
            Ok(http_client) => http_client,
            Err(err) => {
                tracing::error!("Failed to build client for leader step-down request: {err}");
                return None;
            }
        };

        let client = PeerClient::new(
            http_client,
            Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"),
            self.config.peer_jwt_token.clone(),
        );
        let state = client.step_down(&self.cancel).await;
        match state {
            Ok(state) => Some(state),
            Err(err) => {
                // TODO: Make leaders periodically update a timestamp field in the
                // database and, if the leader is not reachable from the current instance,
                // but inferred as alive from the timestamp, abort start-up. This avoids
                // a potential scenario in which we have two controllers acting as leaders.
                tracing::error!(
                    "Leader ({}) did not respond to step-down request: {}",
                    leader.address,
                    err
                );

                None
            }
        }
    }
}