LCOV - differential code coverage report
Current view: top level - pageserver/src - control_plane_client.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 95.7 % 117 112 5 112
Current Date: 2023-10-19 02:04:12 Functions: 80.0 % 25 20 5 20
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : use std::collections::HashMap;
       2                 : 
       3                 : use pageserver_api::control_api::{
       4                 :     ReAttachRequest, ReAttachResponse, ValidateRequest, ValidateRequestTenant, ValidateResponse,
       5                 : };
       6                 : use serde::{de::DeserializeOwned, Serialize};
       7                 : use tokio_util::sync::CancellationToken;
       8                 : use url::Url;
       9                 : use utils::{
      10                 :     backoff,
      11                 :     generation::Generation,
      12                 :     id::{NodeId, TenantId},
      13                 : };
      14                 : 
      15                 : use crate::config::PageServerConf;
      16                 : 
      17                 : /// The Pageserver's client for using the control plane API: this is a small subset
      18                 : /// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md)
      19                 : pub struct ControlPlaneClient {
      20                 :     http_client: reqwest::Client,
      21                 :     base_url: Url,
      22                 :     node_id: NodeId,
      23                 :     cancel: CancellationToken,
      24                 : }
      25                 : 
      26                 : /// Represent operations which internally retry on all errors other than
      27                 : /// cancellation token firing: the only way they can fail is ShuttingDown.
      28                 : pub enum RetryForeverError {
      29                 :     ShuttingDown,
      30                 : }
      31                 : 
      32                 : #[async_trait::async_trait]
      33                 : pub trait ControlPlaneGenerationsApi {
      34                 :     async fn re_attach(&self) -> Result<HashMap<TenantId, Generation>, RetryForeverError>;
      35                 :     async fn validate(
      36                 :         &self,
      37                 :         tenants: Vec<(TenantId, Generation)>,
      38                 :     ) -> Result<HashMap<TenantId, bool>, RetryForeverError>;
      39                 : }
      40                 : 
      41                 : impl ControlPlaneClient {
      42                 :     /// A None return value indicates that the input `conf` object does not have control
      43                 :     /// plane API enabled.
      44 CBC        1119 :     pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option<Self> {
      45            1119 :         let mut url = match conf.control_plane_api.as_ref() {
      46              61 :             Some(u) => u.clone(),
      47            1058 :             None => return None,
      48                 :         };
      49                 : 
      50              61 :         if let Ok(mut segs) = url.path_segments_mut() {
      51              61 :             // This ensures that `url` ends with a slash if it doesn't already.
      52              61 :             // That way, we can subsequently use join() to safely attach extra path elements.
      53              61 :             segs.pop_if_empty().push("");
      54              61 :         }
      55                 : 
      56              61 :         let mut client = reqwest::ClientBuilder::new();
      57                 : 
      58              61 :         if let Some(jwt) = &conf.control_plane_api_token {
      59 UBC           0 :             let mut headers = hyper::HeaderMap::new();
      60               0 :             headers.insert("Authorization", jwt.get_contents().parse().unwrap());
      61               0 :             client = client.default_headers(headers);
      62 CBC          61 :         }
      63                 : 
      64              61 :         Some(Self {
      65              61 :             http_client: client.build().expect("Failed to construct HTTP client"),
      66              61 :             base_url: url,
      67              61 :             node_id: conf.id,
      68              61 :             cancel: cancel.clone(),
      69              61 :         })
      70            1119 :     }
      71                 : 
      72              40 :     async fn retry_http_forever<R, T>(
      73              40 :         &self,
      74              40 :         url: &url::Url,
      75              40 :         request: R,
      76              40 :     ) -> Result<T, RetryForeverError>
      77              40 :     where
      78              40 :         R: Serialize,
      79              40 :         T: DeserializeOwned,
      80              40 :     {
      81              40 :         #[derive(thiserror::Error, Debug)]
      82              40 :         enum RemoteAttemptError {
      83              40 :             #[error("shutdown")]
      84              40 :             Shutdown,
      85              40 :             #[error("remote: {0}")]
      86              40 :             Remote(reqwest::Error),
      87              40 :         }
      88              40 : 
      89              40 :         match backoff::retry(
      90              48 :             || async {
      91              48 :                 let response = self
      92              48 :                     .http_client
      93              48 :                     .post(url.clone())
      94              48 :                     .json(&request)
      95              48 :                     .send()
      96             120 :                     .await
      97              48 :                     .map_err(RemoteAttemptError::Remote)?;
      98                 : 
      99              39 :                 response
     100              39 :                     .error_for_status_ref()
     101              39 :                     .map_err(RemoteAttemptError::Remote)?;
     102              39 :                 response
     103              39 :                     .json::<T>()
     104 UBC           0 :                     .await
     105 CBC          39 :                     .map_err(RemoteAttemptError::Remote)
     106              48 :             },
     107              40 :             |_| false,
     108              40 :             3,
     109              40 :             u32::MAX,
     110              40 :             "calling control plane generation validation API",
     111              40 :             backoff::Cancel::new(self.cancel.clone(), || RemoteAttemptError::Shutdown),
     112              40 :         )
     113             129 :         .await
     114                 :         {
     115               1 :             Err(RemoteAttemptError::Shutdown) => Err(RetryForeverError::ShuttingDown),
     116                 :             Err(RemoteAttemptError::Remote(_)) => {
     117 UBC           0 :                 panic!("We retry forever, this should never be reached");
     118                 :             }
     119 CBC          39 :             Ok(r) => Ok(r),
     120                 :         }
     121              40 :     }
     122                 : }
     123                 : 
     124                 : #[async_trait::async_trait]
     125                 : impl ControlPlaneGenerationsApi for ControlPlaneClient {
     126                 :     /// Block until we get a successful response, or error out if we are shut down
     127              30 :     async fn re_attach(&self) -> Result<HashMap<TenantId, Generation>, RetryForeverError> {
     128              30 :         let re_attach_path = self
     129              30 :             .base_url
     130              30 :             .join("re-attach")
     131              30 :             .expect("Failed to build re-attach path");
     132              30 :         let request = ReAttachRequest {
     133              30 :             node_id: self.node_id,
     134              30 :         };
     135              30 : 
     136              30 :         fail::fail_point!("control-plane-client-re-attach");
     137                 : 
     138              88 :         let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?;
     139              30 :         tracing::info!(
     140              30 :             "Received re-attach response with {} tenants",
     141              30 :             response.tenants.len()
     142              30 :         );
     143                 : 
     144              30 :         Ok(response
     145              30 :             .tenants
     146              30 :             .into_iter()
     147              30 :             .map(|t| (t.id, Generation::new(t.generation)))
     148              30 :             .collect::<HashMap<_, _>>())
     149              60 :     }
     150                 : 
     151                 :     /// Block until we get a successful response, or error out if we are shut down
     152              10 :     async fn validate(
     153              10 :         &self,
     154              10 :         tenants: Vec<(TenantId, Generation)>,
     155              10 :     ) -> Result<HashMap<TenantId, bool>, RetryForeverError> {
     156              10 :         let re_attach_path = self
     157              10 :             .base_url
     158              10 :             .join("validate")
     159              10 :             .expect("Failed to build validate path");
     160              10 : 
     161              10 :         let request = ValidateRequest {
     162              10 :             tenants: tenants
     163              10 :                 .into_iter()
     164              10 :                 .map(|(id, gen)| ValidateRequestTenant {
     165              10 :                     id,
     166              10 :                     gen: gen
     167              10 :                         .into()
     168              10 :                         .expect("Generation should always be valid for a Tenant doing deletions"),
     169              10 :                 })
     170              10 :                 .collect(),
     171              10 :         };
     172              10 : 
     173              10 :         fail::fail_point!("control-plane-client-validate");
     174                 : 
     175              41 :         let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?;
     176                 : 
     177               9 :         Ok(response
     178               9 :             .tenants
     179               9 :             .into_iter()
     180               9 :             .map(|rt| (rt.id, rt.valid))
     181               9 :             .collect())
     182              20 :     }
     183                 : }
        

Generated by: LCOV version 2.1-beta