LCOV - code coverage report
Current view: top level - safekeeper/src - hadron.rs (source / functions)
Test: 4be46b1c0003aa3bbac9ade362c676b419df4c20.info
Test Date: 2025-07-22 17:50:06

            Coverage      Total    Hit
Lines:      19.3 %        280      54
Functions:  12.5 %        24       3

            Line data    Source code
       1              : use once_cell::sync::Lazy;
       2              : use pem::Pem;
       3              : use safekeeper_api::models::PullTimelineRequest;
       4              : use std::{
       5              :     collections::HashMap, env::VarError, net::IpAddr, sync::Arc, sync::atomic::AtomicBool,
       6              :     time::Duration,
       7              : };
       8              : use tokio::time::sleep;
       9              : use tokio_util::sync::CancellationToken;
      10              : use url::Url;
      11              : use utils::{backoff, critical_timeline, id::TenantTimelineId, ip_address};
      12              : 
      13              : use anyhow::{Result, anyhow};
      14              : 
      15              : use pageserver_api::controller_api::{
      16              :     AvailabilityZone, NodeRegisterRequest, SafekeeperTimeline, SafekeeperTimelinesResponse,
      17              : };
      18              : 
      19              : use crate::{
      20              :     GlobalTimelines, SafeKeeperConf,
      21              :     metrics::{
      22              :         SK_RECOVERY_PULL_TIMELINE_ERRORS, SK_RECOVERY_PULL_TIMELINE_OKS,
      23              :         SK_RECOVERY_PULL_TIMELINE_SECONDS, SK_RECOVERY_PULL_TIMELINES_SECONDS,
      24              :     },
      25              :     pull_timeline,
      26              :     timelines_global_map::DeleteOrExclude,
      27              : };
      28              : 
       29              : // Extract information from the SafeKeeperConf to build a NodeRegisterRequest used to register this safekeeper with the HCC.
       30            1 : fn build_node_registration_request(
      31            1 :     conf: &SafeKeeperConf,
      32            1 :     node_ip_addr: Option<IpAddr>,
      33            1 : ) -> Result<NodeRegisterRequest> {
      34            1 :     let advertise_pg_addr_with_port = conf
      35            1 :         .advertise_pg_addr_tenant_only
      36            1 :         .as_deref()
      37            1 :         .expect("advertise_pg_addr_tenant_only is required to register with HCC");
      38              : 
      39              :     // Extract host/port from the string.
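                       :     // For example (illustrative hostname): "sk-0.example.com:5454" splits into
                       :     // ("sk-0.example.com", ":5454"); the `[1..]` below then drops the leading
                       :     // ':' before the port is parsed.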
      40            1 :     let (advertise_host_addr, pg_port_str) = advertise_pg_addr_with_port.split_at(
      41            1 :         advertise_pg_addr_with_port
      42            1 :             .rfind(':')
      43            1 :             .ok_or(anyhow::anyhow!("Invalid advertise_pg_addr"))?,
      44              :     );
      45              :     // Need the `[1..]` to remove the leading ':'.
      46            1 :     let pg_port = pg_port_str[1..]
      47            1 :         .parse::<u16>()
      48            1 :         .map_err(|e| anyhow::anyhow!("Cannot parse PG port: {}", e))?;
      49              : 
      50            1 :     let (_, http_port_str) = conf.listen_http_addr.split_at(
      51            1 :         conf.listen_http_addr
      52            1 :             .rfind(':')
      53            1 :             .ok_or(anyhow::anyhow!("Invalid listen_http_addr"))?,
      54              :     );
      55            1 :     let http_port = http_port_str[1..]
      56            1 :         .parse::<u16>()
      57            1 :         .map_err(|e| anyhow::anyhow!("Cannot parse HTTP port: {}", e))?;
      58              : 
      59            1 :     Ok(NodeRegisterRequest {
      60            1 :         node_id: conf.my_id,
      61            1 :         listen_pg_addr: advertise_host_addr.to_string(),
      62            1 :         listen_pg_port: pg_port,
      63            1 :         listen_http_addr: advertise_host_addr.to_string(),
      64            1 :         listen_http_port: http_port,
      65            1 :         node_ip_addr,
      66            1 :         availability_zone_id: AvailabilityZone("todo".to_string()),
      67            1 :         listen_grpc_addr: None,
      68            1 :         listen_grpc_port: None,
      69            1 :         listen_https_port: None,
      70            1 :     })
      71            1 : }
      72              : 
      73              : // Retrieve the JWT token used for authenticating with HCC from the environment variable.
      74              : // Returns None if the token cannot be retrieved.
      75            0 : fn get_hcc_auth_token() -> Option<String> {
      76            0 :     match std::env::var("HCC_AUTH_TOKEN") {
      77            0 :         Ok(v) => {
      78            0 :             tracing::info!("Loaded JWT token for authentication with HCC");
      79            0 :             Some(v)
      80              :         }
      81              :         Err(VarError::NotPresent) => {
      82            0 :             tracing::info!("No JWT token for authentication with HCC detected");
      83            0 :             None
      84              :         }
      85              :         Err(_) => {
      86            0 :             tracing::info!(
       87            0 :                 "HCC_AUTH_TOKEN environment variable is set but could not be read (not valid Unicode)"
      88              :             );
      89            0 :             None
      90              :         }
      91              :     }
      92            0 : }
      93              : 
      94            0 : async fn send_safekeeper_register_request(
      95            0 :     request_url: &Url,
      96            0 :     auth_token: &Option<String>,
      97            0 :     request: &NodeRegisterRequest,
      98            0 : ) -> Result<()> {
      99            0 :     let client = reqwest::Client::new();
     100            0 :     let mut req_builder = client
     101            0 :         .post(request_url.clone())
     102            0 :         .header("Content-Type", "application/json");
     103            0 :     if let Some(token) = auth_token {
     104            0 :         req_builder = req_builder.bearer_auth(token);
     105            0 :     }
     106            0 :     req_builder
     107            0 :         .json(&request)
     108            0 :         .send()
     109            0 :         .await?
     110            0 :         .error_for_status()?;
     111            0 :     Ok(())
     112            0 : }
     113              : 
      114              : /// Registers this safekeeper with the HCC.
     115            0 : pub async fn register(conf: &SafeKeeperConf) -> Result<()> {
     116            0 :     match conf.hcc_base_url.as_ref() {
     117              :         None => {
     118            0 :             tracing::info!("HCC base URL is not set, skipping registration");
     119            0 :             Ok(())
     120              :         }
     121            0 :         Some(hcc_base_url) => {
      122              :             // The following operations (acquiring the auth token and the node IP address) both read environment
     123              :             // variables. It's fine for now as this `register()` function is only called once during startup.
     124              :             // If we start to talk to HCC more regularly in the safekeeper we should probably consider
     125              :             // refactoring things into a "HadronClusterCoordinatorClient" struct.
     126            0 :             let auth_token = get_hcc_auth_token();
     127            0 :             let node_ip_addr =
     128            0 :                 ip_address::read_node_ip_addr_from_env().expect("Error reading node IP address.");
     129              : 
      130            0 :             let request = build_node_registration_request(conf, node_ip_addr)?;
     131            0 :             let cancel = CancellationToken::new();
     132            0 :             let request_url = hcc_base_url.clone().join("/hadron-internal/v1/sk")?;
     133              : 
     134            0 :             backoff::retry(
     135            0 :                 || async {
     136            0 :                     send_safekeeper_register_request(&request_url, &auth_token, &request).await
     137            0 :                 },
     138              :                 |_| false,
     139              :                 3,
     140              :                 u32::MAX,
     141            0 :                 "Calling the HCC safekeeper register API",
     142            0 :                 &cancel,
     143              :             )
     144            0 :             .await
     145            0 :             .ok_or(anyhow::anyhow!(
     146            0 :                 "Error in forever retry loop. This error should never be surfaced."
     147            0 :             ))?
     148              :         }
     149              :     }
     150            0 : }
     151              : 
     152            0 : async fn safekeeper_list_timelines_request(
     153            0 :     conf: &SafeKeeperConf,
     154            0 : ) -> Result<pageserver_api::controller_api::SafekeeperTimelinesResponse> {
     155            0 :     if conf.hcc_base_url.is_none() {
      156            0 :         tracing::info!("HCC base URL is not set, cannot list timelines");
     157            0 :         return Err(anyhow::anyhow!("HCC base URL is not set"));
     158            0 :     }
     159              : 
      160              :     // Acquiring the auth token reads an environment variable. It's fine for now as this
      161              :     // function is only called during startup. If we start to talk to HCC more regularly
      162              :     // in the safekeeper we should probably consider refactoring things into a
      163              :     // "HadronClusterCoordinatorClient" struct.
     164            0 :     let auth_token = get_hcc_auth_token();
     165            0 :     let method = format!("/control/v1/safekeeper/{}/timelines", conf.my_id.0);
     166            0 :     let request_url = conf.hcc_base_url.as_ref().unwrap().clone().join(&method)?;
     167              : 
     168            0 :     let client = reqwest::Client::new();
     169            0 :     let mut req_builder = client
     170            0 :         .get(request_url.clone())
     171            0 :         .header("Content-Type", "application/json")
     172            0 :         .query(&[("id", conf.my_id.0)]);
     173            0 :     if let Some(token) = auth_token {
     174            0 :         req_builder = req_builder.bearer_auth(token);
     175            0 :     }
     176            0 :     let response = req_builder
     177            0 :         .send()
     178            0 :         .await?
     179            0 :         .error_for_status()?
     180            0 :         .json::<pageserver_api::controller_api::SafekeeperTimelinesResponse>()
     181            0 :         .await?;
     182            0 :     Ok(response)
     183            0 : }
     184              : 
     185              : // Returns true on success, false otherwise.
     186            0 : pub async fn hcc_pull_timeline(
     187            0 :     timeline: SafekeeperTimeline,
     188            0 :     conf: &SafeKeeperConf,
     189            0 :     global_timelines: Arc<GlobalTimelines>,
     190            0 :     nodeid_http: &HashMap<u64, String>,
     191            0 : ) -> bool {
     192            0 :     let mut request = PullTimelineRequest {
     193            0 :         tenant_id: timeline.tenant_id,
     194            0 :         timeline_id: timeline.timeline_id,
     195            0 :         http_hosts: Vec::new(),
     196            0 :         ignore_tombstone: None,
     197            0 :     };
     198            0 :     for host in timeline.peers {
     199            0 :         if host.0 == conf.my_id.0 {
     200            0 :             continue;
     201            0 :         }
     202            0 :         if let Some(http_host) = nodeid_http.get(&host.0) {
     203            0 :             request.http_hosts.push(http_host.clone());
     204            0 :         }
     205              :     }
     206              : 
     207            0 :     let ca_certs = match conf
     208            0 :         .ssl_ca_certs
     209            0 :         .iter()
     210            0 :         .map(Pem::contents)
     211            0 :         .map(reqwest::Certificate::from_der)
     212            0 :         .collect::<Result<Vec<_>, _>>()
     213              :     {
     214            0 :         Ok(result) => result,
      215              :         Err(e) => {
                       :             tracing::error!("Failed to parse SSL CA certificates: {}", e);
      216            0 :             return false;
     217              :         }
     218              :     };
     219            0 :     match pull_timeline::handle_request(
     220            0 :         request,
     221            0 :         conf.sk_auth_token.clone(),
     222            0 :         ca_certs,
     223            0 :         global_timelines.clone(),
     224              :         true,
     225              :     )
     226            0 :     .await
     227              :     {
     228            0 :         Ok(resp) => {
     229            0 :             tracing::info!(
     230            0 :                 "Completed pulling tenant {} timeline {} from SK {:?}",
     231              :                 timeline.tenant_id,
     232              :                 timeline.timeline_id,
     233              :                 resp.safekeeper_host
     234              :             );
     235            0 :             return true;
     236              :         }
     237            0 :         Err(e) => {
     238            0 :             tracing::error!(
      239            0 :                 "Failed to pull tenant {} timeline {} from SK peers: {}",
     240              :                 timeline.tenant_id,
     241              :                 timeline.timeline_id,
     242              :                 e
     243              :             );
     244              : 
     245            0 :             let ttid = TenantTimelineId {
     246            0 :                 tenant_id: timeline.tenant_id,
     247            0 :                 timeline_id: timeline.timeline_id,
     248            0 :             };
     249              :             // Revert the failed timeline pull.
      250              :             // Note that deleting a timeline that does not exist also returns OK.
     251            0 :             match global_timelines
     252            0 :                 .delete_or_exclude(&ttid, DeleteOrExclude::DeleteLocal)
     253            0 :                 .await
     254              :             {
     255            0 :                 Ok(dr) => {
     256            0 :                     tracing::info!(
     257            0 :                         "Deleted tenant {} timeline {} DirExists: {}",
     258              :                         timeline.tenant_id,
     259              :                         timeline.timeline_id,
     260              :                         dr.dir_existed,
     261              :                     );
     262              :                 }
     263            0 :                 Err(e) => {
     264            0 :                     tracing::error!(
     265            0 :                         "Failed to delete tenant {} timeline {} from global_timelines: {}",
     266              :                         timeline.tenant_id,
     267              :                         timeline.timeline_id,
     268              :                         e
     269              :                     );
     270              :                 }
     271              :             }
     272              :         }
     273              :     }
     274            0 :     false
     275            0 : }
     276              : 
     277            0 : pub async fn hcc_pull_timeline_till_success(
     278            0 :     timeline: SafekeeperTimeline,
     279            0 :     conf: &SafeKeeperConf,
     280            0 :     global_timelines: Arc<GlobalTimelines>,
     281            0 :     nodeid_http: &HashMap<u64, String>,
     282            0 : ) {
     283              :     const MAX_PULL_TIMELINE_RETRIES: u64 = 100;
     284            0 :     for i in 0..MAX_PULL_TIMELINE_RETRIES {
     285            0 :         if hcc_pull_timeline(
     286            0 :             timeline.clone(),
     287            0 :             conf,
     288            0 :             global_timelines.clone(),
     289            0 :             nodeid_http,
     290            0 :         )
     291            0 :         .await
     292              :         {
     293            0 :             SK_RECOVERY_PULL_TIMELINE_OKS.inc();
     294            0 :             return;
     295            0 :         }
     296            0 :         tracing::error!(
     297            0 :             "Failed to pull timeline {} from SK peers, retrying {}/{}",
     298              :             timeline.timeline_id,
     299            0 :             i + 1,
     300              :             MAX_PULL_TIMELINE_RETRIES
     301              :         );
     302            0 :         tokio::time::sleep(std::time::Duration::from_secs(1)).await;
     303              :     }
     304            0 :     SK_RECOVERY_PULL_TIMELINE_ERRORS.inc();
     305            0 : }
     306              : 
     307            0 : pub async fn hcc_pull_timelines(
     308            0 :     conf: &SafeKeeperConf,
     309            0 :     global_timelines: Arc<GlobalTimelines>,
     310            0 : ) -> Result<()> {
     311            0 :     let _timer = SK_RECOVERY_PULL_TIMELINES_SECONDS.start_timer();
     312            0 :     tracing::info!("Start pulling timelines from SK peers");
     313              : 
     314            0 :     let mut response = SafekeeperTimelinesResponse {
     315            0 :         timelines: Vec::new(),
     316            0 :         safekeeper_peers: Vec::new(),
     317            0 :     };
     318            0 :     for i in 0..100 {
     319            0 :         match safekeeper_list_timelines_request(conf).await {
     320            0 :             Ok(timelines) => {
      321            0 :                 response = timelines;
                       :                 // Stop retrying once we have a successful response.
                       :                 break;
      322            0 :             }
     323            0 :             Err(e) => {
     324            0 :                 tracing::error!("Failed to list timelines from HCC: {}", e);
     325            0 :                 if i == 99 {
     326            0 :                     return Err(e);
     327            0 :                 }
     328              :             }
     329              :         }
     330            0 :         sleep(Duration::from_millis(100)).await;
     331              :     }
     332              : 
     333            0 :     let mut nodeid_http = HashMap::new();
     334            0 :     for sk in response.safekeeper_peers {
     335            0 :         nodeid_http.insert(
     336            0 :             sk.node_id.0,
     337            0 :             format!("http://{}:{}", sk.listen_http_addr, sk.http_port),
     338            0 :         );
     339            0 :     }
     340            0 :     tracing::info!("Received {} timelines from HCC", response.timelines.len());
     341            0 :     for timeline in response.timelines {
     342            0 :         let _timer = SK_RECOVERY_PULL_TIMELINE_SECONDS
     343            0 :             .with_label_values(&[
     344            0 :                 &timeline.tenant_id.to_string(),
     345            0 :                 &timeline.timeline_id.to_string(),
     346            0 :             ])
     347            0 :             .start_timer();
     348            0 :         hcc_pull_timeline_till_success(timeline, conf, global_timelines.clone(), &nodeid_http)
     349            0 :             .await;
     350              :     }
     351            0 :     Ok(())
     352            0 : }
     353              : 
      354              : /// True if the last background scan found total disk usage above the configured global limit.
     355            5 : pub static GLOBAL_DISK_LIMIT_EXCEEDED: Lazy<AtomicBool> = Lazy::new(|| AtomicBool::new(false));
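                       : // A minimal sketch of the intended use (the background scan task itself is not
                       : // part of this file; `usage` and `limit` are illustrative names):
                       : //   GLOBAL_DISK_LIMIT_EXCEEDED.store(usage > limit, Ordering::Relaxed);
                       : // Readers can then consult the flag with .load(Ordering::Relaxed).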
     356              : 
     357              : /// Returns filesystem usage in bytes for the filesystem containing the given path.
     358              : // Need to suppress the clippy::unnecessary_cast warning because the casts on the block count and the
      359              : // block size are required on macOS (they are 32-bit integers there, apparently).
     360              : #[allow(clippy::unnecessary_cast)]
     361            0 : pub fn get_filesystem_usage(path: &std::path::Path) -> u64 {
     362              :     // Allow overriding disk usage via failpoint for tests
     363            0 :     fail::fail_point!("sk-global-disk-usage", |val| {
     364              :         // val is Option<String>; parse payload if present
     365            0 :         val.and_then(|s| s.parse::<u64>().ok()).unwrap_or(0)
     366            0 :     });
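                       :     // For instance, a test could force a fixed 1 GiB reading through this
                       :     // failpoint (a sketch; assumes the `fail` crate's failpoints feature is on):
                       :     //   fail::cfg("sk-global-disk-usage", "return(1073741824)").unwrap();
                       :     // or via the environment: FAILPOINTS=sk-global-disk-usage=return(1073741824)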
     367              : 
     368              :     // Call statvfs(3) for filesystem usage
     369              :     use nix::sys::statvfs::statvfs;
     370            0 :     match statvfs(path) {
     371            0 :         Ok(stat) => {
     372              :             // fragment size (f_frsize) if non-zero else block size (f_bsize)
     373            0 :             let frsize = stat.fragment_size();
     374            0 :             let blocksz = if frsize > 0 {
     375            0 :                 frsize
     376              :             } else {
     377            0 :                 stat.block_size()
     378              :             };
     379              :             // used blocks = total blocks - available blocks for unprivileged
     380            0 :             let used_blocks = stat.blocks().saturating_sub(stat.blocks_available());
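                       :             // e.g. 1_000_000 total blocks, 250_000 available to unprivileged users,
                       :             // 4096-byte fragments: (1_000_000 - 250_000) * 4096 = 3_072_000_000 bytes.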
     381            0 :             used_blocks as u64 * blocksz as u64
     382              :         }
     383            0 :         Err(e) => {
      384              :             // The global disk usage watcher isn't associated with a tenant or timeline, so we just
     385              :             // pass placeholder (all-zero) tenant and timeline IDs to the critical!() macro.
     386            0 :             let placeholder_ttid = TenantTimelineId::empty();
     387            0 :             critical_timeline!(
     388            0 :                 placeholder_ttid.tenant_id,
     389            0 :                 placeholder_ttid.timeline_id,
     390            0 :                 "Global disk usage watcher failed to read filesystem usage: {:?}",
     391              :                 e
     392              :             );
     393            0 :             0
     394              :         }
     395              :     }
     396            0 : }
     397              : 
      398              : /// Returns the total capacity of the filesystem containing the given path, in bytes.
     399              : #[allow(clippy::unnecessary_cast)]
     400            0 : pub fn get_filesystem_capacity(path: &std::path::Path) -> Result<u64> {
     401              :     // Call statvfs(3) for filesystem stats
     402              :     use nix::sys::statvfs::statvfs;
     403            0 :     match statvfs(path) {
     404            0 :         Ok(stat) => {
     405              :             // fragment size (f_frsize) if non-zero else block size (f_bsize)
     406            0 :             let frsize = stat.fragment_size();
     407            0 :             let blocksz = if frsize > 0 {
     408            0 :                 frsize
     409              :             } else {
     410            0 :                 stat.block_size()
     411              :             };
     412            0 :             Ok(stat.blocks() as u64 * blocksz as u64)
     413              :         }
     414            0 :         Err(e) => Err(anyhow!("Failed to read filesystem capacity: {:?}", e)),
     415              :     }
     416            0 : }
     417              : 
     418              : #[cfg(test)]
     419              : mod tests {
     420              :     use super::*;
     421              :     use utils::id::NodeId;
     422              : 
     423              :     #[test]
      424            1 :     fn test_build_node_registration_request() {
     425              :         // Test that:
     426              :         // 1. We always extract the host name and port used to register with the HCC from the
     427              :         //    `advertise_pg_addr` if it is set.
      428              :         //    `advertise_pg_addr_tenant_only` if it is set.
      429              :         // 2. The correct ports are extracted from `advertise_pg_addr_tenant_only` and `listen_http_addr`.
     430            1 :         conf.my_id = NodeId(1);
     431            1 :         conf.advertise_pg_addr_tenant_only =
     432            1 :             Some("safe-keeper-1.safe-keeper.hadron.svc.cluster.local:5454".to_string());
     433              :         // `listen_pg_addr` and `listen_pg_addr_tenant_only` are not used for node registration. Set them to a different
     434              :         // host and port values and make sure that they don't show up in the node registration request.
     435            1 :         conf.listen_pg_addr = "0.0.0.0:5456".to_string();
     436            1 :         conf.listen_pg_addr_tenant_only = Some("0.0.0.0:5456".to_string());
     437            1 :         conf.listen_http_addr = "0.0.0.0:7676".to_string();
     438            1 :         let node_ip_addr: Option<IpAddr> = Some("127.0.0.1".parse().unwrap());
     439              : 
      440            1 :         let request = build_node_registration_request(&conf, node_ip_addr).unwrap();
     441            1 :         assert_eq!(request.node_id, NodeId(1));
     442            1 :         assert_eq!(
     443              :             request.listen_pg_addr,
     444              :             "safe-keeper-1.safe-keeper.hadron.svc.cluster.local"
     445              :         );
     446            1 :         assert_eq!(request.listen_pg_port, 5454);
     447            1 :         assert_eq!(
     448              :             request.listen_http_addr,
     449              :             "safe-keeper-1.safe-keeper.hadron.svc.cluster.local"
     450              :         );
     451            1 :         assert_eq!(request.listen_http_port, 7676);
     452            1 :         assert_eq!(
     453              :             request.node_ip_addr,
     454            1 :             Some(IpAddr::V4("127.0.0.1".parse().unwrap()))
     455              :         );
     456            1 :     }
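                       : 
                       :     // A hedged sketch of a companion negative-path test (not part of the original
                       :     // coverage run): when `advertise_pg_addr_tenant_only` has no ':' separator,
                       :     // the builder should surface the "Invalid advertise_pg_addr" error rather
                       :     // than panic. Assumes `SafeKeeperConf::dummy()` defaults suffice otherwise.
                       :     #[test]
                       :     fn test_build_node_registration_request_rejects_missing_port() {
                       :         let mut conf = SafeKeeperConf::dummy();
                       :         conf.my_id = NodeId(2);
                       :         conf.advertise_pg_addr_tenant_only = Some("no-port-here".to_string());
                       :         conf.listen_http_addr = "0.0.0.0:7676".to_string();
                       :         assert!(build_node_registration_request(&conf, None).is_err());
                       :     }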
     457              : }
        

Generated by: LCOV version 2.1-beta