Line data Source code
1 : use std::time::Duration;
2 :
3 : use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
4 : use reqwest::{Certificate, StatusCode};
5 : use safekeeper_client::mgmt_api;
6 : use tokio_util::sync::CancellationToken;
7 : use utils::backoff;
8 : use utils::id::NodeId;
9 : use utils::logging::SecretString;
10 :
11 : use crate::heartbeater::SafekeeperState;
12 : use crate::persistence::{DatabaseError, SafekeeperPersistence};
13 : use crate::safekeeper_client::SafekeeperClient;
14 :
15 : #[derive(Clone)]
16 : pub struct Safekeeper {
17 : pub(crate) skp: SafekeeperPersistence,
18 : cancel: CancellationToken,
19 : listen_http_addr: String,
20 : listen_http_port: u16,
21 : listen_https_port: Option<u16>,
22 : scheduling_policy: SkSchedulingPolicy,
23 : id: NodeId,
24 : /// Heartbeating result.
25 : availability: SafekeeperState,
26 :
27 : // Flag from storcon's config to use https for safekeeper API.
28 : // Invariant: if |true|, listen_https_port should contain a value.
29 : use_https: bool,
30 : }
31 :
32 : impl Safekeeper {
33 0 : pub(crate) fn from_persistence(
34 0 : skp: SafekeeperPersistence,
35 0 : cancel: CancellationToken,
36 0 : use_https: bool,
37 0 : ) -> anyhow::Result<Self> {
38 0 : if use_https && skp.https_port.is_none() {
39 0 : anyhow::bail!(
40 0 : "cannot load safekeeper {} from persistence: \
41 0 : https is enabled, but https port is not specified",
42 0 : skp.id,
43 0 : );
44 0 : }
45 0 :
46 0 : let scheduling_policy = skp.scheduling_policy.0;
47 0 : Ok(Self {
48 0 : cancel,
49 0 : listen_http_addr: skp.host.clone(),
50 0 : listen_http_port: skp.http_port as u16,
51 0 : listen_https_port: skp.https_port.map(|x| x as u16),
52 0 : id: NodeId(skp.id as u64),
53 0 : skp,
54 0 : availability: SafekeeperState::Offline,
55 0 : scheduling_policy,
56 0 : use_https,
57 0 : })
58 0 : }
59 :
60 0 : pub(crate) fn base_url(&self) -> String {
61 0 : if self.use_https {
62 0 : format!(
63 0 : "https://{}:{}",
64 0 : self.listen_http_addr,
65 0 : self.listen_https_port
66 0 : .expect("https port should be specified if use_https is on"),
67 0 : )
68 : } else {
69 0 : format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
70 : }
71 0 : }
72 :
73 0 : pub(crate) fn get_id(&self) -> NodeId {
74 0 : self.id
75 0 : }
76 0 : pub(crate) fn describe_response(&self) -> Result<SafekeeperDescribeResponse, DatabaseError> {
77 0 : self.skp.as_describe_response()
78 0 : }
79 0 : pub(crate) fn set_availability(&mut self, availability: SafekeeperState) {
80 0 : self.availability = availability;
81 0 : }
82 0 : pub(crate) fn scheduling_policy(&self) -> SkSchedulingPolicy {
83 0 : self.scheduling_policy
84 0 : }
85 0 : pub(crate) fn set_scheduling_policy(&mut self, scheduling_policy: SkSchedulingPolicy) {
86 0 : self.scheduling_policy = scheduling_policy;
87 0 : self.skp.scheduling_policy = scheduling_policy.into();
88 0 : }
89 0 : pub(crate) fn availability(&self) -> SafekeeperState {
90 0 : self.availability.clone()
91 0 : }
92 : /// Perform an operation (which is given a [`SafekeeperClient`]) with retries
93 : #[allow(clippy::too_many_arguments)]
94 0 : pub(crate) async fn with_client_retries<T, O, F>(
95 0 : &self,
96 0 : mut op: O,
97 0 : jwt: &Option<SecretString>,
98 0 : ssl_ca_cert: &Option<Certificate>,
99 0 : warn_threshold: u32,
100 0 : max_retries: u32,
101 0 : timeout: Duration,
102 0 : cancel: &CancellationToken,
103 0 : ) -> mgmt_api::Result<T>
104 0 : where
105 0 : O: FnMut(SafekeeperClient) -> F,
106 0 : F: std::future::Future<Output = mgmt_api::Result<T>>,
107 0 : {
108 0 : fn is_fatal(e: &mgmt_api::Error) -> bool {
109 : use mgmt_api::Error::*;
110 0 : match e {
111 0 : ReceiveBody(_) | ReceiveErrorBody(_) => false,
112 : ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
113 : | ApiError(StatusCode::GATEWAY_TIMEOUT, _)
114 0 : | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
115 0 : ApiError(_, _) => true,
116 0 : Cancelled => true,
117 0 : CreateClient(_) => true,
118 : }
119 0 : }
120 :
121 : // TODO: refactor SafekeeperClient and with_client_retires (#11113).
122 0 : let mut http_client = reqwest::Client::builder().timeout(timeout);
123 0 : if let Some(ssl_ca_cert) = ssl_ca_cert.as_ref() {
124 0 : http_client = http_client.add_root_certificate(ssl_ca_cert.clone());
125 0 : }
126 0 : let http_client = http_client.build().map_err(mgmt_api::Error::CreateClient)?;
127 :
128 0 : backoff::retry(
129 0 : || {
130 0 : let client = SafekeeperClient::new(
131 0 : self.get_id(),
132 0 : http_client.clone(),
133 0 : self.base_url(),
134 0 : jwt.clone(),
135 0 : );
136 0 :
137 0 : let node_cancel_fut = self.cancel.cancelled();
138 0 :
139 0 : let op_fut = op(client);
140 :
141 0 : async {
142 0 : tokio::select! {
143 0 : r = op_fut=> {r},
144 0 : _ = node_cancel_fut => {
145 0 : Err(mgmt_api::Error::Cancelled)
146 : }}
147 0 : }
148 0 : },
149 0 : is_fatal,
150 0 : warn_threshold,
151 0 : max_retries,
152 0 : &format!(
153 0 : "Call to safekeeper {} ({}) management API",
154 0 : self.id,
155 0 : self.base_url(),
156 0 : ),
157 0 : cancel,
158 0 : )
159 0 : .await
160 0 : .unwrap_or(Err(mgmt_api::Error::Cancelled))
161 0 : }
162 :
163 0 : pub(crate) fn update_from_record(
164 0 : &mut self,
165 0 : record: crate::persistence::SafekeeperUpsert,
166 0 : ) -> anyhow::Result<()> {
167 0 : let crate::persistence::SafekeeperUpsert {
168 0 : active: _,
169 0 : availability_zone_id: _,
170 0 : host,
171 0 : http_port,
172 0 : https_port,
173 0 : id,
174 0 : port: _,
175 0 : region_id: _,
176 0 : version: _,
177 0 : } = record.clone();
178 0 : if id != self.id.0 as i64 {
179 : // The way the function is called ensures this. If we regress on that, it's a bug.
180 0 : panic!(
181 0 : "id can't be changed via update_from_record function: {id} != {}",
182 0 : self.id.0
183 0 : );
184 0 : }
185 0 : if self.use_https && https_port.is_none() {
186 0 : anyhow::bail!(
187 0 : "cannot update safekeeper {id}: \
188 0 : https is enabled, but https port is not specified"
189 0 : );
190 0 : }
191 0 : self.skp =
192 0 : crate::persistence::SafekeeperPersistence::from_upsert(record, self.scheduling_policy);
193 0 : self.listen_http_port = http_port as u16;
194 0 : self.listen_https_port = https_port.map(|x| x as u16);
195 0 : self.listen_http_addr = host;
196 0 : Ok(())
197 0 : }
198 : }
|