Line data Source code
1 : use std::time::Duration;
2 :
3 : use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
4 : use reqwest::StatusCode;
5 : use safekeeper_api::membership::SafekeeperId;
6 : use safekeeper_client::mgmt_api;
7 : use tokio_util::sync::CancellationToken;
8 : use utils::backoff;
9 : use utils::id::NodeId;
10 : use utils::logging::SecretString;
11 :
12 : use crate::heartbeater::SafekeeperState;
13 : use crate::persistence::{DatabaseError, SafekeeperPersistence};
14 : use crate::safekeeper_client::SafekeeperClient;
15 :
16 : #[derive(Clone)]
17 : pub struct Safekeeper {
18 : pub(crate) skp: SafekeeperPersistence,
19 : cancel: CancellationToken,
20 : listen_http_addr: String,
21 : listen_http_port: u16,
22 : listen_https_port: Option<u16>,
23 : scheduling_policy: SkSchedulingPolicy,
24 : id: NodeId,
25 : /// Heartbeating result.
26 : availability: SafekeeperState,
27 :
28 : // Flag from storcon's config to use https for safekeeper API.
29 : // Invariant: if |true|, listen_https_port should contain a value.
30 : use_https: bool,
31 : }
32 :
33 : impl Safekeeper {
34 0 : pub(crate) fn from_persistence(
35 0 : skp: SafekeeperPersistence,
36 0 : cancel: CancellationToken,
37 0 : use_https: bool,
38 0 : ) -> anyhow::Result<Self> {
39 0 : if use_https && skp.https_port.is_none() {
40 0 : anyhow::bail!(
41 0 : "cannot load safekeeper {} from persistence: \
42 0 : https is enabled, but https port is not specified",
43 : skp.id,
44 : );
45 0 : }
46 :
47 0 : let scheduling_policy = skp.scheduling_policy.0;
48 : Ok(Self {
49 0 : cancel,
50 0 : listen_http_addr: skp.host.clone(),
51 0 : listen_http_port: skp.http_port as u16,
52 0 : listen_https_port: skp.https_port.map(|x| x as u16),
53 0 : id: NodeId(skp.id as u64),
54 0 : skp,
55 0 : availability: SafekeeperState::Offline,
56 0 : scheduling_policy,
57 0 : use_https,
58 : })
59 0 : }
60 :
61 0 : pub(crate) fn base_url(&self) -> String {
62 0 : if self.use_https {
63 0 : format!(
64 0 : "https://{}:{}",
65 : self.listen_http_addr,
66 0 : self.listen_https_port
67 0 : .expect("https port should be specified if use_https is on"),
68 : )
69 : } else {
70 0 : format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
71 : }
72 0 : }
73 :
74 0 : pub(crate) fn get_id(&self) -> NodeId {
75 0 : self.id
76 0 : }
77 0 : pub(crate) fn describe_response(&self) -> Result<SafekeeperDescribeResponse, DatabaseError> {
78 0 : self.skp.as_describe_response()
79 0 : }
80 0 : pub(crate) fn set_availability(&mut self, availability: SafekeeperState) {
81 0 : self.availability = availability;
82 0 : }
83 0 : pub(crate) fn scheduling_policy(&self) -> SkSchedulingPolicy {
84 0 : self.scheduling_policy
85 0 : }
86 0 : pub(crate) fn set_scheduling_policy(&mut self, scheduling_policy: SkSchedulingPolicy) {
87 0 : self.scheduling_policy = scheduling_policy;
88 0 : self.skp.scheduling_policy = scheduling_policy.into();
89 0 : }
90 0 : pub(crate) fn availability(&self) -> SafekeeperState {
91 0 : self.availability.clone()
92 0 : }
93 0 : pub(crate) fn has_https_port(&self) -> bool {
94 0 : self.listen_https_port.is_some()
95 0 : }
96 0 : pub(crate) fn get_safekeeper_id(&self) -> SafekeeperId {
97 0 : SafekeeperId {
98 0 : id: self.id,
99 0 : host: self.skp.host.clone(),
100 0 : pg_port: self.skp.port as u16,
101 0 : }
102 0 : }
103 : /// Perform an operation (which is given a [`SafekeeperClient`]) with retries
104 : #[allow(clippy::too_many_arguments)]
105 0 : pub(crate) async fn with_client_retries<T, O, F>(
106 0 : &self,
107 0 : mut op: O,
108 0 : http_client: &reqwest::Client,
109 0 : jwt: &Option<SecretString>,
110 0 : warn_threshold: u32,
111 0 : max_retries: u32,
112 0 : timeout: Duration,
113 0 : cancel: &CancellationToken,
114 0 : ) -> mgmt_api::Result<T>
115 0 : where
116 0 : O: FnMut(SafekeeperClient) -> F,
117 0 : F: std::future::Future<Output = mgmt_api::Result<T>>,
118 0 : {
119 0 : fn is_fatal(e: &mgmt_api::Error) -> bool {
120 : use mgmt_api::Error::*;
121 0 : match e {
122 0 : ReceiveBody(_) | ReceiveErrorBody(_) => false,
123 : ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
124 : | ApiError(StatusCode::GATEWAY_TIMEOUT, _)
125 0 : | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
126 0 : ApiError(_, _) => true,
127 0 : Cancelled => true,
128 0 : Timeout(_) => false,
129 : }
130 0 : }
131 :
132 0 : backoff::retry(
133 0 : || {
134 0 : let client = SafekeeperClient::new(
135 0 : self.get_id(),
136 0 : http_client.clone(),
137 0 : self.base_url(),
138 0 : jwt.clone(),
139 : );
140 :
141 0 : let node_cancel_fut = self.cancel.cancelled();
142 :
143 0 : let op_fut = tokio::time::timeout(timeout, op(client));
144 :
145 0 : async {
146 0 : tokio::select! {
147 0 : r = op_fut => match r {
148 0 : Ok(r) => r,
149 0 : Err(e) => Err(mgmt_api::Error::Timeout(format!("{e}"))),
150 : },
151 0 : _ = node_cancel_fut => {
152 0 : Err(mgmt_api::Error::Cancelled)
153 : }}
154 0 : }
155 0 : },
156 : is_fatal,
157 0 : warn_threshold,
158 0 : max_retries,
159 0 : &format!(
160 0 : "Call to safekeeper {} ({}) management API",
161 0 : self.id,
162 0 : self.base_url(),
163 0 : ),
164 0 : cancel,
165 : )
166 0 : .await
167 0 : .unwrap_or(Err(mgmt_api::Error::Cancelled))
168 0 : }
169 :
170 0 : pub(crate) fn update_from_record(
171 0 : &mut self,
172 0 : record: crate::persistence::SafekeeperUpsert,
173 0 : ) -> anyhow::Result<()> {
174 : let crate::persistence::SafekeeperUpsert {
175 : active: _,
176 : availability_zone_id: _,
177 0 : host,
178 0 : http_port,
179 0 : https_port,
180 0 : id,
181 : port: _,
182 : region_id: _,
183 : version: _,
184 0 : } = record.clone();
185 0 : if id != self.id.0 as i64 {
186 : // The way the function is called ensures this. If we regress on that, it's a bug.
187 0 : panic!(
188 0 : "id can't be changed via update_from_record function: {id} != {}",
189 : self.id.0
190 : );
191 0 : }
192 0 : if self.use_https && https_port.is_none() {
193 0 : anyhow::bail!(
194 0 : "cannot update safekeeper {id}: \
195 0 : https is enabled, but https port is not specified"
196 : );
197 0 : }
198 0 : self.skp =
199 0 : crate::persistence::SafekeeperPersistence::from_upsert(record, self.scheduling_policy);
200 0 : self.listen_http_port = http_port as u16;
201 0 : self.listen_https_port = https_port.map(|x| x as u16);
202 0 : self.listen_http_addr = host;
203 0 : Ok(())
204 0 : }
205 : }
|