Line data Source code
1 : use pem::Pem;
2 : use safekeeper_api::models::PullTimelineRequest;
3 : use std::{collections::HashMap, env::VarError, net::IpAddr, sync::Arc, time::Duration};
4 : use tokio::time::sleep;
5 : use tokio_util::sync::CancellationToken;
6 : use url::Url;
7 : use utils::{backoff, id::TenantTimelineId, ip_address};
8 :
9 : use anyhow::Result;
10 : use pageserver_api::controller_api::{
11 : AvailabilityZone, NodeRegisterRequest, SafekeeperTimeline, SafekeeperTimelinesResponse,
12 : };
13 :
14 : use crate::{
15 : GlobalTimelines, SafeKeeperConf,
16 : metrics::{
17 : SK_RECOVERY_PULL_TIMELINE_ERRORS, SK_RECOVERY_PULL_TIMELINE_OKS,
18 : SK_RECOVERY_PULL_TIMELINE_SECONDS, SK_RECOVERY_PULL_TIMELINES_SECONDS,
19 : },
20 : pull_timeline,
21 : timelines_global_map::DeleteOrExclude,
22 : };
23 :
24 : // Extract information in the SafeKeeperConf to build a NodeRegisterRequest used to register the safekeeper with the HCC.
25 1 : fn build_node_registeration_request(
26 1 : conf: &SafeKeeperConf,
27 1 : node_ip_addr: Option<IpAddr>,
28 1 : ) -> Result<NodeRegisterRequest> {
29 1 : let advertise_pg_addr_with_port = conf
30 1 : .advertise_pg_addr_tenant_only
31 1 : .as_deref()
32 1 : .expect("advertise_pg_addr_tenant_only is required to register with HCC");
33 :
34 : // Extract host/port from the string.
35 1 : let (advertise_host_addr, pg_port_str) = advertise_pg_addr_with_port.split_at(
36 1 : advertise_pg_addr_with_port
37 1 : .rfind(':')
38 1 : .ok_or(anyhow::anyhow!("Invalid advertise_pg_addr"))?,
39 : );
40 : // Need the `[1..]` to remove the leading ':'.
41 1 : let pg_port = pg_port_str[1..]
42 1 : .parse::<u16>()
43 1 : .map_err(|e| anyhow::anyhow!("Cannot parse PG port: {}", e))?;
44 :
45 1 : let (_, http_port_str) = conf.listen_http_addr.split_at(
46 1 : conf.listen_http_addr
47 1 : .rfind(':')
48 1 : .ok_or(anyhow::anyhow!("Invalid listen_http_addr"))?,
49 : );
50 1 : let http_port = http_port_str[1..]
51 1 : .parse::<u16>()
52 1 : .map_err(|e| anyhow::anyhow!("Cannot parse HTTP port: {}", e))?;
53 :
54 1 : Ok(NodeRegisterRequest {
55 1 : node_id: conf.my_id,
56 1 : listen_pg_addr: advertise_host_addr.to_string(),
57 1 : listen_pg_port: pg_port,
58 1 : listen_http_addr: advertise_host_addr.to_string(),
59 1 : listen_http_port: http_port,
60 1 : node_ip_addr,
61 1 : availability_zone_id: AvailabilityZone("todo".to_string()),
62 1 : listen_grpc_addr: None,
63 1 : listen_grpc_port: None,
64 1 : listen_https_port: None,
65 1 : })
66 1 : }
67 :
68 : // Retrieve the JWT token used for authenticating with HCC from the environment variable.
69 : // Returns None if the token cannot be retrieved.
70 0 : fn get_hcc_auth_token() -> Option<String> {
71 0 : match std::env::var("HCC_AUTH_TOKEN") {
72 0 : Ok(v) => {
73 0 : tracing::info!("Loaded JWT token for authentication with HCC");
74 0 : Some(v)
75 : }
76 : Err(VarError::NotPresent) => {
77 0 : tracing::info!("No JWT token for authentication with HCC detected");
78 0 : None
79 : }
80 : Err(_) => {
81 0 : tracing::info!(
82 0 : "Failed to either load to detect non-present HCC_AUTH_TOKEN environment variable"
83 : );
84 0 : None
85 : }
86 : }
87 0 : }
88 :
89 0 : async fn send_safekeeper_register_request(
90 0 : request_url: &Url,
91 0 : auth_token: &Option<String>,
92 0 : request: &NodeRegisterRequest,
93 0 : ) -> Result<()> {
94 0 : let client = reqwest::Client::new();
95 0 : let mut req_builder = client
96 0 : .post(request_url.clone())
97 0 : .header("Content-Type", "application/json");
98 0 : if let Some(token) = auth_token {
99 0 : req_builder = req_builder.bearer_auth(token);
100 0 : }
101 0 : req_builder
102 0 : .json(&request)
103 0 : .send()
104 0 : .await?
105 0 : .error_for_status()?;
106 0 : Ok(())
107 0 : }
108 :
109 : /// Registers this safe keeper with the HCC.
110 0 : pub async fn register(conf: &SafeKeeperConf) -> Result<()> {
111 0 : match conf.hcc_base_url.as_ref() {
112 : None => {
113 0 : tracing::info!("HCC base URL is not set, skipping registration");
114 0 : Ok(())
115 : }
116 0 : Some(hcc_base_url) => {
117 : // The following operations acquiring the auth token and the node IP address both read environment
118 : // variables. It's fine for now as this `register()` function is only called once during startup.
119 : // If we start to talk to HCC more regularly in the safekeeper we should probably consider
120 : // refactoring things into a "HadronClusterCoordinatorClient" struct.
121 0 : let auth_token = get_hcc_auth_token();
122 0 : let node_ip_addr =
123 0 : ip_address::read_node_ip_addr_from_env().expect("Error reading node IP address.");
124 :
125 0 : let request = build_node_registeration_request(conf, node_ip_addr)?;
126 0 : let cancel = CancellationToken::new();
127 0 : let request_url = hcc_base_url.clone().join("/hadron-internal/v1/sk")?;
128 :
129 0 : backoff::retry(
130 0 : || async {
131 0 : send_safekeeper_register_request(&request_url, &auth_token, &request).await
132 0 : },
133 : |_| false,
134 : 3,
135 : u32::MAX,
136 0 : "Calling the HCC safekeeper register API",
137 0 : &cancel,
138 : )
139 0 : .await
140 0 : .ok_or(anyhow::anyhow!(
141 0 : "Error in forever retry loop. This error should never be surfaced."
142 0 : ))?
143 : }
144 : }
145 0 : }
146 :
147 0 : async fn safekeeper_list_timelines_request(
148 0 : conf: &SafeKeeperConf,
149 0 : ) -> Result<pageserver_api::controller_api::SafekeeperTimelinesResponse> {
150 0 : if conf.hcc_base_url.is_none() {
151 0 : tracing::info!("HCC base URL is not set, skipping registration");
152 0 : return Err(anyhow::anyhow!("HCC base URL is not set"));
153 0 : }
154 :
155 : // The following operations acquiring the auth token and the node IP address both read environment
156 : // variables. It's fine for now as this `register()` function is only called once during startup.
157 : // If we start to talk to HCC more regularly in the safekeeper we should probably consider
158 : // refactoring things into a "HadronClusterCoordinatorClient" struct.
159 0 : let auth_token = get_hcc_auth_token();
160 0 : let method = format!("/control/v1/safekeeper/{}/timelines", conf.my_id.0);
161 0 : let request_url = conf.hcc_base_url.as_ref().unwrap().clone().join(&method)?;
162 :
163 0 : let client = reqwest::Client::new();
164 0 : let mut req_builder = client
165 0 : .get(request_url.clone())
166 0 : .header("Content-Type", "application/json")
167 0 : .query(&[("id", conf.my_id.0)]);
168 0 : if let Some(token) = auth_token {
169 0 : req_builder = req_builder.bearer_auth(token);
170 0 : }
171 0 : let response = req_builder
172 0 : .send()
173 0 : .await?
174 0 : .error_for_status()?
175 0 : .json::<pageserver_api::controller_api::SafekeeperTimelinesResponse>()
176 0 : .await?;
177 0 : Ok(response)
178 0 : }
179 :
180 : // Returns true on success, false otherwise.
181 0 : pub async fn hcc_pull_timeline(
182 0 : timeline: SafekeeperTimeline,
183 0 : conf: &SafeKeeperConf,
184 0 : global_timelines: Arc<GlobalTimelines>,
185 0 : nodeid_http: &HashMap<u64, String>,
186 0 : ) -> bool {
187 0 : let mut request = PullTimelineRequest {
188 0 : tenant_id: timeline.tenant_id,
189 0 : timeline_id: timeline.timeline_id,
190 0 : http_hosts: Vec::new(),
191 0 : ignore_tombstone: None,
192 0 : };
193 0 : for host in timeline.peers {
194 0 : if host.0 == conf.my_id.0 {
195 0 : continue;
196 0 : }
197 0 : if let Some(http_host) = nodeid_http.get(&host.0) {
198 0 : request.http_hosts.push(http_host.clone());
199 0 : }
200 : }
201 :
202 0 : let ca_certs = match conf
203 0 : .ssl_ca_certs
204 0 : .iter()
205 0 : .map(Pem::contents)
206 0 : .map(reqwest::Certificate::from_der)
207 0 : .collect::<Result<Vec<_>, _>>()
208 : {
209 0 : Ok(result) => result,
210 : Err(_) => {
211 0 : return false;
212 : }
213 : };
214 0 : match pull_timeline::handle_request(
215 0 : request,
216 0 : conf.sk_auth_token.clone(),
217 0 : ca_certs,
218 0 : global_timelines.clone(),
219 : true,
220 : )
221 0 : .await
222 : {
223 0 : Ok(resp) => {
224 0 : tracing::info!(
225 0 : "Completed pulling tenant {} timeline {} from SK {:?}",
226 : timeline.tenant_id,
227 : timeline.timeline_id,
228 : resp.safekeeper_host
229 : );
230 0 : return true;
231 : }
232 0 : Err(e) => {
233 0 : tracing::error!(
234 0 : "Failed to pull tenant {} timeline {} from SK {}",
235 : timeline.tenant_id,
236 : timeline.timeline_id,
237 : e
238 : );
239 :
240 0 : let ttid = TenantTimelineId {
241 0 : tenant_id: timeline.tenant_id,
242 0 : timeline_id: timeline.timeline_id,
243 0 : };
244 : // Revert the failed timeline pull.
245 : // Notice that not found timeline returns OK also.
246 0 : match global_timelines
247 0 : .delete_or_exclude(&ttid, DeleteOrExclude::DeleteLocal)
248 0 : .await
249 : {
250 0 : Ok(dr) => {
251 0 : tracing::info!(
252 0 : "Deleted tenant {} timeline {} DirExists: {}",
253 : timeline.tenant_id,
254 : timeline.timeline_id,
255 : dr.dir_existed,
256 : );
257 : }
258 0 : Err(e) => {
259 0 : tracing::error!(
260 0 : "Failed to delete tenant {} timeline {} from global_timelines: {}",
261 : timeline.tenant_id,
262 : timeline.timeline_id,
263 : e
264 : );
265 : }
266 : }
267 : }
268 : }
269 0 : false
270 0 : }
271 :
272 0 : pub async fn hcc_pull_timeline_till_success(
273 0 : timeline: SafekeeperTimeline,
274 0 : conf: &SafeKeeperConf,
275 0 : global_timelines: Arc<GlobalTimelines>,
276 0 : nodeid_http: &HashMap<u64, String>,
277 0 : ) {
278 : const MAX_PULL_TIMELINE_RETRIES: u64 = 100;
279 0 : for i in 0..MAX_PULL_TIMELINE_RETRIES {
280 0 : if hcc_pull_timeline(
281 0 : timeline.clone(),
282 0 : conf,
283 0 : global_timelines.clone(),
284 0 : nodeid_http,
285 0 : )
286 0 : .await
287 : {
288 0 : SK_RECOVERY_PULL_TIMELINE_OKS.inc();
289 0 : return;
290 0 : }
291 0 : tracing::error!(
292 0 : "Failed to pull timeline {} from SK peers, retrying {}/{}",
293 : timeline.timeline_id,
294 0 : i + 1,
295 : MAX_PULL_TIMELINE_RETRIES
296 : );
297 0 : tokio::time::sleep(std::time::Duration::from_secs(1)).await;
298 : }
299 0 : SK_RECOVERY_PULL_TIMELINE_ERRORS.inc();
300 0 : }
301 :
302 0 : pub async fn hcc_pull_timelines(
303 0 : conf: &SafeKeeperConf,
304 0 : global_timelines: Arc<GlobalTimelines>,
305 0 : ) -> Result<()> {
306 0 : let _timer = SK_RECOVERY_PULL_TIMELINES_SECONDS.start_timer();
307 0 : tracing::info!("Start pulling timelines from SK peers");
308 :
309 0 : let mut response = SafekeeperTimelinesResponse {
310 0 : timelines: Vec::new(),
311 0 : safekeeper_peers: Vec::new(),
312 0 : };
313 0 : for i in 0..100 {
314 0 : match safekeeper_list_timelines_request(conf).await {
315 0 : Ok(timelines) => {
316 0 : response = timelines;
317 0 : }
318 0 : Err(e) => {
319 0 : tracing::error!("Failed to list timelines from HCC: {}", e);
320 0 : if i == 99 {
321 0 : return Err(e);
322 0 : }
323 : }
324 : }
325 0 : sleep(Duration::from_millis(100)).await;
326 : }
327 :
328 0 : let mut nodeid_http = HashMap::new();
329 0 : for sk in response.safekeeper_peers {
330 0 : nodeid_http.insert(
331 0 : sk.node_id.0,
332 0 : format!("http://{}:{}", sk.listen_http_addr, sk.http_port),
333 0 : );
334 0 : }
335 0 : tracing::info!("Received {} timelines from HCC", response.timelines.len());
336 0 : for timeline in response.timelines {
337 0 : let _timer = SK_RECOVERY_PULL_TIMELINE_SECONDS
338 0 : .with_label_values(&[
339 0 : &timeline.tenant_id.to_string(),
340 0 : &timeline.timeline_id.to_string(),
341 0 : ])
342 0 : .start_timer();
343 0 : hcc_pull_timeline_till_success(timeline, conf, global_timelines.clone(), &nodeid_http)
344 0 : .await;
345 : }
346 0 : Ok(())
347 0 : }
348 :
#[cfg(test)]
mod tests {
    use super::*;
    use utils::id::NodeId;

    /// Checks that the registration request:
    /// 1. takes the host name and PG port from `advertise_pg_addr_tenant_only`, and
    /// 2. takes the HTTP port from `listen_http_addr` while still reporting the advertised host.
    #[test]
    fn test_build_node_registeration_request() {
        const ADVERTISED_HOST: &str = "safe-keeper-1.safe-keeper.hadron.svc.cluster.local";

        let mut conf = SafeKeeperConf::dummy();
        conf.my_id = NodeId(1);
        conf.advertise_pg_addr_tenant_only = Some(format!("{}:5454", ADVERTISED_HOST));
        // `listen_pg_addr` and `listen_pg_addr_tenant_only` play no part in node registration.
        // Point them at a different host and port so that any leak into the request is caught.
        conf.listen_pg_addr = "0.0.0.0:5456".to_string();
        conf.listen_pg_addr_tenant_only = Some("0.0.0.0:5456".to_string());
        conf.listen_http_addr = "0.0.0.0:7676".to_string();
        let node_ip_addr: Option<IpAddr> = Some("127.0.0.1".parse().unwrap());

        let request = build_node_registeration_request(&conf, node_ip_addr).unwrap();

        assert_eq!(request.node_id, NodeId(1));
        assert_eq!(request.listen_pg_addr, ADVERTISED_HOST);
        assert_eq!(request.listen_pg_port, 5454);
        assert_eq!(request.listen_http_addr, ADVERTISED_HOST);
        assert_eq!(request.listen_http_port, 7676);
        assert_eq!(
            request.node_ip_addr,
            Some(IpAddr::V4(std::net::Ipv4Addr::LOCALHOST))
        );
    }
}
|