LCOV - code coverage report
Current view: top level - pageserver/src - controller_upcall_client.rs (source / functions) Coverage Total Hit
Test: 4be46b1c0003aa3bbac9ade362c676b419df4c20.info Lines: 0.0 % 50 0
Test Date: 2025-07-22 17:50:06 Functions: 0.0 % 14 0

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : use std::net::IpAddr;
       3              : 
       4              : use futures::Future;
       5              : use pageserver_api::config::NodeMetadata;
       6              : use pageserver_api::controller_api::{AvailabilityZone, NodeRegisterRequest};
       7              : use pageserver_api::models::ShardImportStatus;
       8              : use pageserver_api::shard::TenantShardId;
       9              : use pageserver_api::upcall_api::{
      10              :     PutTimelineImportStatusRequest, ReAttachRequest, ReAttachResponse, ReAttachResponseTenant,
      11              :     TimelineImportStatusRequest, ValidateRequest, ValidateRequestTenant, ValidateResponse,
      12              : };
      13              : use reqwest::Certificate;
      14              : use serde::Serialize;
      15              : use serde::de::DeserializeOwned;
      16              : use tokio_util::sync::CancellationToken;
      17              : use url::Url;
      18              : use utils::generation::Generation;
      19              : use utils::id::{NodeId, TimelineId};
      20              : use utils::{backoff, failpoint_support, ip_address};
      21              : 
      22              : use crate::config::PageServerConf;
      23              : use crate::virtual_file::on_fatal_io_error;
      24              : 
      25              : /// The Pageserver's client for using the storage controller upcall API: this is a small API
      26              : /// for dealing with generations (see docs/rfcs/025-generation-numbers.md).
      27              : pub struct StorageControllerUpcallClient {
      28              :     http_client: reqwest::Client,
      29              :     base_url: Url,
      30              :     node_id: NodeId,
      31              :     node_ip_addr: Option<IpAddr>,
      32              :     cancel: CancellationToken,
      33              : }
      34              : 
      35              : /// Represent operations which internally retry on all errors other than
      36              : /// cancellation token firing: the only way they can fail is ShuttingDown.
      37              : pub enum RetryForeverError {
      38              :     ShuttingDown,
      39              : }
      40              : 
      41              : pub trait StorageControllerUpcallApi {
      42              :     fn re_attach(
      43              :         &self,
      44              :         conf: &PageServerConf,
      45              :         empty_local_disk: bool,
      46              :     ) -> impl Future<
      47              :         Output = Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError>,
      48              :     > + Send;
      49              :     fn validate(
      50              :         &self,
      51              :         tenants: Vec<(TenantShardId, Generation)>,
      52              :     ) -> impl Future<Output = Result<HashMap<TenantShardId, bool>, RetryForeverError>> + Send;
      53              :     fn put_timeline_import_status(
      54              :         &self,
      55              :         tenant_shard_id: TenantShardId,
      56              :         timeline_id: TimelineId,
      57              :         generation: Generation,
      58              :         status: ShardImportStatus,
      59              :     ) -> impl Future<Output = Result<(), RetryForeverError>> + Send;
      60              :     fn get_timeline_import_status(
      61              :         &self,
      62              :         tenant_shard_id: TenantShardId,
      63              :         timeline_id: TimelineId,
      64              :         generation: Generation,
      65              :     ) -> impl Future<Output = Result<ShardImportStatus, RetryForeverError>> + Send;
      66              : }
      67              : 
      68              : impl StorageControllerUpcallClient {
      69              :     /// A None return value indicates that the input `conf` object does not have control
      70              :     /// plane API enabled.
      71            0 :     pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Self {
      72            0 :         let mut url = conf.control_plane_api.clone();
      73              : 
      74            0 :         if let Ok(mut segs) = url.path_segments_mut() {
      75            0 :             // This ensures that `url` ends with a slash if it doesn't already.
      76            0 :             // That way, we can subsequently use join() to safely attach extra path elements.
      77            0 :             segs.pop_if_empty().push("");
      78            0 :         }
      79              : 
      80            0 :         let mut client = reqwest::ClientBuilder::new();
      81              : 
      82            0 :         if let Some(jwt) = &conf.control_plane_api_token {
      83            0 :             let mut headers = reqwest::header::HeaderMap::new();
      84            0 :             headers.insert(
      85            0 :                 "Authorization",
      86            0 :                 format!("Bearer {}", jwt.get_contents()).parse().unwrap(),
      87            0 :             );
      88            0 :             client = client.default_headers(headers);
      89            0 :         }
      90              : 
      91            0 :         for cert in &conf.ssl_ca_certs {
      92            0 :             client = client.add_root_certificate(
      93            0 :                 Certificate::from_der(cert.contents()).expect("Invalid certificate in config"),
      94            0 :             );
      95            0 :         }
      96              : 
      97              :         // Intentionally panics if we encountered any errors parsing or reading the IP address.
      98              :         // Note that if the required environment variable is not set, `read_node_ip_addr_from_env` returns `Ok(None)`
      99              :         // instead of an error.
     100            0 :         let node_ip_addr =
     101            0 :             ip_address::read_node_ip_addr_from_env().expect("Error reading node IP address.");
     102              : 
     103            0 :         Self {
     104            0 :             http_client: client.build().expect("Failed to construct HTTP client"),
     105            0 :             base_url: url,
     106            0 :             node_id: conf.id,
     107            0 :             cancel: cancel.clone(),
     108            0 :             node_ip_addr,
     109            0 :         }
     110            0 :     }
     111              : 
     112              :     #[tracing::instrument(skip_all)]
     113              :     async fn retry_http_forever<R, T>(
     114              :         &self,
     115              :         url: &url::Url,
     116              :         request: R,
     117              :         method: reqwest::Method,
     118              :     ) -> Result<T, RetryForeverError>
     119              :     where
     120              :         R: Serialize,
     121              :         T: DeserializeOwned,
     122              :     {
     123              :         let res = backoff::retry(
     124            0 :             || async {
     125            0 :                 let response = self
     126            0 :                     .http_client
     127            0 :                     .request(method.clone(), url.clone())
     128            0 :                     .json(&request)
     129            0 :                     .send()
     130            0 :                     .await?;
     131              : 
     132            0 :                 response.error_for_status_ref()?;
     133            0 :                 response.json::<T>().await
     134            0 :             },
     135              :             |_| false,
     136              :             3,
     137              :             u32::MAX,
     138              :             "storage controller upcall",
     139              :             &self.cancel,
     140              :         )
     141              :         .await
     142              :         .ok_or(RetryForeverError::ShuttingDown)?
     143              :         .expect("We retry forever, this should never be reached");
     144              : 
     145              :         Ok(res)
     146              :     }
     147              : 
     148            0 :     pub(crate) fn base_url(&self) -> &Url {
     149            0 :         &self.base_url
     150            0 :     }
     151              : }
     152              : 
     153              : impl StorageControllerUpcallApi for StorageControllerUpcallClient {
     154              :     /// Block until we get a successful response, or error out if we are shut down
     155              :     #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
     156              :     async fn re_attach(
     157              :         &self,
     158              :         conf: &PageServerConf,
     159              :         empty_local_disk: bool,
     160              :     ) -> Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError> {
     161              :         let url = self
     162              :             .base_url
     163              :             .join("re-attach")
     164              :             .expect("Failed to build re-attach path");
     165              : 
     166              :         // Include registration content in the re-attach request if a metadata file is readable
     167              :         let metadata_path = conf.metadata_path();
     168              :         let register = match tokio::fs::read_to_string(&metadata_path).await {
     169              :             Ok(metadata_str) => match serde_json::from_str::<NodeMetadata>(&metadata_str) {
     170              :                 Ok(m) => {
     171              :                     // Since we run one time at startup, be generous in our logging and
     172              :                     // dump all metadata.
     173              :                     tracing::info!("Loaded node metadata: {m}");
     174              : 
     175              :                     let az_id = {
     176              :                         let az_id_from_metadata = m
     177              :                             .other
     178              :                             .get("availability_zone_id")
     179            0 :                             .and_then(|jv| jv.as_str().map(|str| str.to_owned()));
     180              : 
     181              :                         match az_id_from_metadata {
     182              :                             Some(az_id) => Some(AvailabilityZone(az_id)),
     183              :                             None => {
     184              :                                 tracing::warn!(
     185              :                                     "metadata.json does not contain an 'availability_zone_id' field"
     186              :                                 );
     187              :                                 conf.availability_zone.clone().map(AvailabilityZone)
     188              :                             }
     189              :                         }
     190              :                     };
     191              : 
     192              :                     if az_id.is_none() {
     193              :                         panic!(
     194              :                             "Availablity zone id could not be inferred from metadata.json or pageserver config"
     195              :                         );
     196              :                     }
     197              : 
     198              :                     Some(NodeRegisterRequest {
     199              :                         node_id: conf.id,
     200              :                         listen_pg_addr: m.postgres_host,
     201              :                         listen_pg_port: m.postgres_port,
     202              :                         listen_grpc_addr: m.grpc_host,
     203              :                         listen_grpc_port: m.grpc_port,
     204              :                         listen_http_addr: m.http_host,
     205              :                         listen_http_port: m.http_port,
     206              :                         listen_https_port: m.https_port,
     207              :                         node_ip_addr: self.node_ip_addr,
     208              :                         availability_zone_id: az_id.expect("Checked above"),
     209              :                     })
     210              :                 }
     211              :                 Err(e) => {
     212              :                     tracing::error!("Unreadable metadata in {metadata_path}: {e}");
     213              :                     None
     214              :                 }
     215              :             },
     216              :             Err(e) => {
     217              :                 if e.kind() == std::io::ErrorKind::NotFound {
     218              :                     // This is legal: we may have been deployed with some external script
     219              :                     // doing registration for us.
     220              :                     tracing::info!("Metadata file not found at {metadata_path}");
     221              :                 } else {
     222              :                     on_fatal_io_error(&e, &format!("Loading metadata at {metadata_path}"))
     223              :                 }
     224              :                 None
     225              :             }
     226              :         };
     227              : 
     228              :         let request = ReAttachRequest {
     229              :             node_id: self.node_id,
     230              :             register: register.clone(),
     231              :             empty_local_disk: Some(empty_local_disk),
     232              :         };
     233              : 
     234              :         let response: ReAttachResponse = self
     235              :             .retry_http_forever(&url, request, reqwest::Method::POST)
     236              :             .await?;
     237              :         tracing::info!(
     238              :             "Received re-attach response with {} tenants (node {}, register: {:?})",
     239              :             response.tenants.len(),
     240              :             self.node_id,
     241              :             register,
     242              :         );
     243              : 
     244              :         failpoint_support::sleep_millis_async!("control-plane-client-re-attach");
     245              : 
     246              :         Ok(response
     247              :             .tenants
     248              :             .into_iter()
     249            0 :             .map(|rart| (rart.id, rart))
     250              :             .collect::<HashMap<_, _>>())
     251              :     }
     252              : 
     253              :     /// Block until we get a successful response, or error out if we are shut down
     254              :     #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
     255              :     async fn validate(
     256              :         &self,
     257              :         tenants: Vec<(TenantShardId, Generation)>,
     258              :     ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
     259              :         let url = self
     260              :             .base_url
     261              :             .join("validate")
     262              :             .expect("Failed to build validate path");
     263              : 
     264              :         // When sending validate requests, break them up into chunks so that we
     265              :         // avoid possible edge cases of generating any HTTP requests that
     266              :         // require database I/O across many thousands of tenants.
     267              :         let mut result: HashMap<TenantShardId, bool> = HashMap::with_capacity(tenants.len());
     268              :         for tenant_chunk in (tenants).chunks(128) {
     269              :             let request = ValidateRequest {
     270              :                 tenants: tenant_chunk
     271              :                     .iter()
     272              :                     .map(|(id, generation)| ValidateRequestTenant {
     273            0 :                         id: *id,
     274            0 :                         r#gen: (*generation).into().expect(
     275            0 :                             "Generation should always be valid for a Tenant doing deletions",
     276              :                         ),
     277            0 :                     })
     278              :                     .collect(),
     279              :             };
     280              : 
     281              :             failpoint_support::sleep_millis_async!(
     282              :                 "control-plane-client-validate-sleep",
     283              :                 &self.cancel
     284              :             );
     285              :             if self.cancel.is_cancelled() {
     286              :                 return Err(RetryForeverError::ShuttingDown);
     287              :             }
     288              : 
     289              :             let response: ValidateResponse = self
     290              :                 .retry_http_forever(&url, request, reqwest::Method::POST)
     291              :                 .await?;
     292              :             for rt in response.tenants {
     293              :                 result.insert(rt.id, rt.valid);
     294              :             }
     295              :         }
     296              : 
     297              :         Ok(result.into_iter().collect())
     298              :     }
     299              : 
     300              :     /// Send a shard import status to the storage controller
     301              :     ///
     302              :     /// The implementation must have at-least-once delivery semantics.
     303              :     /// To this end, we retry the request until it succeeds. If the pageserver
     304              :     /// restarts or crashes, the shard import will start again from the beggining.
     305              :     #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
     306              :     async fn put_timeline_import_status(
     307              :         &self,
     308              :         tenant_shard_id: TenantShardId,
     309              :         timeline_id: TimelineId,
     310              :         generation: Generation,
     311              :         status: ShardImportStatus,
     312              :     ) -> Result<(), RetryForeverError> {
     313              :         let url = self
     314              :             .base_url
     315              :             .join("timeline_import_status")
     316              :             .expect("Failed to build path");
     317              : 
     318              :         let request = PutTimelineImportStatusRequest {
     319              :             tenant_shard_id,
     320              :             timeline_id,
     321              :             generation,
     322              :             status,
     323              :         };
     324              : 
     325              :         self.retry_http_forever(&url, request, reqwest::Method::POST)
     326              :             .await
     327              :     }
     328              : 
     329              :     #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
     330              :     async fn get_timeline_import_status(
     331              :         &self,
     332              :         tenant_shard_id: TenantShardId,
     333              :         timeline_id: TimelineId,
     334              :         generation: Generation,
     335              :     ) -> Result<ShardImportStatus, RetryForeverError> {
     336              :         let url = self
     337              :             .base_url
     338              :             .join("timeline_import_status")
     339              :             .expect("Failed to build path");
     340              : 
     341              :         let request = TimelineImportStatusRequest {
     342              :             tenant_shard_id,
     343              :             timeline_id,
     344              :             generation,
     345              :         };
     346              : 
     347              :         let response: ShardImportStatus = self
     348              :             .retry_http_forever(&url, request, reqwest::Method::GET)
     349              :             .await?;
     350              :         Ok(response)
     351              :     }
     352              : }
        

Generated by: LCOV version 2.1-beta