Line data Source code
1 : use pageserver_api::{models::*, shard::TenantShardId};
2 : use reqwest::{IntoUrl, Method, StatusCode};
3 : use utils::{
4 : http::error::HttpErrorBody,
5 : id::{TenantId, TimelineId},
6 : };
7 :
8 : pub mod util;
9 :
10 0 : #[derive(Debug)]
11 : pub struct Client {
12 : mgmt_api_endpoint: String,
13 : authorization_header: Option<String>,
14 : client: reqwest::Client,
15 : }
16 :
17 14 : #[derive(thiserror::Error, Debug)]
18 : pub enum Error {
19 : #[error("receive body: {0}")]
20 : ReceiveBody(reqwest::Error),
21 :
22 : #[error("receive error body: {0}")]
23 : ReceiveErrorBody(String),
24 :
25 : #[error("pageserver API: {1}")]
26 : ApiError(StatusCode, String),
27 : }
28 :
29 : pub type Result<T> = std::result::Result<T, Error>;
30 :
31 : pub trait ResponseErrorMessageExt: Sized {
32 : fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
33 : }
34 :
35 : impl ResponseErrorMessageExt for reqwest::Response {
36 4778 : async fn error_from_body(self) -> Result<Self> {
37 4778 : let status = self.status();
38 4778 : if !(status.is_client_error() || status.is_server_error()) {
39 4754 : return Ok(self);
40 24 : }
41 24 :
42 24 : let url = self.url().to_owned();
43 24 : Err(match self.json::<HttpErrorBody>().await {
44 24 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
45 : Err(_) => {
46 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), url))
47 : }
48 : })
49 4778 : }
50 : }
51 :
/// Selects whether `Client::timeline_info` adds the
/// `force-await-logical-size=true` query parameter to its request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ForceAwaitLogicalSize {
    /// Append `?force-await-logical-size=true` to the request URL.
    Yes,
    /// Send the request without the query parameter.
    No,
}
56 :
57 : impl Client {
58 4022 : pub fn new(mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
59 4022 : Self {
60 4022 : mgmt_api_endpoint,
61 4022 : authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),
62 4022 : client: reqwest::Client::new(),
63 4022 : }
64 4022 : }
65 :
66 6 : pub async fn list_tenants(&self) -> Result<Vec<pageserver_api::models::TenantInfo>> {
67 6 : let uri = format!("{}/v1/tenant", self.mgmt_api_endpoint);
68 24 : let resp = self.get(&uri).await?;
69 6 : resp.json().await.map_err(Error::ReceiveBody)
70 6 : }
71 :
72 : /// Get an arbitrary path and returning a streaming Response. This function is suitable
73 : /// for pass-through/proxy use cases where we don't care what the response content looks
74 : /// like.
75 : ///
76 : /// Use/add one of the properly typed methods below if you know aren't proxying, and
77 : /// know what kind of response you expect.
78 5 : pub async fn get_raw(&self, path: String) -> Result<reqwest::Response> {
79 5 : debug_assert!(path.starts_with('/'));
80 5 : let uri = format!("{}{}", self.mgmt_api_endpoint, path);
81 5 :
82 5 : let req = self.client.request(Method::GET, uri);
83 5 : let req = if let Some(value) = &self.authorization_header {
84 0 : req.header(reqwest::header::AUTHORIZATION, value)
85 : } else {
86 5 : req
87 : };
88 19 : req.send().await.map_err(Error::ReceiveBody)
89 5 : }
90 :
91 0 : pub async fn tenant_details(
92 0 : &self,
93 0 : tenant_shard_id: TenantShardId,
94 0 : ) -> Result<pageserver_api::models::TenantDetails> {
95 0 : let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
96 0 : self.get(uri)
97 0 : .await?
98 0 : .json()
99 0 : .await
100 0 : .map_err(Error::ReceiveBody)
101 0 : }
102 :
103 19 : pub async fn list_timelines(
104 19 : &self,
105 19 : tenant_shard_id: TenantShardId,
106 19 : ) -> Result<Vec<pageserver_api::models::TimelineInfo>> {
107 19 : let uri = format!(
108 19 : "{}/v1/tenant/{tenant_shard_id}/timeline",
109 19 : self.mgmt_api_endpoint
110 19 : );
111 19 : self.get(&uri)
112 76 : .await?
113 19 : .json()
114 0 : .await
115 19 : .map_err(Error::ReceiveBody)
116 19 : }
117 :
118 0 : pub async fn timeline_info(
119 0 : &self,
120 0 : tenant_id: TenantId,
121 0 : timeline_id: TimelineId,
122 0 : force_await_logical_size: ForceAwaitLogicalSize,
123 0 : ) -> Result<pageserver_api::models::TimelineInfo> {
124 0 : let uri = format!(
125 0 : "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}",
126 0 : self.mgmt_api_endpoint
127 0 : );
128 :
129 0 : let uri = match force_await_logical_size {
130 0 : ForceAwaitLogicalSize::Yes => format!("{}?force-await-logical-size={}", uri, true),
131 0 : ForceAwaitLogicalSize::No => uri,
132 : };
133 :
134 0 : self.get(&uri)
135 0 : .await?
136 0 : .json()
137 0 : .await
138 0 : .map_err(Error::ReceiveBody)
139 0 : }
140 :
141 0 : pub async fn keyspace(
142 0 : &self,
143 0 : tenant_id: TenantId,
144 0 : timeline_id: TimelineId,
145 0 : ) -> Result<pageserver_api::models::partitioning::Partitioning> {
146 0 : let uri = format!(
147 0 : "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/keyspace",
148 0 : self.mgmt_api_endpoint
149 0 : );
150 0 : self.get(&uri)
151 0 : .await?
152 0 : .json()
153 0 : .await
154 0 : .map_err(Error::ReceiveBody)
155 0 : }
156 :
157 1258 : async fn get<U: IntoUrl>(&self, uri: U) -> Result<reqwest::Response> {
158 3775 : self.request(Method::GET, uri, ()).await
159 1258 : }
160 :
161 2629 : async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
162 2629 : &self,
163 2629 : method: Method,
164 2629 : uri: U,
165 2629 : body: B,
166 2629 : ) -> Result<reqwest::Response> {
167 2629 : let req = self.client.request(method, uri);
168 2629 : let req = if let Some(value) = &self.authorization_header {
169 56 : req.header(reqwest::header::AUTHORIZATION, value)
170 : } else {
171 2573 : req
172 : };
173 9028 : let res = req.json(&body).send().await.map_err(Error::ReceiveBody)?;
174 1996 : let response = res.error_from_body().await?;
175 1977 : Ok(response)
176 2629 : }
177 :
178 1233 : pub async fn status(&self) -> Result<()> {
179 1233 : let uri = format!("{}/v1/status", self.mgmt_api_endpoint);
180 3675 : self.get(&uri).await?;
181 605 : Ok(())
182 1233 : }
183 :
184 0 : pub async fn tenant_create(&self, req: &TenantCreateRequest) -> Result<TenantId> {
185 0 : let uri = format!("{}/v1/tenant", self.mgmt_api_endpoint);
186 0 : self.request(Method::POST, &uri, req)
187 0 : .await?
188 0 : .json()
189 0 : .await
190 0 : .map_err(Error::ReceiveBody)
191 0 : }
192 :
193 : /// The tenant deletion API can return 202 if deletion is incomplete, or
194 : /// 404 if it is complete. Callers are responsible for checking the status
195 : /// code and retrying. Error codes other than 404 will return Err().
196 24 : pub async fn tenant_delete(&self, tenant_shard_id: TenantShardId) -> Result<StatusCode> {
197 24 : let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
198 24 :
199 94 : match self.request(Method::DELETE, &uri, ()).await {
200 12 : Err(Error::ApiError(status_code, msg)) => {
201 12 : if status_code == StatusCode::NOT_FOUND {
202 12 : Ok(StatusCode::NOT_FOUND)
203 : } else {
204 0 : Err(Error::ApiError(status_code, msg))
205 : }
206 : }
207 0 : Err(e) => Err(e),
208 12 : Ok(response) => Ok(response.status()),
209 : }
210 24 : }
211 :
212 14 : pub async fn tenant_config(&self, req: &TenantConfigRequest) -> Result<()> {
213 14 : let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
214 56 : self.request(Method::PUT, &uri, req).await?;
215 14 : Ok(())
216 14 : }
217 :
218 0 : pub async fn tenant_secondary_download(&self, tenant_id: TenantShardId) -> Result<()> {
219 0 : let uri = format!(
220 0 : "{}/v1/tenant/{}/secondary/download",
221 0 : self.mgmt_api_endpoint, tenant_id
222 0 : );
223 0 : self.request(Method::POST, &uri, ()).await?;
224 0 : Ok(())
225 0 : }
226 :
227 496 : pub async fn location_config(
228 496 : &self,
229 496 : tenant_shard_id: TenantShardId,
230 496 : config: LocationConfig,
231 496 : flush_ms: Option<std::time::Duration>,
232 496 : ) -> Result<()> {
233 496 : let req_body = TenantLocationConfigRequest {
234 496 : tenant_id: tenant_shard_id,
235 496 : config,
236 496 : };
237 496 : let path = format!(
238 496 : "{}/v1/tenant/{}/location_config",
239 496 : self.mgmt_api_endpoint, tenant_shard_id
240 496 : );
241 496 : let path = if let Some(flush_ms) = flush_ms {
242 0 : format!("{}?flush_ms={}", path, flush_ms.as_millis())
243 : } else {
244 496 : path
245 : };
246 1860 : self.request(Method::PUT, &path, &req_body).await?;
247 493 : Ok(())
248 496 : }
249 :
250 8 : pub async fn list_location_config(&self) -> Result<LocationConfigListResponse> {
251 8 : let path = format!("{}/v1/location_config", self.mgmt_api_endpoint);
252 8 : self.request(Method::GET, &path, ())
253 23 : .await?
254 5 : .json()
255 0 : .await
256 5 : .map_err(Error::ReceiveBody)
257 8 : }
258 :
259 825 : pub async fn timeline_create(
260 825 : &self,
261 825 : tenant_shard_id: TenantShardId,
262 825 : req: &TimelineCreateRequest,
263 825 : ) -> Result<TimelineInfo> {
264 825 : let uri = format!(
265 825 : "{}/v1/tenant/{}/timeline",
266 825 : self.mgmt_api_endpoint, tenant_shard_id
267 825 : );
268 825 : self.request(Method::POST, &uri, req)
269 3204 : .await?
270 821 : .json()
271 0 : .await
272 821 : .map_err(Error::ReceiveBody)
273 825 : }
274 :
275 : /// The timeline deletion API can return 201 if deletion is incomplete, or
276 : /// 403 if it is complete. Callers are responsible for checking the status
277 : /// code and retrying. Error codes other than 403 will return Err().
278 4 : pub async fn timeline_delete(
279 4 : &self,
280 4 : tenant_shard_id: TenantShardId,
281 4 : timeline_id: TimelineId,
282 4 : ) -> Result<StatusCode> {
283 4 : let uri = format!(
284 4 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
285 4 : self.mgmt_api_endpoint
286 4 : );
287 4 :
288 16 : match self.request(Method::DELETE, &uri, ()).await {
289 2 : Err(Error::ApiError(status_code, msg)) => {
290 2 : if status_code == StatusCode::NOT_FOUND {
291 2 : Ok(StatusCode::NOT_FOUND)
292 : } else {
293 0 : Err(Error::ApiError(status_code, msg))
294 : }
295 : }
296 0 : Err(e) => Err(e),
297 2 : Ok(response) => Ok(response.status()),
298 : }
299 4 : }
300 :
301 0 : pub async fn tenant_reset(&self, tenant_shard_id: TenantShardId) -> Result<()> {
302 0 : let uri = format!(
303 0 : "{}/v1/tenant/{}/reset",
304 0 : self.mgmt_api_endpoint, tenant_shard_id
305 0 : );
306 0 : self.request(Method::POST, &uri, ())
307 0 : .await?
308 0 : .json()
309 0 : .await
310 0 : .map_err(Error::ReceiveBody)
311 0 : }
312 :
313 0 : pub async fn timeline_list(
314 0 : &self,
315 0 : tenant_shard_id: &TenantShardId,
316 0 : ) -> Result<Vec<TimelineInfo>> {
317 0 : let uri = format!(
318 0 : "{}/v1/tenant/{}/timeline",
319 0 : self.mgmt_api_endpoint, tenant_shard_id
320 0 : );
321 0 : self.get(&uri)
322 0 : .await?
323 0 : .json()
324 0 : .await
325 0 : .map_err(Error::ReceiveBody)
326 0 : }
327 :
328 0 : pub async fn tenant_synthetic_size(
329 0 : &self,
330 0 : tenant_shard_id: TenantShardId,
331 0 : ) -> Result<TenantHistorySize> {
332 0 : let uri = format!(
333 0 : "{}/v1/tenant/{}/synthetic_size",
334 0 : self.mgmt_api_endpoint, tenant_shard_id
335 0 : );
336 0 : self.get(&uri)
337 0 : .await?
338 0 : .json()
339 0 : .await
340 0 : .map_err(Error::ReceiveBody)
341 0 : }
342 : }
|