Line data Source code
1 : use std::sync::Arc;
2 :
3 : use hyper::Uri;
4 : use tokio_util::sync::CancellationToken;
5 :
6 : use crate::{
7 : peer_client::{GlobalObservedState, PeerClient},
8 : persistence::{ControllerPersistence, DatabaseError, DatabaseResult, Persistence},
9 : service::Config,
10 : };
11 :
12 : /// Helper for storage controller leadership acquisition
13 : pub(crate) struct Leadership {
14 : persistence: Arc<Persistence>,
15 : config: Config,
16 : cancel: CancellationToken,
17 : }
18 :
19 0 : #[derive(thiserror::Error, Debug)]
20 : pub(crate) enum Error {
21 : #[error(transparent)]
22 : Database(#[from] DatabaseError),
23 : }
24 :
25 : pub(crate) type Result<T> = std::result::Result<T, Error>;
26 :
27 : impl Leadership {
28 0 : pub(crate) fn new(
29 0 : persistence: Arc<Persistence>,
30 0 : config: Config,
31 0 : cancel: CancellationToken,
32 0 : ) -> Self {
33 0 : Self {
34 0 : persistence,
35 0 : config,
36 0 : cancel,
37 0 : }
38 0 : }
39 :
40 : /// Find the current leader in the database and request it to step down if required.
41 : /// Should be called early on in within the start-up sequence.
42 : ///
43 : /// Returns a tuple of two optionals: the current leader and its observed state
44 0 : pub(crate) async fn step_down_current_leader(
45 0 : &self,
46 0 : ) -> Result<(Option<ControllerPersistence>, Option<GlobalObservedState>)> {
47 0 : let leader = self.current_leader().await?;
48 0 : let leader_step_down_state = if let Some(ref leader) = leader {
49 0 : if self.config.start_as_candidate {
50 0 : self.request_step_down(leader).await
51 : } else {
52 0 : None
53 : }
54 : } else {
55 0 : tracing::info!("No leader found to request step down from. Will build observed state.");
56 0 : None
57 : };
58 :
59 0 : Ok((leader, leader_step_down_state))
60 0 : }
61 :
62 : /// Mark the current storage controller instance as the leader in the database
63 0 : pub(crate) async fn become_leader(
64 0 : &self,
65 0 : current_leader: Option<ControllerPersistence>,
66 0 : ) -> Result<()> {
67 0 : if let Some(address_for_peers) = &self.config.address_for_peers {
68 : // TODO: `address-for-peers` can become a mandatory cli arg
69 : // after we update the k8s setup
70 0 : let proposed_leader = ControllerPersistence {
71 0 : address: address_for_peers.to_string(),
72 0 : started_at: chrono::Utc::now(),
73 0 : };
74 0 :
75 0 : self.persistence
76 0 : .update_leader(current_leader, proposed_leader)
77 0 : .await
78 0 : .map_err(Error::Database)
79 : } else {
80 0 : tracing::info!("No address-for-peers provided. Skipping leader persistence.");
81 0 : Ok(())
82 : }
83 0 : }
84 :
85 0 : async fn current_leader(&self) -> DatabaseResult<Option<ControllerPersistence>> {
86 0 : let res = self.persistence.get_leader().await;
87 0 : if let Err(DatabaseError::Query(diesel::result::Error::DatabaseError(_kind, ref err))) = res
88 : {
89 : const REL_NOT_FOUND_MSG: &str = "relation \"controllers\" does not exist";
90 0 : if err.message().trim() == REL_NOT_FOUND_MSG {
91 : // Special case: if this is a brand new storage controller, migrations will not
92 : // have run at this point yet, and, hence, the controllers table does not exist.
93 : // Detect this case via the error string (diesel doesn't type it) and allow it.
94 0 : tracing::info!("Detected first storage controller start-up. Allowing missing controllers table ...");
95 0 : return Ok(None);
96 0 : }
97 0 : }
98 :
99 0 : res
100 0 : }
101 :
102 : /// Request step down from the currently registered leader in the database
103 : ///
104 : /// If such an entry is persisted, the success path returns the observed
105 : /// state and details of the leader. Otherwise, None is returned indicating
106 : /// there is no leader currently.
107 0 : async fn request_step_down(
108 0 : &self,
109 0 : leader: &ControllerPersistence,
110 0 : ) -> Option<GlobalObservedState> {
111 0 : tracing::info!("Sending step down request to {leader:?}");
112 :
113 0 : let client = PeerClient::new(
114 0 : Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"),
115 0 : self.config.peer_jwt_token.clone(),
116 0 : );
117 0 : let state = client.step_down(&self.cancel).await;
118 0 : match state {
119 0 : Ok(state) => Some(state),
120 0 : Err(err) => {
121 0 : // TODO: Make leaders periodically update a timestamp field in the
122 0 : // database and, if the leader is not reachable from the current instance,
123 0 : // but inferred as alive from the timestamp, abort start-up. This avoids
124 0 : // a potential scenario in which we have two controllers acting as leaders.
125 0 : tracing::error!(
126 0 : "Leader ({}) did not respond to step-down request: {}",
127 : leader.address,
128 : err
129 : );
130 :
131 0 : None
132 : }
133 : }
134 0 : }
135 : }
|