Line data Source code
1 : use once_cell::sync::Lazy;
2 : use pem::Pem;
3 : use safekeeper_api::models::PullTimelineRequest;
4 : use std::{
5 : collections::HashMap, env::VarError, net::IpAddr, sync::Arc, sync::atomic::AtomicBool,
6 : time::Duration,
7 : };
8 : use tokio::time::sleep;
9 : use tokio_util::sync::CancellationToken;
10 : use url::Url;
11 : use utils::{backoff, critical_timeline, id::TenantTimelineId, ip_address};
12 :
13 : use anyhow::{Result, anyhow};
14 :
15 : use pageserver_api::controller_api::{
16 : AvailabilityZone, NodeRegisterRequest, SafekeeperTimeline, SafekeeperTimelinesResponse,
17 : };
18 :
19 : use crate::{
20 : GlobalTimelines, SafeKeeperConf,
21 : metrics::{
22 : SK_RECOVERY_PULL_TIMELINE_ERRORS, SK_RECOVERY_PULL_TIMELINE_OKS,
23 : SK_RECOVERY_PULL_TIMELINE_SECONDS, SK_RECOVERY_PULL_TIMELINES_SECONDS,
24 : },
25 : pull_timeline,
26 : timelines_global_map::DeleteOrExclude,
27 : };
28 :
29 : // Extract information in the SafeKeeperConf to build a NodeRegisterRequest used to register the safekeeper with the HCC.
30 1 : fn build_node_registeration_request(
31 1 : conf: &SafeKeeperConf,
32 1 : node_ip_addr: Option<IpAddr>,
33 1 : ) -> Result<NodeRegisterRequest> {
34 1 : let advertise_pg_addr_with_port = conf
35 1 : .advertise_pg_addr_tenant_only
36 1 : .as_deref()
37 1 : .expect("advertise_pg_addr_tenant_only is required to register with HCC");
38 :
39 : // Extract host/port from the string.
40 1 : let (advertise_host_addr, pg_port_str) = advertise_pg_addr_with_port.split_at(
41 1 : advertise_pg_addr_with_port
42 1 : .rfind(':')
43 1 : .ok_or(anyhow::anyhow!("Invalid advertise_pg_addr"))?,
44 : );
45 : // Need the `[1..]` to remove the leading ':'.
46 1 : let pg_port = pg_port_str[1..]
47 1 : .parse::<u16>()
48 1 : .map_err(|e| anyhow::anyhow!("Cannot parse PG port: {}", e))?;
49 :
50 1 : let (_, http_port_str) = conf.listen_http_addr.split_at(
51 1 : conf.listen_http_addr
52 1 : .rfind(':')
53 1 : .ok_or(anyhow::anyhow!("Invalid listen_http_addr"))?,
54 : );
55 1 : let http_port = http_port_str[1..]
56 1 : .parse::<u16>()
57 1 : .map_err(|e| anyhow::anyhow!("Cannot parse HTTP port: {}", e))?;
58 :
59 1 : Ok(NodeRegisterRequest {
60 1 : node_id: conf.my_id,
61 1 : listen_pg_addr: advertise_host_addr.to_string(),
62 1 : listen_pg_port: pg_port,
63 1 : listen_http_addr: advertise_host_addr.to_string(),
64 1 : listen_http_port: http_port,
65 1 : node_ip_addr,
66 1 : availability_zone_id: AvailabilityZone("todo".to_string()),
67 1 : listen_grpc_addr: None,
68 1 : listen_grpc_port: None,
69 1 : listen_https_port: None,
70 1 : })
71 1 : }
72 :
73 : // Retrieve the JWT token used for authenticating with HCC from the environment variable.
74 : // Returns None if the token cannot be retrieved.
75 0 : fn get_hcc_auth_token() -> Option<String> {
76 0 : match std::env::var("HCC_AUTH_TOKEN") {
77 0 : Ok(v) => {
78 0 : tracing::info!("Loaded JWT token for authentication with HCC");
79 0 : Some(v)
80 : }
81 : Err(VarError::NotPresent) => {
82 0 : tracing::info!("No JWT token for authentication with HCC detected");
83 0 : None
84 : }
85 : Err(_) => {
86 0 : tracing::info!(
87 0 : "Failed to either load to detect non-present HCC_AUTH_TOKEN environment variable"
88 : );
89 0 : None
90 : }
91 : }
92 0 : }
93 :
94 0 : async fn send_safekeeper_register_request(
95 0 : request_url: &Url,
96 0 : auth_token: &Option<String>,
97 0 : request: &NodeRegisterRequest,
98 0 : ) -> Result<()> {
99 0 : let client = reqwest::Client::new();
100 0 : let mut req_builder = client
101 0 : .post(request_url.clone())
102 0 : .header("Content-Type", "application/json");
103 0 : if let Some(token) = auth_token {
104 0 : req_builder = req_builder.bearer_auth(token);
105 0 : }
106 0 : req_builder
107 0 : .json(&request)
108 0 : .send()
109 0 : .await?
110 0 : .error_for_status()?;
111 0 : Ok(())
112 0 : }
113 :
114 : /// Registers this safe keeper with the HCC.
115 0 : pub async fn register(conf: &SafeKeeperConf) -> Result<()> {
116 0 : match conf.hcc_base_url.as_ref() {
117 : None => {
118 0 : tracing::info!("HCC base URL is not set, skipping registration");
119 0 : Ok(())
120 : }
121 0 : Some(hcc_base_url) => {
122 : // The following operations acquiring the auth token and the node IP address both read environment
123 : // variables. It's fine for now as this `register()` function is only called once during startup.
124 : // If we start to talk to HCC more regularly in the safekeeper we should probably consider
125 : // refactoring things into a "HadronClusterCoordinatorClient" struct.
126 0 : let auth_token = get_hcc_auth_token();
127 0 : let node_ip_addr =
128 0 : ip_address::read_node_ip_addr_from_env().expect("Error reading node IP address.");
129 :
130 0 : let request = build_node_registeration_request(conf, node_ip_addr)?;
131 0 : let cancel = CancellationToken::new();
132 0 : let request_url = hcc_base_url.clone().join("/hadron-internal/v1/sk")?;
133 :
134 0 : backoff::retry(
135 0 : || async {
136 0 : send_safekeeper_register_request(&request_url, &auth_token, &request).await
137 0 : },
138 : |_| false,
139 : 3,
140 : u32::MAX,
141 0 : "Calling the HCC safekeeper register API",
142 0 : &cancel,
143 : )
144 0 : .await
145 0 : .ok_or(anyhow::anyhow!(
146 0 : "Error in forever retry loop. This error should never be surfaced."
147 0 : ))?
148 : }
149 : }
150 0 : }
151 :
152 0 : async fn safekeeper_list_timelines_request(
153 0 : conf: &SafeKeeperConf,
154 0 : ) -> Result<pageserver_api::controller_api::SafekeeperTimelinesResponse> {
155 0 : if conf.hcc_base_url.is_none() {
156 0 : tracing::info!("HCC base URL is not set, skipping registration");
157 0 : return Err(anyhow::anyhow!("HCC base URL is not set"));
158 0 : }
159 :
160 : // The following operations acquiring the auth token and the node IP address both read environment
161 : // variables. It's fine for now as this `register()` function is only called once during startup.
162 : // If we start to talk to HCC more regularly in the safekeeper we should probably consider
163 : // refactoring things into a "HadronClusterCoordinatorClient" struct.
164 0 : let auth_token = get_hcc_auth_token();
165 0 : let method = format!("/control/v1/safekeeper/{}/timelines", conf.my_id.0);
166 0 : let request_url = conf.hcc_base_url.as_ref().unwrap().clone().join(&method)?;
167 :
168 0 : let client = reqwest::Client::new();
169 0 : let mut req_builder = client
170 0 : .get(request_url.clone())
171 0 : .header("Content-Type", "application/json")
172 0 : .query(&[("id", conf.my_id.0)]);
173 0 : if let Some(token) = auth_token {
174 0 : req_builder = req_builder.bearer_auth(token);
175 0 : }
176 0 : let response = req_builder
177 0 : .send()
178 0 : .await?
179 0 : .error_for_status()?
180 0 : .json::<pageserver_api::controller_api::SafekeeperTimelinesResponse>()
181 0 : .await?;
182 0 : Ok(response)
183 0 : }
184 :
185 : // Returns true on success, false otherwise.
186 0 : pub async fn hcc_pull_timeline(
187 0 : timeline: SafekeeperTimeline,
188 0 : conf: &SafeKeeperConf,
189 0 : global_timelines: Arc<GlobalTimelines>,
190 0 : nodeid_http: &HashMap<u64, String>,
191 0 : ) -> bool {
192 0 : let mut request = PullTimelineRequest {
193 0 : tenant_id: timeline.tenant_id,
194 0 : timeline_id: timeline.timeline_id,
195 0 : http_hosts: Vec::new(),
196 0 : ignore_tombstone: None,
197 0 : };
198 0 : for host in timeline.peers {
199 0 : if host.0 == conf.my_id.0 {
200 0 : continue;
201 0 : }
202 0 : if let Some(http_host) = nodeid_http.get(&host.0) {
203 0 : request.http_hosts.push(http_host.clone());
204 0 : }
205 : }
206 :
207 0 : let ca_certs = match conf
208 0 : .ssl_ca_certs
209 0 : .iter()
210 0 : .map(Pem::contents)
211 0 : .map(reqwest::Certificate::from_der)
212 0 : .collect::<Result<Vec<_>, _>>()
213 : {
214 0 : Ok(result) => result,
215 : Err(_) => {
216 0 : return false;
217 : }
218 : };
219 0 : match pull_timeline::handle_request(
220 0 : request,
221 0 : conf.sk_auth_token.clone(),
222 0 : ca_certs,
223 0 : global_timelines.clone(),
224 : true,
225 : )
226 0 : .await
227 : {
228 0 : Ok(resp) => {
229 0 : tracing::info!(
230 0 : "Completed pulling tenant {} timeline {} from SK {:?}",
231 : timeline.tenant_id,
232 : timeline.timeline_id,
233 : resp.safekeeper_host
234 : );
235 0 : return true;
236 : }
237 0 : Err(e) => {
238 0 : tracing::error!(
239 0 : "Failed to pull tenant {} timeline {} from SK {}",
240 : timeline.tenant_id,
241 : timeline.timeline_id,
242 : e
243 : );
244 :
245 0 : let ttid = TenantTimelineId {
246 0 : tenant_id: timeline.tenant_id,
247 0 : timeline_id: timeline.timeline_id,
248 0 : };
249 : // Revert the failed timeline pull.
250 : // Notice that not found timeline returns OK also.
251 0 : match global_timelines
252 0 : .delete_or_exclude(&ttid, DeleteOrExclude::DeleteLocal)
253 0 : .await
254 : {
255 0 : Ok(dr) => {
256 0 : tracing::info!(
257 0 : "Deleted tenant {} timeline {} DirExists: {}",
258 : timeline.tenant_id,
259 : timeline.timeline_id,
260 : dr.dir_existed,
261 : );
262 : }
263 0 : Err(e) => {
264 0 : tracing::error!(
265 0 : "Failed to delete tenant {} timeline {} from global_timelines: {}",
266 : timeline.tenant_id,
267 : timeline.timeline_id,
268 : e
269 : );
270 : }
271 : }
272 : }
273 : }
274 0 : false
275 0 : }
276 :
277 0 : pub async fn hcc_pull_timeline_till_success(
278 0 : timeline: SafekeeperTimeline,
279 0 : conf: &SafeKeeperConf,
280 0 : global_timelines: Arc<GlobalTimelines>,
281 0 : nodeid_http: &HashMap<u64, String>,
282 0 : ) {
283 : const MAX_PULL_TIMELINE_RETRIES: u64 = 100;
284 0 : for i in 0..MAX_PULL_TIMELINE_RETRIES {
285 0 : if hcc_pull_timeline(
286 0 : timeline.clone(),
287 0 : conf,
288 0 : global_timelines.clone(),
289 0 : nodeid_http,
290 0 : )
291 0 : .await
292 : {
293 0 : SK_RECOVERY_PULL_TIMELINE_OKS.inc();
294 0 : return;
295 0 : }
296 0 : tracing::error!(
297 0 : "Failed to pull timeline {} from SK peers, retrying {}/{}",
298 : timeline.timeline_id,
299 0 : i + 1,
300 : MAX_PULL_TIMELINE_RETRIES
301 : );
302 0 : tokio::time::sleep(std::time::Duration::from_secs(1)).await;
303 : }
304 0 : SK_RECOVERY_PULL_TIMELINE_ERRORS.inc();
305 0 : }
306 :
307 0 : pub async fn hcc_pull_timelines(
308 0 : conf: &SafeKeeperConf,
309 0 : global_timelines: Arc<GlobalTimelines>,
310 0 : ) -> Result<()> {
311 0 : let _timer = SK_RECOVERY_PULL_TIMELINES_SECONDS.start_timer();
312 0 : tracing::info!("Start pulling timelines from SK peers");
313 :
314 0 : let mut response = SafekeeperTimelinesResponse {
315 0 : timelines: Vec::new(),
316 0 : safekeeper_peers: Vec::new(),
317 0 : };
318 0 : for i in 0..100 {
319 0 : match safekeeper_list_timelines_request(conf).await {
320 0 : Ok(timelines) => {
321 0 : response = timelines;
322 0 : }
323 0 : Err(e) => {
324 0 : tracing::error!("Failed to list timelines from HCC: {}", e);
325 0 : if i == 99 {
326 0 : return Err(e);
327 0 : }
328 : }
329 : }
330 0 : sleep(Duration::from_millis(100)).await;
331 : }
332 :
333 0 : let mut nodeid_http = HashMap::new();
334 0 : for sk in response.safekeeper_peers {
335 0 : nodeid_http.insert(
336 0 : sk.node_id.0,
337 0 : format!("http://{}:{}", sk.listen_http_addr, sk.http_port),
338 0 : );
339 0 : }
340 0 : tracing::info!("Received {} timelines from HCC", response.timelines.len());
341 0 : for timeline in response.timelines {
342 0 : let _timer = SK_RECOVERY_PULL_TIMELINE_SECONDS
343 0 : .with_label_values(&[
344 0 : &timeline.tenant_id.to_string(),
345 0 : &timeline.timeline_id.to_string(),
346 0 : ])
347 0 : .start_timer();
348 0 : hcc_pull_timeline_till_success(timeline, conf, global_timelines.clone(), &nodeid_http)
349 0 : .await;
350 : }
351 0 : Ok(())
352 0 : }
353 :
354 : /// true if the last background scan found total usage > limit
355 5 : pub static GLOBAL_DISK_LIMIT_EXCEEDED: Lazy<AtomicBool> = Lazy::new(|| AtomicBool::new(false));
356 :
357 : /// Returns filesystem usage in bytes for the filesystem containing the given path.
358 : // Need to suppress the clippy::unnecessary_cast warning because the casts on the block count and the
359 : // block size are required on macOS (they are 32-bit integers on macOS, apparantly).
360 : #[allow(clippy::unnecessary_cast)]
361 0 : pub fn get_filesystem_usage(path: &std::path::Path) -> u64 {
362 : // Allow overriding disk usage via failpoint for tests
363 0 : fail::fail_point!("sk-global-disk-usage", |val| {
364 : // val is Option<String>; parse payload if present
365 0 : val.and_then(|s| s.parse::<u64>().ok()).unwrap_or(0)
366 0 : });
367 :
368 : // Call statvfs(3) for filesystem usage
369 : use nix::sys::statvfs::statvfs;
370 0 : match statvfs(path) {
371 0 : Ok(stat) => {
372 : // fragment size (f_frsize) if non-zero else block size (f_bsize)
373 0 : let frsize = stat.fragment_size();
374 0 : let blocksz = if frsize > 0 {
375 0 : frsize
376 : } else {
377 0 : stat.block_size()
378 : };
379 : // used blocks = total blocks - available blocks for unprivileged
380 0 : let used_blocks = stat.blocks().saturating_sub(stat.blocks_available());
381 0 : used_blocks as u64 * blocksz as u64
382 : }
383 0 : Err(e) => {
384 : // The global disk usage watcher aren't associated with a tenant or timeline, so we just
385 : // pass placeholder (all-zero) tenant and timeline IDs to the critical!() macro.
386 0 : let placeholder_ttid = TenantTimelineId::empty();
387 0 : critical_timeline!(
388 0 : placeholder_ttid.tenant_id,
389 0 : placeholder_ttid.timeline_id,
390 0 : "Global disk usage watcher failed to read filesystem usage: {:?}",
391 : e
392 : );
393 0 : 0
394 : }
395 : }
396 0 : }
397 :
398 : /// Returns the total capacity of the current working directory's filesystem in bytes.
399 : #[allow(clippy::unnecessary_cast)]
400 0 : pub fn get_filesystem_capacity(path: &std::path::Path) -> Result<u64> {
401 : // Call statvfs(3) for filesystem stats
402 : use nix::sys::statvfs::statvfs;
403 0 : match statvfs(path) {
404 0 : Ok(stat) => {
405 : // fragment size (f_frsize) if non-zero else block size (f_bsize)
406 0 : let frsize = stat.fragment_size();
407 0 : let blocksz = if frsize > 0 {
408 0 : frsize
409 : } else {
410 0 : stat.block_size()
411 : };
412 0 : Ok(stat.blocks() as u64 * blocksz as u64)
413 : }
414 0 : Err(e) => Err(anyhow!("Failed to read filesystem capacity: {:?}", e)),
415 : }
416 0 : }
417 :
#[cfg(test)]
mod tests {
    use super::*;
    use utils::id::NodeId;

    /// Checks that:
    /// 1. The host name registered with the HCC (for both PG and HTTP) comes from
    ///    `advertise_pg_addr_tenant_only`.
    /// 2. The PG port comes from `advertise_pg_addr_tenant_only` and the HTTP port comes
    ///    from `listen_http_addr`.
    #[test]
    fn test_build_node_registeration_request() {
        const ADVERTISED_HOST: &str = "safe-keeper-1.safe-keeper.hadron.svc.cluster.local";

        let mut conf = SafeKeeperConf::dummy();
        conf.my_id = NodeId(1);
        conf.advertise_pg_addr_tenant_only = Some(format!("{ADVERTISED_HOST}:5454"));
        // `listen_pg_addr` and `listen_pg_addr_tenant_only` are not used for node
        // registration. Point them at a different host and port so that the assertions
        // below would catch them leaking into the registration request.
        conf.listen_pg_addr = "0.0.0.0:5456".to_string();
        conf.listen_pg_addr_tenant_only = Some("0.0.0.0:5456".to_string());
        conf.listen_http_addr = "0.0.0.0:7676".to_string();

        let expected_ip: IpAddr = "127.0.0.1".parse().unwrap();
        let request = build_node_registeration_request(&conf, Some(expected_ip)).unwrap();

        assert_eq!(request.node_id, NodeId(1));
        assert_eq!(request.listen_pg_addr, ADVERTISED_HOST);
        assert_eq!(request.listen_pg_port, 5454);
        assert_eq!(request.listen_http_addr, ADVERTISED_HOST);
        assert_eq!(request.listen_http_port, 7676);
        assert_eq!(request.node_ip_addr, Some(expected_ip));
    }
}
|