Line data Source code
1 : use std::collections::HashMap;
2 :
3 : use futures::Future;
4 : use pageserver_api::{
5 : control_api::{
6 : ReAttachRequest, ReAttachResponse, ValidateRequest, ValidateRequestTenant, ValidateResponse,
7 : },
8 : shard::TenantShardId,
9 : };
10 : use serde::{de::DeserializeOwned, Serialize};
11 : use tokio_util::sync::CancellationToken;
12 : use url::Url;
13 : use utils::{backoff, generation::Generation, id::NodeId};
14 :
15 : use crate::config::PageServerConf;
16 :
/// The Pageserver's client for using the control plane API: this is a small subset
/// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md)
pub struct ControlPlaneClient {
    /// HTTP client used for every request. When the configuration supplies a control
    /// plane API token, it is built with a default `Authorization: Bearer ...` header.
    http_client: reqwest::Client,
    /// Base URL of the control plane API, adjusted at construction time to end with a
    /// slash so that endpoint names can be appended with `Url::join`.
    base_url: Url,
    /// ID of this pageserver node, taken from the configuration and sent in re-attach requests.
    node_id: NodeId,
    /// Cancellation token passed to the retry loop; requests are retried until it fires.
    cancel: CancellationToken,
}
25 :
/// Represent operations which internally retry on all errors other than
/// cancellation token firing: the only way they can fail is ShuttingDown.
///
/// `Debug` is derived so that a `Result<_, RetryForeverError>` can be logged with
/// `{:?}` and used with `unwrap`/`expect`; `PartialEq`/`Eq` allow direct comparison
/// in assertions.
#[derive(Debug, PartialEq, Eq)]
pub enum RetryForeverError {
    /// The cancellation token fired before the operation could succeed.
    ShuttingDown,
}
31 :
/// The generation-related control plane calls used by the pageserver, expressed as a
/// trait. [`ControlPlaneClient`] is the HTTP-backed implementation in this file.
///
/// Both methods report failure only through [`RetryForeverError`], whose single
/// variant is `ShuttingDown`.
pub trait ControlPlaneGenerationsApi {
    /// Re-attach call: on success, resolves to a map from tenant shard ID to the
    /// generation reported for that shard.
    fn re_attach(
        &self,
    ) -> impl Future<Output = Result<HashMap<TenantShardId, Generation>, RetryForeverError>> + Send;
    /// Validation call: takes `(tenant shard, generation)` pairs and, on success,
    /// resolves to a map from tenant shard ID to a validity flag.
    fn validate(
        &self,
        tenants: Vec<(TenantShardId, Generation)>,
    ) -> impl Future<Output = Result<HashMap<TenantShardId, bool>, RetryForeverError>> + Send;
}
41 :
42 : impl ControlPlaneClient {
43 : /// A None return value indicates that the input `conf` object does not have control
44 : /// plane API enabled.
45 1207 : pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option<Self> {
46 1207 : let mut url = match conf.control_plane_api.as_ref() {
47 1205 : Some(u) => u.clone(),
48 2 : None => return None,
49 : };
50 :
51 1205 : if let Ok(mut segs) = url.path_segments_mut() {
52 1205 : // This ensures that `url` ends with a slash if it doesn't already.
53 1205 : // That way, we can subsequently use join() to safely attach extra path elements.
54 1205 : segs.pop_if_empty().push("");
55 1205 : }
56 :
57 1205 : let mut client = reqwest::ClientBuilder::new();
58 :
59 1205 : if let Some(jwt) = &conf.control_plane_api_token {
60 22 : let mut headers = hyper::HeaderMap::new();
61 22 : headers.insert(
62 22 : "Authorization",
63 22 : format!("Bearer {}", jwt.get_contents()).parse().unwrap(),
64 22 : );
65 22 : client = client.default_headers(headers);
66 1183 : }
67 :
68 1205 : Some(Self {
69 1205 : http_client: client.build().expect("Failed to construct HTTP client"),
70 1205 : base_url: url,
71 1205 : node_id: conf.id,
72 1205 : cancel: cancel.clone(),
73 1205 : })
74 1207 : }
75 :
76 1013 : async fn retry_http_forever<R, T>(
77 1013 : &self,
78 1013 : url: &url::Url,
79 1013 : request: R,
80 1013 : ) -> Result<T, RetryForeverError>
81 1013 : where
82 1013 : R: Serialize,
83 1013 : T: DeserializeOwned,
84 1013 : {
85 1013 : let res = backoff::retry(
86 1022 : || async {
87 1022 : let response = self
88 1022 : .http_client
89 1022 : .post(url.clone())
90 1022 : .json(&request)
91 1022 : .send()
92 2700 : .await?;
93 :
94 1012 : response.error_for_status_ref()?;
95 1012 : response.json::<T>().await
96 1022 : },
97 1013 : |_| false,
98 1013 : 3,
99 1013 : u32::MAX,
100 1013 : "calling control plane generation validation API",
101 1013 : &self.cancel,
102 1013 : )
103 2772 : .await
104 1013 : .ok_or(RetryForeverError::ShuttingDown)?
105 1012 : .expect("We retry forever, this should never be reached");
106 1012 :
107 1012 : Ok(res)
108 1013 : }
109 : }
110 :
111 : impl ControlPlaneGenerationsApi for ControlPlaneClient {
112 : /// Block until we get a successful response, or error out if we are shut down
113 602 : async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
114 602 : let re_attach_path = self
115 602 : .base_url
116 602 : .join("re-attach")
117 602 : .expect("Failed to build re-attach path");
118 602 : let request = ReAttachRequest {
119 602 : node_id: self.node_id,
120 602 : };
121 602 :
122 602 : fail::fail_point!("control-plane-client-re-attach");
123 :
124 1826 : let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?;
125 602 : tracing::info!(
126 602 : "Received re-attach response with {} tenants",
127 602 : response.tenants.len()
128 602 : );
129 :
130 602 : Ok(response
131 602 : .tenants
132 602 : .into_iter()
133 602 : .map(|t| (t.id, Generation::new(t.gen)))
134 602 : .collect::<HashMap<_, _>>())
135 602 : }
136 :
137 : /// Block until we get a successful response, or error out if we are shut down
138 411 : async fn validate(
139 411 : &self,
140 411 : tenants: Vec<(TenantShardId, Generation)>,
141 411 : ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
142 411 : let re_attach_path = self
143 411 : .base_url
144 411 : .join("validate")
145 411 : .expect("Failed to build validate path");
146 411 :
147 411 : let request = ValidateRequest {
148 411 : tenants: tenants
149 411 : .into_iter()
150 498 : .map(|(id, gen)| ValidateRequestTenant {
151 498 : id,
152 498 : gen: gen
153 498 : .into()
154 498 : .expect("Generation should always be valid for a Tenant doing deletions"),
155 498 : })
156 411 : .collect(),
157 411 : };
158 411 :
159 411 : fail::fail_point!("control-plane-client-validate");
160 :
161 946 : let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?;
162 :
163 410 : Ok(response
164 410 : .tenants
165 410 : .into_iter()
166 497 : .map(|rt| (rt.id, rt.valid))
167 410 : .collect())
168 411 : }
169 : }
|