Line data Source code
1 : use std::collections::HashMap;
2 :
3 : use futures::Future;
4 : use pageserver_api::{
5 : control_api::{
6 : ReAttachRequest, ReAttachResponse, ValidateRequest, ValidateRequestTenant, ValidateResponse,
7 : },
8 : shard::TenantShardId,
9 : };
10 : use serde::{de::DeserializeOwned, Serialize};
11 : use tokio_util::sync::CancellationToken;
12 : use url::Url;
13 : use utils::{backoff, generation::Generation, id::NodeId};
14 :
15 : use crate::config::PageServerConf;
16 :
17 : /// The Pageserver's client for using the control plane API: this is a small subset
18 : /// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md)
19 : pub struct ControlPlaneClient {
20 : http_client: reqwest::Client,
21 : base_url: Url,
22 : node_id: NodeId,
23 : cancel: CancellationToken,
24 : }
25 :
/// Error type for operations which internally retry on all errors other than the
/// cancellation token firing: the only way they can fail is `ShuttingDown`.
///
/// Derives `Debug` (and the cheap comparison traits) so callers can log it, `unwrap()`/
/// `expect()` a `Result` carrying it, and match on it in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryForeverError {
    /// The cancellation token fired before the operation succeeded.
    ShuttingDown,
}
31 :
32 : pub trait ControlPlaneGenerationsApi {
33 : fn re_attach(
34 : &self,
35 : ) -> impl Future<Output = Result<HashMap<TenantShardId, Generation>, RetryForeverError>> + Send;
36 : fn validate(
37 : &self,
38 : tenants: Vec<(TenantShardId, Generation)>,
39 : ) -> impl Future<Output = Result<HashMap<TenantShardId, bool>, RetryForeverError>> + Send;
40 : }
41 :
42 : impl ControlPlaneClient {
43 : /// A None return value indicates that the input `conf` object does not have control
44 : /// plane API enabled.
45 1247 : pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option<Self> {
46 1247 : let mut url = match conf.control_plane_api.as_ref() {
47 1245 : Some(u) => u.clone(),
48 2 : None => return None,
49 : };
50 :
51 1245 : if let Ok(mut segs) = url.path_segments_mut() {
52 1245 : // This ensures that `url` ends with a slash if it doesn't already.
53 1245 : // That way, we can subsequently use join() to safely attach extra path elements.
54 1245 : segs.pop_if_empty().push("");
55 1245 : }
56 :
57 1245 : let mut client = reqwest::ClientBuilder::new();
58 :
59 1245 : if let Some(jwt) = &conf.control_plane_api_token {
60 22 : let mut headers = hyper::HeaderMap::new();
61 22 : headers.insert(
62 22 : "Authorization",
63 22 : format!("Bearer {}", jwt.get_contents()).parse().unwrap(),
64 22 : );
65 22 : client = client.default_headers(headers);
66 1223 : }
67 :
68 1245 : Some(Self {
69 1245 : http_client: client.build().expect("Failed to construct HTTP client"),
70 1245 : base_url: url,
71 1245 : node_id: conf.id,
72 1245 : cancel: cancel.clone(),
73 1245 : })
74 1247 : }
75 :
76 1020 : async fn retry_http_forever<R, T>(
77 1020 : &self,
78 1020 : url: &url::Url,
79 1020 : request: R,
80 1020 : ) -> Result<T, RetryForeverError>
81 1020 : where
82 1020 : R: Serialize,
83 1020 : T: DeserializeOwned,
84 1020 : {
85 1020 : let res = backoff::retry(
86 1030 : || async {
87 1030 : let response = self
88 1030 : .http_client
89 1030 : .post(url.clone())
90 1030 : .json(&request)
91 1030 : .send()
92 2764 : .await?;
93 :
94 1019 : response.error_for_status_ref()?;
95 1019 : response.json::<T>().await
96 2060 : },
97 1020 : |_| false,
98 1020 : 3,
99 1020 : u32::MAX,
100 1020 : "calling control plane generation validation API",
101 1020 : &self.cancel,
102 1020 : )
103 2872 : .await
104 1020 : .ok_or(RetryForeverError::ShuttingDown)?
105 1019 : .expect("We retry forever, this should never be reached");
106 1019 :
107 1019 : Ok(res)
108 1020 : }
109 : }
110 :
111 : impl ControlPlaneGenerationsApi for ControlPlaneClient {
112 : /// Block until we get a successful response, or error out if we are shut down
113 622 : async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
114 622 : let re_attach_path = self
115 622 : .base_url
116 622 : .join("re-attach")
117 622 : .expect("Failed to build re-attach path");
118 622 : let request = ReAttachRequest {
119 622 : node_id: self.node_id,
120 622 : };
121 :
122 0 : fail::fail_point!("control-plane-client-re-attach");
123 :
124 1914 : let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?;
125 622 : tracing::info!(
126 622 : "Received re-attach response with {} tenants",
127 622 : response.tenants.len()
128 622 : );
129 :
130 622 : Ok(response
131 622 : .tenants
132 622 : .into_iter()
133 622 : .map(|t| (t.id, Generation::new(t.gen)))
134 622 : .collect::<HashMap<_, _>>())
135 622 : }
136 :
137 : /// Block until we get a successful response, or error out if we are shut down
138 398 : async fn validate(
139 398 : &self,
140 398 : tenants: Vec<(TenantShardId, Generation)>,
141 398 : ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
142 398 : let re_attach_path = self
143 398 : .base_url
144 398 : .join("validate")
145 398 : .expect("Failed to build validate path");
146 398 :
147 398 : let request = ValidateRequest {
148 398 : tenants: tenants
149 398 : .into_iter()
150 480 : .map(|(id, gen)| ValidateRequestTenant {
151 480 : id,
152 480 : gen: gen
153 480 : .into()
154 480 : .expect("Generation should always be valid for a Tenant doing deletions"),
155 480 : })
156 398 : .collect(),
157 398 : };
158 :
159 0 : fail::fail_point!("control-plane-client-validate");
160 :
161 958 : let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?;
162 :
163 397 : Ok(response
164 397 : .tenants
165 397 : .into_iter()
166 479 : .map(|rt| (rt.id, rt.valid))
167 397 : .collect())
168 398 : }
169 : }
|