LCOV - code coverage report
Current view: top level - safekeeper/src - hadron.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 21.3 % 249 53
Test Date: 2025-07-16 12:29:03 Functions: 10.5 % 19 2

            Line data    Source code
       1              : use pem::Pem;
       2              : use safekeeper_api::models::PullTimelineRequest;
       3              : use std::{collections::HashMap, env::VarError, net::IpAddr, sync::Arc, time::Duration};
       4              : use tokio::time::sleep;
       5              : use tokio_util::sync::CancellationToken;
       6              : use url::Url;
       7              : use utils::{backoff, id::TenantTimelineId, ip_address};
       8              : 
       9              : use anyhow::Result;
      10              : use pageserver_api::controller_api::{
      11              :     AvailabilityZone, NodeRegisterRequest, SafekeeperTimeline, SafekeeperTimelinesResponse,
      12              : };
      13              : 
      14              : use crate::{
      15              :     GlobalTimelines, SafeKeeperConf,
      16              :     metrics::{
      17              :         SK_RECOVERY_PULL_TIMELINE_ERRORS, SK_RECOVERY_PULL_TIMELINE_OKS,
      18              :         SK_RECOVERY_PULL_TIMELINE_SECONDS, SK_RECOVERY_PULL_TIMELINES_SECONDS,
      19              :     },
      20              :     pull_timeline,
      21              :     timelines_global_map::DeleteOrExclude,
      22              : };
      23              : 
      24              : // Extract information in the SafeKeeperConf to build a NodeRegisterRequest used to register the safekeeper with the HCC.
      25            1 : fn build_node_registeration_request(
      26            1 :     conf: &SafeKeeperConf,
      27            1 :     node_ip_addr: Option<IpAddr>,
      28            1 : ) -> Result<NodeRegisterRequest> {
      29            1 :     let advertise_pg_addr_with_port = conf
      30            1 :         .advertise_pg_addr_tenant_only
      31            1 :         .as_deref()
      32            1 :         .expect("advertise_pg_addr_tenant_only is required to register with HCC");
      33              : 
      34              :     // Extract host/port from the string.
      35            1 :     let (advertise_host_addr, pg_port_str) = advertise_pg_addr_with_port.split_at(
      36            1 :         advertise_pg_addr_with_port
      37            1 :             .rfind(':')
      38            1 :             .ok_or(anyhow::anyhow!("Invalid advertise_pg_addr"))?,
      39              :     );
      40              :     // Need the `[1..]` to remove the leading ':'.
      41            1 :     let pg_port = pg_port_str[1..]
      42            1 :         .parse::<u16>()
      43            1 :         .map_err(|e| anyhow::anyhow!("Cannot parse PG port: {}", e))?;
      44              : 
      45            1 :     let (_, http_port_str) = conf.listen_http_addr.split_at(
      46            1 :         conf.listen_http_addr
      47            1 :             .rfind(':')
      48            1 :             .ok_or(anyhow::anyhow!("Invalid listen_http_addr"))?,
      49              :     );
      50            1 :     let http_port = http_port_str[1..]
      51            1 :         .parse::<u16>()
      52            1 :         .map_err(|e| anyhow::anyhow!("Cannot parse HTTP port: {}", e))?;
      53              : 
      54            1 :     Ok(NodeRegisterRequest {
      55            1 :         node_id: conf.my_id,
      56            1 :         listen_pg_addr: advertise_host_addr.to_string(),
      57            1 :         listen_pg_port: pg_port,
      58            1 :         listen_http_addr: advertise_host_addr.to_string(),
      59            1 :         listen_http_port: http_port,
      60            1 :         node_ip_addr,
      61            1 :         availability_zone_id: AvailabilityZone("todo".to_string()),
      62            1 :         listen_grpc_addr: None,
      63            1 :         listen_grpc_port: None,
      64            1 :         listen_https_port: None,
      65            1 :     })
      66            1 : }
      67              : 
      68              : // Retrieve the JWT token used for authenticating with HCC from the environment variable.
      69              : // Returns None if the token cannot be retrieved.
      70            0 : fn get_hcc_auth_token() -> Option<String> {
      71            0 :     match std::env::var("HCC_AUTH_TOKEN") {
      72            0 :         Ok(v) => {
      73            0 :             tracing::info!("Loaded JWT token for authentication with HCC");
      74            0 :             Some(v)
      75              :         }
      76              :         Err(VarError::NotPresent) => {
      77            0 :             tracing::info!("No JWT token for authentication with HCC detected");
      78            0 :             None
      79              :         }
      80              :         Err(_) => {
      81            0 :             tracing::info!(
      82            0 :                 "Failed to either load to detect non-present HCC_AUTH_TOKEN environment variable"
      83              :             );
      84            0 :             None
      85              :         }
      86              :     }
      87            0 : }
      88              : 
      89            0 : async fn send_safekeeper_register_request(
      90            0 :     request_url: &Url,
      91            0 :     auth_token: &Option<String>,
      92            0 :     request: &NodeRegisterRequest,
      93            0 : ) -> Result<()> {
      94            0 :     let client = reqwest::Client::new();
      95            0 :     let mut req_builder = client
      96            0 :         .post(request_url.clone())
      97            0 :         .header("Content-Type", "application/json");
      98            0 :     if let Some(token) = auth_token {
      99            0 :         req_builder = req_builder.bearer_auth(token);
     100            0 :     }
     101            0 :     req_builder
     102            0 :         .json(&request)
     103            0 :         .send()
     104            0 :         .await?
     105            0 :         .error_for_status()?;
     106            0 :     Ok(())
     107            0 : }
     108              : 
     109              : /// Registers this safe keeper with the HCC.
     110            0 : pub async fn register(conf: &SafeKeeperConf) -> Result<()> {
     111            0 :     match conf.hcc_base_url.as_ref() {
     112              :         None => {
     113            0 :             tracing::info!("HCC base URL is not set, skipping registration");
     114            0 :             Ok(())
     115              :         }
     116            0 :         Some(hcc_base_url) => {
     117              :             // The following operations acquiring the auth token and the node IP address both read environment
     118              :             // variables. It's fine for now as this `register()` function is only called once during startup.
     119              :             // If we start to talk to HCC more regularly in the safekeeper we should probably consider
     120              :             // refactoring things into a "HadronClusterCoordinatorClient" struct.
     121            0 :             let auth_token = get_hcc_auth_token();
     122            0 :             let node_ip_addr =
     123            0 :                 ip_address::read_node_ip_addr_from_env().expect("Error reading node IP address.");
     124              : 
     125            0 :             let request = build_node_registeration_request(conf, node_ip_addr)?;
     126            0 :             let cancel = CancellationToken::new();
     127            0 :             let request_url = hcc_base_url.clone().join("/hadron-internal/v1/sk")?;
     128              : 
     129            0 :             backoff::retry(
     130            0 :                 || async {
     131            0 :                     send_safekeeper_register_request(&request_url, &auth_token, &request).await
     132            0 :                 },
     133              :                 |_| false,
     134              :                 3,
     135              :                 u32::MAX,
     136            0 :                 "Calling the HCC safekeeper register API",
     137            0 :                 &cancel,
     138              :             )
     139            0 :             .await
     140            0 :             .ok_or(anyhow::anyhow!(
     141            0 :                 "Error in forever retry loop. This error should never be surfaced."
     142            0 :             ))?
     143              :         }
     144              :     }
     145            0 : }
     146              : 
     147            0 : async fn safekeeper_list_timelines_request(
     148            0 :     conf: &SafeKeeperConf,
     149            0 : ) -> Result<pageserver_api::controller_api::SafekeeperTimelinesResponse> {
     150            0 :     if conf.hcc_base_url.is_none() {
     151            0 :         tracing::info!("HCC base URL is not set, skipping registration");
     152            0 :         return Err(anyhow::anyhow!("HCC base URL is not set"));
     153            0 :     }
     154              : 
     155              :     // The following operations acquiring the auth token and the node IP address both read environment
     156              :     // variables. It's fine for now as this `register()` function is only called once during startup.
     157              :     // If we start to talk to HCC more regularly in the safekeeper we should probably consider
     158              :     // refactoring things into a "HadronClusterCoordinatorClient" struct.
     159            0 :     let auth_token = get_hcc_auth_token();
     160            0 :     let method = format!("/control/v1/safekeeper/{}/timelines", conf.my_id.0);
     161            0 :     let request_url = conf.hcc_base_url.as_ref().unwrap().clone().join(&method)?;
     162              : 
     163            0 :     let client = reqwest::Client::new();
     164            0 :     let mut req_builder = client
     165            0 :         .get(request_url.clone())
     166            0 :         .header("Content-Type", "application/json")
     167            0 :         .query(&[("id", conf.my_id.0)]);
     168            0 :     if let Some(token) = auth_token {
     169            0 :         req_builder = req_builder.bearer_auth(token);
     170            0 :     }
     171            0 :     let response = req_builder
     172            0 :         .send()
     173            0 :         .await?
     174            0 :         .error_for_status()?
     175            0 :         .json::<pageserver_api::controller_api::SafekeeperTimelinesResponse>()
     176            0 :         .await?;
     177            0 :     Ok(response)
     178            0 : }
     179              : 
     180              : // Returns true on success, false otherwise.
     181            0 : pub async fn hcc_pull_timeline(
     182            0 :     timeline: SafekeeperTimeline,
     183            0 :     conf: &SafeKeeperConf,
     184            0 :     global_timelines: Arc<GlobalTimelines>,
     185            0 :     nodeid_http: &HashMap<u64, String>,
     186            0 : ) -> bool {
     187            0 :     let mut request = PullTimelineRequest {
     188            0 :         tenant_id: timeline.tenant_id,
     189            0 :         timeline_id: timeline.timeline_id,
     190            0 :         http_hosts: Vec::new(),
     191            0 :         ignore_tombstone: None,
     192            0 :     };
     193            0 :     for host in timeline.peers {
     194            0 :         if host.0 == conf.my_id.0 {
     195            0 :             continue;
     196            0 :         }
     197            0 :         if let Some(http_host) = nodeid_http.get(&host.0) {
     198            0 :             request.http_hosts.push(http_host.clone());
     199            0 :         }
     200              :     }
     201              : 
     202            0 :     let ca_certs = match conf
     203            0 :         .ssl_ca_certs
     204            0 :         .iter()
     205            0 :         .map(Pem::contents)
     206            0 :         .map(reqwest::Certificate::from_der)
     207            0 :         .collect::<Result<Vec<_>, _>>()
     208              :     {
     209            0 :         Ok(result) => result,
     210              :         Err(_) => {
     211            0 :             return false;
     212              :         }
     213              :     };
     214            0 :     match pull_timeline::handle_request(
     215            0 :         request,
     216            0 :         conf.sk_auth_token.clone(),
     217            0 :         ca_certs,
     218            0 :         global_timelines.clone(),
     219              :         true,
     220              :     )
     221            0 :     .await
     222              :     {
     223            0 :         Ok(resp) => {
     224            0 :             tracing::info!(
     225            0 :                 "Completed pulling tenant {} timeline {} from SK {:?}",
     226              :                 timeline.tenant_id,
     227              :                 timeline.timeline_id,
     228              :                 resp.safekeeper_host
     229              :             );
     230            0 :             return true;
     231              :         }
     232            0 :         Err(e) => {
     233            0 :             tracing::error!(
     234            0 :                 "Failed to pull tenant {} timeline {} from SK {}",
     235              :                 timeline.tenant_id,
     236              :                 timeline.timeline_id,
     237              :                 e
     238              :             );
     239              : 
     240            0 :             let ttid = TenantTimelineId {
     241            0 :                 tenant_id: timeline.tenant_id,
     242            0 :                 timeline_id: timeline.timeline_id,
     243            0 :             };
     244              :             // Revert the failed timeline pull.
     245              :             // Notice that not found timeline returns OK also.
     246            0 :             match global_timelines
     247            0 :                 .delete_or_exclude(&ttid, DeleteOrExclude::DeleteLocal)
     248            0 :                 .await
     249              :             {
     250            0 :                 Ok(dr) => {
     251            0 :                     tracing::info!(
     252            0 :                         "Deleted tenant {} timeline {} DirExists: {}",
     253              :                         timeline.tenant_id,
     254              :                         timeline.timeline_id,
     255              :                         dr.dir_existed,
     256              :                     );
     257              :                 }
     258            0 :                 Err(e) => {
     259            0 :                     tracing::error!(
     260            0 :                         "Failed to delete tenant {} timeline {} from global_timelines: {}",
     261              :                         timeline.tenant_id,
     262              :                         timeline.timeline_id,
     263              :                         e
     264              :                     );
     265              :                 }
     266              :             }
     267              :         }
     268              :     }
     269            0 :     false
     270            0 : }
     271              : 
     272            0 : pub async fn hcc_pull_timeline_till_success(
     273            0 :     timeline: SafekeeperTimeline,
     274            0 :     conf: &SafeKeeperConf,
     275            0 :     global_timelines: Arc<GlobalTimelines>,
     276            0 :     nodeid_http: &HashMap<u64, String>,
     277            0 : ) {
     278              :     const MAX_PULL_TIMELINE_RETRIES: u64 = 100;
     279            0 :     for i in 0..MAX_PULL_TIMELINE_RETRIES {
     280            0 :         if hcc_pull_timeline(
     281            0 :             timeline.clone(),
     282            0 :             conf,
     283            0 :             global_timelines.clone(),
     284            0 :             nodeid_http,
     285            0 :         )
     286            0 :         .await
     287              :         {
     288            0 :             SK_RECOVERY_PULL_TIMELINE_OKS.inc();
     289            0 :             return;
     290            0 :         }
     291            0 :         tracing::error!(
     292            0 :             "Failed to pull timeline {} from SK peers, retrying {}/{}",
     293              :             timeline.timeline_id,
     294            0 :             i + 1,
     295              :             MAX_PULL_TIMELINE_RETRIES
     296              :         );
     297            0 :         tokio::time::sleep(std::time::Duration::from_secs(1)).await;
     298              :     }
     299            0 :     SK_RECOVERY_PULL_TIMELINE_ERRORS.inc();
     300            0 : }
     301              : 
     302            0 : pub async fn hcc_pull_timelines(
     303            0 :     conf: &SafeKeeperConf,
     304            0 :     global_timelines: Arc<GlobalTimelines>,
     305            0 : ) -> Result<()> {
     306            0 :     let _timer = SK_RECOVERY_PULL_TIMELINES_SECONDS.start_timer();
     307            0 :     tracing::info!("Start pulling timelines from SK peers");
     308              : 
     309            0 :     let mut response = SafekeeperTimelinesResponse {
     310            0 :         timelines: Vec::new(),
     311            0 :         safekeeper_peers: Vec::new(),
     312            0 :     };
     313            0 :     for i in 0..100 {
     314            0 :         match safekeeper_list_timelines_request(conf).await {
     315            0 :             Ok(timelines) => {
     316            0 :                 response = timelines;
     317            0 :             }
     318            0 :             Err(e) => {
     319            0 :                 tracing::error!("Failed to list timelines from HCC: {}", e);
     320            0 :                 if i == 99 {
     321            0 :                     return Err(e);
     322            0 :                 }
     323              :             }
     324              :         }
     325            0 :         sleep(Duration::from_millis(100)).await;
     326              :     }
     327              : 
     328            0 :     let mut nodeid_http = HashMap::new();
     329            0 :     for sk in response.safekeeper_peers {
     330            0 :         nodeid_http.insert(
     331            0 :             sk.node_id.0,
     332            0 :             format!("http://{}:{}", sk.listen_http_addr, sk.http_port),
     333            0 :         );
     334            0 :     }
     335            0 :     tracing::info!("Received {} timelines from HCC", response.timelines.len());
     336            0 :     for timeline in response.timelines {
     337            0 :         let _timer = SK_RECOVERY_PULL_TIMELINE_SECONDS
     338            0 :             .with_label_values(&[
     339            0 :                 &timeline.tenant_id.to_string(),
     340            0 :                 &timeline.timeline_id.to_string(),
     341            0 :             ])
     342            0 :             .start_timer();
     343            0 :         hcc_pull_timeline_till_success(timeline, conf, global_timelines.clone(), &nodeid_http)
     344            0 :             .await;
     345              :     }
     346            0 :     Ok(())
     347            0 : }
     348              : 
     349              : #[cfg(test)]
     350              : mod tests {
     351              :     use super::*;
     352              :     use utils::id::NodeId;
     353              : 
     354              :     #[test]
     355            1 :     fn test_build_node_registeration_request() {
     356              :         // Test that:
     357              :         // 1. We always extract the host name and port used to register with the HCC from the
     358              :         //    `advertise_pg_addr` if it is set.
     359              :         // 2. The correct ports are extracted from `advertise_pg_addr` and `listen_http_addr`.
     360            1 :         let mut conf = SafeKeeperConf::dummy();
     361            1 :         conf.my_id = NodeId(1);
     362            1 :         conf.advertise_pg_addr_tenant_only =
     363            1 :             Some("safe-keeper-1.safe-keeper.hadron.svc.cluster.local:5454".to_string());
     364              :         // `listen_pg_addr` and `listen_pg_addr_tenant_only` are not used for node registration. Set them to a different
     365              :         // host and port values and make sure that they don't show up in the node registration request.
     366            1 :         conf.listen_pg_addr = "0.0.0.0:5456".to_string();
     367            1 :         conf.listen_pg_addr_tenant_only = Some("0.0.0.0:5456".to_string());
     368            1 :         conf.listen_http_addr = "0.0.0.0:7676".to_string();
     369            1 :         let node_ip_addr: Option<IpAddr> = Some("127.0.0.1".parse().unwrap());
     370              : 
     371            1 :         let request = build_node_registeration_request(&conf, node_ip_addr).unwrap();
     372            1 :         assert_eq!(request.node_id, NodeId(1));
     373            1 :         assert_eq!(
     374              :             request.listen_pg_addr,
     375              :             "safe-keeper-1.safe-keeper.hadron.svc.cluster.local"
     376              :         );
     377            1 :         assert_eq!(request.listen_pg_port, 5454);
     378            1 :         assert_eq!(
     379              :             request.listen_http_addr,
     380              :             "safe-keeper-1.safe-keeper.hadron.svc.cluster.local"
     381              :         );
     382            1 :         assert_eq!(request.listen_http_port, 7676);
     383            1 :         assert_eq!(
     384              :             request.node_ip_addr,
     385            1 :             Some(IpAddr::V4("127.0.0.1".parse().unwrap()))
     386              :         );
     387            1 :     }
     388              : }
        

Generated by: LCOV version 2.1-beta