Line data Source code
1 : use std::time::Duration;
2 :
3 : use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
4 : use reqwest::StatusCode;
5 : use safekeeper_client::mgmt_api;
6 : use tokio_util::sync::CancellationToken;
7 : use utils::backoff;
8 : use utils::id::NodeId;
9 : use utils::logging::SecretString;
10 :
11 : use crate::heartbeater::SafekeeperState;
12 : use crate::persistence::{DatabaseError, SafekeeperPersistence};
13 : use crate::safekeeper_client::SafekeeperClient;
14 :
15 : #[derive(Clone)]
16 : pub struct Safekeeper {
17 : pub(crate) skp: SafekeeperPersistence,
18 : cancel: CancellationToken,
19 : listen_http_addr: String,
20 : listen_http_port: u16,
21 : listen_https_port: Option<u16>,
22 : scheduling_policy: SkSchedulingPolicy,
23 : id: NodeId,
24 : /// Heartbeating result.
25 : availability: SafekeeperState,
26 :
27 : // Flag from storcon's config to use https for safekeeper API.
28 : // Invariant: if |true|, listen_https_port should contain a value.
29 : use_https: bool,
30 : }
31 :
32 : impl Safekeeper {
33 0 : pub(crate) fn from_persistence(
34 0 : skp: SafekeeperPersistence,
35 0 : cancel: CancellationToken,
36 0 : use_https: bool,
37 0 : ) -> anyhow::Result<Self> {
38 0 : if use_https && skp.https_port.is_none() {
39 0 : anyhow::bail!(
40 0 : "cannot load safekeeper {} from persistence: \
41 0 : https is enabled, but https port is not specified",
42 0 : skp.id,
43 0 : );
44 0 : }
45 0 :
46 0 : let scheduling_policy = skp.scheduling_policy.0;
47 0 : Ok(Self {
48 0 : cancel,
49 0 : listen_http_addr: skp.host.clone(),
50 0 : listen_http_port: skp.http_port as u16,
51 0 : listen_https_port: skp.https_port.map(|x| x as u16),
52 0 : id: NodeId(skp.id as u64),
53 0 : skp,
54 0 : availability: SafekeeperState::Offline,
55 0 : scheduling_policy,
56 0 : use_https,
57 0 : })
58 0 : }
59 :
60 0 : pub(crate) fn base_url(&self) -> String {
61 0 : if self.use_https {
62 0 : format!(
63 0 : "https://{}:{}",
64 0 : self.listen_http_addr,
65 0 : self.listen_https_port
66 0 : .expect("https port should be specified if use_https is on"),
67 0 : )
68 : } else {
69 0 : format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
70 : }
71 0 : }
72 :
73 0 : pub(crate) fn get_id(&self) -> NodeId {
74 0 : self.id
75 0 : }
76 0 : pub(crate) fn describe_response(&self) -> Result<SafekeeperDescribeResponse, DatabaseError> {
77 0 : self.skp.as_describe_response()
78 0 : }
79 0 : pub(crate) fn set_availability(&mut self, availability: SafekeeperState) {
80 0 : self.availability = availability;
81 0 : }
82 0 : pub(crate) fn scheduling_policy(&self) -> SkSchedulingPolicy {
83 0 : self.scheduling_policy
84 0 : }
85 0 : pub(crate) fn set_scheduling_policy(&mut self, scheduling_policy: SkSchedulingPolicy) {
86 0 : self.scheduling_policy = scheduling_policy;
87 0 : self.skp.scheduling_policy = scheduling_policy.into();
88 0 : }
89 0 : pub(crate) fn availability(&self) -> SafekeeperState {
90 0 : self.availability.clone()
91 0 : }
92 : /// Perform an operation (which is given a [`SafekeeperClient`]) with retries
93 : #[allow(clippy::too_many_arguments)]
94 0 : pub(crate) async fn with_client_retries<T, O, F>(
95 0 : &self,
96 0 : mut op: O,
97 0 : http_client: &reqwest::Client,
98 0 : jwt: &Option<SecretString>,
99 0 : warn_threshold: u32,
100 0 : max_retries: u32,
101 0 : timeout: Duration,
102 0 : cancel: &CancellationToken,
103 0 : ) -> mgmt_api::Result<T>
104 0 : where
105 0 : O: FnMut(SafekeeperClient) -> F,
106 0 : F: std::future::Future<Output = mgmt_api::Result<T>>,
107 0 : {
108 0 : fn is_fatal(e: &mgmt_api::Error) -> bool {
109 : use mgmt_api::Error::*;
110 0 : match e {
111 0 : ReceiveBody(_) | ReceiveErrorBody(_) => false,
112 : ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
113 : | ApiError(StatusCode::GATEWAY_TIMEOUT, _)
114 0 : | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
115 0 : ApiError(_, _) => true,
116 0 : Cancelled => true,
117 0 : Timeout(_) => false,
118 : }
119 0 : }
120 :
121 0 : backoff::retry(
122 0 : || {
123 0 : let client = SafekeeperClient::new(
124 0 : self.get_id(),
125 0 : http_client.clone(),
126 0 : self.base_url(),
127 0 : jwt.clone(),
128 0 : );
129 0 :
130 0 : let node_cancel_fut = self.cancel.cancelled();
131 0 :
132 0 : let op_fut = tokio::time::timeout(timeout, op(client));
133 :
134 0 : async {
135 0 : tokio::select! {
136 0 : r = op_fut => match r {
137 0 : Ok(r) => r,
138 0 : Err(e) => Err(mgmt_api::Error::Timeout(format!("{e}"))),
139 : },
140 0 : _ = node_cancel_fut => {
141 0 : Err(mgmt_api::Error::Cancelled)
142 : }}
143 0 : }
144 0 : },
145 0 : is_fatal,
146 0 : warn_threshold,
147 0 : max_retries,
148 0 : &format!(
149 0 : "Call to safekeeper {} ({}) management API",
150 0 : self.id,
151 0 : self.base_url(),
152 0 : ),
153 0 : cancel,
154 0 : )
155 0 : .await
156 0 : .unwrap_or(Err(mgmt_api::Error::Cancelled))
157 0 : }
158 :
159 0 : pub(crate) fn update_from_record(
160 0 : &mut self,
161 0 : record: crate::persistence::SafekeeperUpsert,
162 0 : ) -> anyhow::Result<()> {
163 0 : let crate::persistence::SafekeeperUpsert {
164 0 : active: _,
165 0 : availability_zone_id: _,
166 0 : host,
167 0 : http_port,
168 0 : https_port,
169 0 : id,
170 0 : port: _,
171 0 : region_id: _,
172 0 : version: _,
173 0 : } = record.clone();
174 0 : if id != self.id.0 as i64 {
175 : // The way the function is called ensures this. If we regress on that, it's a bug.
176 0 : panic!(
177 0 : "id can't be changed via update_from_record function: {id} != {}",
178 0 : self.id.0
179 0 : );
180 0 : }
181 0 : if self.use_https && https_port.is_none() {
182 0 : anyhow::bail!(
183 0 : "cannot update safekeeper {id}: \
184 0 : https is enabled, but https port is not specified"
185 0 : );
186 0 : }
187 0 : self.skp =
188 0 : crate::persistence::SafekeeperPersistence::from_upsert(record, self.scheduling_policy);
189 0 : self.listen_http_port = http_port as u16;
190 0 : self.listen_https_port = https_port.map(|x| x as u16);
191 0 : self.listen_http_addr = host;
192 0 : Ok(())
193 0 : }
194 : }
|