Line data Source code
1 : use std::collections::HashMap;
2 :
3 : use futures::Future;
4 : use pageserver_api::{
5 : shard::TenantShardId,
6 : upcall_api::{
7 : ReAttachRequest, ReAttachResponse, ValidateRequest, ValidateRequestTenant, ValidateResponse,
8 : },
9 : };
10 : use serde::{de::DeserializeOwned, Serialize};
11 : use tokio_util::sync::CancellationToken;
12 : use url::Url;
13 : use utils::{backoff, generation::Generation, id::NodeId};
14 :
15 : use crate::config::PageServerConf;
16 :
17 : /// The Pageserver's client for using the control plane API: this is a small subset
18 : /// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md)
19 : pub struct ControlPlaneClient {
20 : http_client: reqwest::Client,
21 : base_url: Url,
22 : node_id: NodeId,
23 : cancel: CancellationToken,
24 : }
25 :
/// Error type for operations which internally retry on all errors other than the
/// cancellation token firing: the only way they can fail is `ShuttingDown`.
///
/// `Debug` is derived so that results carrying this error can be logged with `{:?}` and
/// used with `unwrap`/`expect`; `PartialEq`/`Eq` allow direct comparison in assertions.
#[derive(Debug, PartialEq, Eq)]
pub enum RetryForeverError {
    /// The cancellation token fired before the operation succeeded.
    ShuttingDown,
}
31 :
32 : pub trait ControlPlaneGenerationsApi {
33 : fn re_attach(
34 : &self,
35 : ) -> impl Future<Output = Result<HashMap<TenantShardId, Generation>, RetryForeverError>> + Send;
36 : fn validate(
37 : &self,
38 : tenants: Vec<(TenantShardId, Generation)>,
39 : ) -> impl Future<Output = Result<HashMap<TenantShardId, bool>, RetryForeverError>> + Send;
40 : }
41 :
42 : impl ControlPlaneClient {
43 : /// A None return value indicates that the input `conf` object does not have control
44 : /// plane API enabled.
45 0 : pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option<Self> {
46 0 : let mut url = match conf.control_plane_api.as_ref() {
47 0 : Some(u) => u.clone(),
48 0 : None => return None,
49 : };
50 :
51 0 : if let Ok(mut segs) = url.path_segments_mut() {
52 0 : // This ensures that `url` ends with a slash if it doesn't already.
53 0 : // That way, we can subsequently use join() to safely attach extra path elements.
54 0 : segs.pop_if_empty().push("");
55 0 : }
56 :
57 0 : let mut client = reqwest::ClientBuilder::new();
58 :
59 0 : if let Some(jwt) = &conf.control_plane_api_token {
60 0 : let mut headers = hyper::HeaderMap::new();
61 0 : headers.insert(
62 0 : "Authorization",
63 0 : format!("Bearer {}", jwt.get_contents()).parse().unwrap(),
64 0 : );
65 0 : client = client.default_headers(headers);
66 0 : }
67 :
68 0 : Some(Self {
69 0 : http_client: client.build().expect("Failed to construct HTTP client"),
70 0 : base_url: url,
71 0 : node_id: conf.id,
72 0 : cancel: cancel.clone(),
73 0 : })
74 0 : }
75 :
76 0 : async fn retry_http_forever<R, T>(
77 0 : &self,
78 0 : url: &url::Url,
79 0 : request: R,
80 0 : ) -> Result<T, RetryForeverError>
81 0 : where
82 0 : R: Serialize,
83 0 : T: DeserializeOwned,
84 0 : {
85 0 : let res = backoff::retry(
86 0 : || async {
87 0 : let response = self
88 0 : .http_client
89 0 : .post(url.clone())
90 0 : .json(&request)
91 0 : .send()
92 0 : .await?;
93 :
94 0 : response.error_for_status_ref()?;
95 0 : response.json::<T>().await
96 0 : },
97 0 : |_| false,
98 0 : 3,
99 0 : u32::MAX,
100 0 : "calling control plane generation validation API",
101 0 : &self.cancel,
102 0 : )
103 0 : .await
104 0 : .ok_or(RetryForeverError::ShuttingDown)?
105 0 : .expect("We retry forever, this should never be reached");
106 0 :
107 0 : Ok(res)
108 0 : }
109 : }
110 :
111 : impl ControlPlaneGenerationsApi for ControlPlaneClient {
112 : /// Block until we get a successful response, or error out if we are shut down
113 0 : async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
114 0 : let re_attach_path = self
115 0 : .base_url
116 0 : .join("re-attach")
117 0 : .expect("Failed to build re-attach path");
118 0 : let request = ReAttachRequest {
119 0 : node_id: self.node_id,
120 0 : };
121 :
122 0 : fail::fail_point!("control-plane-client-re-attach");
123 :
124 0 : let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?;
125 0 : tracing::info!(
126 0 : "Received re-attach response with {} tenants",
127 0 : response.tenants.len()
128 0 : );
129 :
130 0 : Ok(response
131 0 : .tenants
132 0 : .into_iter()
133 0 : .map(|t| (t.id, Generation::new(t.gen)))
134 0 : .collect::<HashMap<_, _>>())
135 0 : }
136 :
137 : /// Block until we get a successful response, or error out if we are shut down
138 0 : async fn validate(
139 0 : &self,
140 0 : tenants: Vec<(TenantShardId, Generation)>,
141 0 : ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
142 0 : let re_attach_path = self
143 0 : .base_url
144 0 : .join("validate")
145 0 : .expect("Failed to build validate path");
146 0 :
147 0 : let request = ValidateRequest {
148 0 : tenants: tenants
149 0 : .into_iter()
150 0 : .map(|(id, gen)| ValidateRequestTenant {
151 0 : id,
152 0 : gen: gen
153 0 : .into()
154 0 : .expect("Generation should always be valid for a Tenant doing deletions"),
155 0 : })
156 0 : .collect(),
157 0 : };
158 :
159 0 : fail::fail_point!("control-plane-client-validate");
160 :
161 0 : let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?;
162 :
163 0 : Ok(response
164 0 : .tenants
165 0 : .into_iter()
166 0 : .map(|rt| (rt.id, rt.valid))
167 0 : .collect())
168 0 : }
169 : }
|