LCOV - differential code coverage report
Current view: top level - pageserver/src - control_plane_client.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 92.4 % 118 109 9 109
Current Date: 2024-01-09 02:06:09 Functions: 78.3 % 23 18 5 18
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : use std::collections::HashMap;
       2                 : 
       3                 : use pageserver_api::{
       4                 :     control_api::{
       5                 :         ReAttachRequest, ReAttachResponse, ValidateRequest, ValidateRequestTenant, ValidateResponse,
       6                 :     },
       7                 :     shard::TenantShardId,
       8                 : };
       9                 : use serde::{de::DeserializeOwned, Serialize};
      10                 : use tokio_util::sync::CancellationToken;
      11                 : use url::Url;
      12                 : use utils::{backoff, generation::Generation, id::NodeId};
      13                 : 
      14                 : use crate::config::PageServerConf;
      15                 : 
      16                 : /// The Pageserver's client for using the control plane API: this is a small subset
      17                 : /// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md)
      18                 : pub struct ControlPlaneClient {
      19                 :     http_client: reqwest::Client,
      20                 :     base_url: Url,
      21                 :     node_id: NodeId,
      22                 :     cancel: CancellationToken,
      23                 : }
      24                 : 
      25                 : /// Represent operations which internally retry on all errors other than
      26                 : /// cancellation token firing: the only way they can fail is ShuttingDown.
      27                 : pub enum RetryForeverError {
      28                 :     ShuttingDown,
      29                 : }
      30                 : 
      31                 : #[async_trait::async_trait]
      32                 : pub trait ControlPlaneGenerationsApi {
      33                 :     async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError>;
      34                 :     async fn validate(
      35                 :         &self,
      36                 :         tenants: Vec<(TenantShardId, Generation)>,
      37                 :     ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError>;
      38                 : }
      39                 : 
      40                 : impl ControlPlaneClient {
      41                 :     /// A None return value indicates that the input `conf` object does not have control
      42                 :     /// plane API enabled.
      43 CBC        1113 :     pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option<Self> {
      44            1113 :         let mut url = match conf.control_plane_api.as_ref() {
      45            1111 :             Some(u) => u.clone(),
      46               2 :             None => return None,
      47                 :         };
      48                 : 
      49            1111 :         if let Ok(mut segs) = url.path_segments_mut() {
      50            1111 :             // This ensures that `url` ends with a slash if it doesn't already.
      51            1111 :             // That way, we can subsequently use join() to safely attach extra path elements.
      52            1111 :             segs.pop_if_empty().push("");
      53            1111 :         }
      54                 : 
      55            1111 :         let mut client = reqwest::ClientBuilder::new();
      56                 : 
      57            1111 :         if let Some(jwt) = &conf.control_plane_api_token {
      58 UBC           0 :             let mut headers = hyper::HeaderMap::new();
      59               0 :             headers.insert(
      60               0 :                 "Authorization",
      61               0 :                 format!("Bearer {}", jwt.get_contents()).parse().unwrap(),
      62               0 :             );
      63               0 :             client = client.default_headers(headers);
      64 CBC        1111 :         }
      65                 : 
      66            1111 :         Some(Self {
      67            1111 :             http_client: client.build().expect("Failed to construct HTTP client"),
      68            1111 :             base_url: url,
      69            1111 :             node_id: conf.id,
      70            1111 :             cancel: cancel.clone(),
      71            1111 :         })
      72            1113 :     }
      73                 : 
      74            1000 :     async fn retry_http_forever<R, T>(
      75            1000 :         &self,
      76            1000 :         url: &url::Url,
      77            1000 :         request: R,
      78            1000 :     ) -> Result<T, RetryForeverError>
      79            1000 :     where
      80            1000 :         R: Serialize,
      81            1000 :         T: DeserializeOwned,
      82            1000 :     {
      83            1000 :         #[derive(thiserror::Error, Debug)]
      84            1000 :         enum RemoteAttemptError {
      85            1000 :             #[error("shutdown")]
      86            1000 :             Shutdown,
      87            1000 :             #[error("remote: {0}")]
      88            1000 :             Remote(reqwest::Error),
      89            1000 :         }
      90            1000 : 
      91            1000 :         match backoff::retry(
      92            1005 :             || async {
      93            1005 :                 let response = self
      94            1005 :                     .http_client
      95            1005 :                     .post(url.clone())
      96            1005 :                     .json(&request)
      97            1005 :                     .send()
      98            2573 :                     .await
      99            1005 :                     .map_err(RemoteAttemptError::Remote)?;
     100                 : 
     101             999 :                 response
     102             999 :                     .error_for_status_ref()
     103             999 :                     .map_err(RemoteAttemptError::Remote)?;
     104             999 :                 response
     105             999 :                     .json::<T>()
     106              66 :                     .await
     107             999 :                     .map_err(RemoteAttemptError::Remote)
     108            1005 :             },
     109            1000 :             |_| false,
     110            1000 :             3,
     111            1000 :             u32::MAX,
     112            1000 :             "calling control plane generation validation API",
     113            1000 :             backoff::Cancel::new(self.cancel.clone(), || RemoteAttemptError::Shutdown),
     114            1000 :         )
     115            2645 :         .await
     116                 :         {
     117               1 :             Err(RemoteAttemptError::Shutdown) => Err(RetryForeverError::ShuttingDown),
     118                 :             Err(RemoteAttemptError::Remote(_)) => {
     119 UBC           0 :                 panic!("We retry forever, this should never be reached");
     120                 :             }
     121 CBC         999 :             Ok(r) => Ok(r),
     122                 :         }
     123            1000 :     }
     124                 : }
     125                 : 
     126                 : #[async_trait::async_trait]
     127                 : impl ControlPlaneGenerationsApi for ControlPlaneClient {
     128                 :     /// Block until we get a successful response, or error out if we are shut down
     129             555 :     async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
     130             555 :         let re_attach_path = self
     131             555 :             .base_url
     132             555 :             .join("re-attach")
     133             555 :             .expect("Failed to build re-attach path");
     134             555 :         let request = ReAttachRequest {
     135             555 :             node_id: self.node_id,
     136             555 :         };
     137                 : 
     138 UBC           0 :         fail::fail_point!("control-plane-client-re-attach");
     139                 : 
     140 CBC        1686 :         let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?;
     141             555 :         tracing::info!(
     142             555 :             "Received re-attach response with {} tenants",
     143             555 :             response.tenants.len()
     144             555 :         );
     145                 : 
     146             555 :         Ok(response
     147             555 :             .tenants
     148             555 :             .into_iter()
     149             555 :             .map(|t| (t.id, Generation::new(t.gen)))
     150             555 :             .collect::<HashMap<_, _>>())
     151            1110 :     }
     152                 : 
     153                 :     /// Block until we get a successful response, or error out if we are shut down
     154             447 :     async fn validate(
     155             447 :         &self,
     156             447 :         tenants: Vec<(TenantShardId, Generation)>,
     157             447 :     ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
     158             447 :         let re_attach_path = self
     159             447 :             .base_url
     160             447 :             .join("validate")
     161             447 :             .expect("Failed to build validate path");
     162             447 : 
     163             447 :         let request = ValidateRequest {
     164             447 :             tenants: tenants
     165             447 :                 .into_iter()
     166             514 :                 .map(|(id, gen)| ValidateRequestTenant {
     167             514 :                     id,
     168             514 :                     gen: gen
     169             514 :                         .into()
     170             514 :                         .expect("Generation should always be valid for a Tenant doing deletions"),
     171             514 :                 })
     172             447 :                 .collect(),
     173             447 :         };
     174                 : 
     175 UBC           0 :         fail::fail_point!("control-plane-client-validate");
     176                 : 
     177 CBC         959 :         let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?;
     178                 : 
     179             444 :         Ok(response
     180             444 :             .tenants
     181             444 :             .into_iter()
     182             511 :             .map(|rt| (rt.id, rt.valid))
     183             444 :             .collect())
     184             892 :     }
     185                 : }
        

Generated by: LCOV version 2.1-beta