Line data Source code
1 : use pageserver_api::{models::*, shard::TenantShardId};
2 : use reqwest::{IntoUrl, Method, StatusCode};
3 : use utils::{
4 : http::error::HttpErrorBody,
5 : id::{TenantId, TimelineId},
6 : };
7 :
8 : pub mod util;
9 :
10 : #[derive(Debug, Clone)]
11 : pub struct Client {
12 : mgmt_api_endpoint: String,
13 : authorization_header: Option<String>,
14 : client: reqwest::Client,
15 : }
16 :
17 0 : #[derive(thiserror::Error, Debug)]
18 : pub enum Error {
19 : #[error("receive body: {0}")]
20 : ReceiveBody(reqwest::Error),
21 :
22 : #[error("receive error body: {0}")]
23 : ReceiveErrorBody(String),
24 :
25 : #[error("pageserver API: {1}")]
26 : ApiError(StatusCode, String),
27 :
28 : #[error("Cancelled")]
29 : Cancelled,
30 : }
31 :
32 : pub type Result<T> = std::result::Result<T, Error>;
33 :
34 : pub trait ResponseErrorMessageExt: Sized {
35 : fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
36 : }
37 :
38 : impl ResponseErrorMessageExt for reqwest::Response {
39 0 : async fn error_from_body(self) -> Result<Self> {
40 0 : let status = self.status();
41 0 : if !(status.is_client_error() || status.is_server_error()) {
42 0 : return Ok(self);
43 0 : }
44 0 :
45 0 : let url = self.url().to_owned();
46 0 : Err(match self.json::<HttpErrorBody>().await {
47 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
48 : Err(_) => {
49 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), url))
50 : }
51 : })
52 0 : }
53 : }
54 :
55 : pub enum ForceAwaitLogicalSize {
56 : Yes,
57 : No,
58 : }
59 :
60 : impl Client {
61 0 : pub fn new(mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
62 0 : Self::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt)
63 0 : }
64 :
65 0 : pub fn from_client(
66 0 : client: reqwest::Client,
67 0 : mgmt_api_endpoint: String,
68 0 : jwt: Option<&str>,
69 0 : ) -> Self {
70 0 : Self {
71 0 : mgmt_api_endpoint,
72 0 : authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),
73 0 : client,
74 0 : }
75 0 : }
76 :
77 0 : pub async fn list_tenants(&self) -> Result<Vec<pageserver_api::models::TenantInfo>> {
78 0 : let uri = format!("{}/v1/tenant", self.mgmt_api_endpoint);
79 0 : let resp = self.get(&uri).await?;
80 0 : resp.json().await.map_err(Error::ReceiveBody)
81 0 : }
82 :
83 : /// Get an arbitrary path and returning a streaming Response. This function is suitable
84 : /// for pass-through/proxy use cases where we don't care what the response content looks
85 : /// like.
86 : ///
87 : /// Use/add one of the properly typed methods below if you know aren't proxying, and
88 : /// know what kind of response you expect.
89 0 : pub async fn get_raw(&self, path: String) -> Result<reqwest::Response> {
90 0 : debug_assert!(path.starts_with('/'));
91 0 : let uri = format!("{}{}", self.mgmt_api_endpoint, path);
92 0 :
93 0 : let req = self.client.request(Method::GET, uri);
94 0 : let req = if let Some(value) = &self.authorization_header {
95 0 : req.header(reqwest::header::AUTHORIZATION, value)
96 : } else {
97 0 : req
98 : };
99 0 : req.send().await.map_err(Error::ReceiveBody)
100 0 : }
101 :
102 0 : pub async fn tenant_details(
103 0 : &self,
104 0 : tenant_shard_id: TenantShardId,
105 0 : ) -> Result<pageserver_api::models::TenantDetails> {
106 0 : let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
107 0 : self.get(uri)
108 0 : .await?
109 0 : .json()
110 0 : .await
111 0 : .map_err(Error::ReceiveBody)
112 0 : }
113 :
114 0 : pub async fn list_timelines(
115 0 : &self,
116 0 : tenant_shard_id: TenantShardId,
117 0 : ) -> Result<Vec<pageserver_api::models::TimelineInfo>> {
118 0 : let uri = format!(
119 0 : "{}/v1/tenant/{tenant_shard_id}/timeline",
120 0 : self.mgmt_api_endpoint
121 0 : );
122 0 : self.get(&uri)
123 0 : .await?
124 0 : .json()
125 0 : .await
126 0 : .map_err(Error::ReceiveBody)
127 0 : }
128 :
129 0 : pub async fn timeline_info(
130 0 : &self,
131 0 : tenant_shard_id: TenantShardId,
132 0 : timeline_id: TimelineId,
133 0 : force_await_logical_size: ForceAwaitLogicalSize,
134 0 : ) -> Result<pageserver_api::models::TimelineInfo> {
135 0 : let uri = format!(
136 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
137 0 : self.mgmt_api_endpoint
138 0 : );
139 :
140 0 : let uri = match force_await_logical_size {
141 0 : ForceAwaitLogicalSize::Yes => format!("{}?force-await-logical-size={}", uri, true),
142 0 : ForceAwaitLogicalSize::No => uri,
143 : };
144 :
145 0 : self.get(&uri)
146 0 : .await?
147 0 : .json()
148 0 : .await
149 0 : .map_err(Error::ReceiveBody)
150 0 : }
151 :
152 0 : pub async fn keyspace(
153 0 : &self,
154 0 : tenant_shard_id: TenantShardId,
155 0 : timeline_id: TimelineId,
156 0 : ) -> Result<pageserver_api::models::partitioning::Partitioning> {
157 0 : let uri = format!(
158 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/keyspace",
159 0 : self.mgmt_api_endpoint
160 0 : );
161 0 : self.get(&uri)
162 0 : .await?
163 0 : .json()
164 0 : .await
165 0 : .map_err(Error::ReceiveBody)
166 0 : }
167 :
168 0 : async fn get<U: IntoUrl>(&self, uri: U) -> Result<reqwest::Response> {
169 0 : self.request(Method::GET, uri, ()).await
170 0 : }
171 :
172 0 : async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
173 0 : &self,
174 0 : method: Method,
175 0 : uri: U,
176 0 : body: B,
177 0 : ) -> Result<reqwest::Response> {
178 0 : let req = self.client.request(method, uri);
179 0 : let req = if let Some(value) = &self.authorization_header {
180 0 : req.header(reqwest::header::AUTHORIZATION, value)
181 : } else {
182 0 : req
183 : };
184 0 : req.json(&body).send().await.map_err(Error::ReceiveBody)
185 0 : }
186 :
187 0 : async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
188 0 : &self,
189 0 : method: Method,
190 0 : uri: U,
191 0 : body: B,
192 0 : ) -> Result<reqwest::Response> {
193 0 : let res = self.request_noerror(method, uri, body).await?;
194 0 : let response = res.error_from_body().await?;
195 0 : Ok(response)
196 0 : }
197 :
198 0 : pub async fn status(&self) -> Result<()> {
199 0 : let uri = format!("{}/v1/status", self.mgmt_api_endpoint);
200 0 : self.get(&uri).await?;
201 0 : Ok(())
202 0 : }
203 :
204 0 : pub async fn tenant_create(&self, req: &TenantCreateRequest) -> Result<TenantId> {
205 0 : let uri = format!("{}/v1/tenant", self.mgmt_api_endpoint);
206 0 : self.request(Method::POST, &uri, req)
207 0 : .await?
208 0 : .json()
209 0 : .await
210 0 : .map_err(Error::ReceiveBody)
211 0 : }
212 :
213 : /// The tenant deletion API can return 202 if deletion is incomplete, or
214 : /// 404 if it is complete. Callers are responsible for checking the status
215 : /// code and retrying. Error codes other than 404 will return Err().
216 0 : pub async fn tenant_delete(&self, tenant_shard_id: TenantShardId) -> Result<StatusCode> {
217 0 : let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
218 0 :
219 0 : match self.request(Method::DELETE, &uri, ()).await {
220 0 : Err(Error::ApiError(status_code, msg)) => {
221 0 : if status_code == StatusCode::NOT_FOUND {
222 0 : Ok(StatusCode::NOT_FOUND)
223 : } else {
224 0 : Err(Error::ApiError(status_code, msg))
225 : }
226 : }
227 0 : Err(e) => Err(e),
228 0 : Ok(response) => Ok(response.status()),
229 : }
230 0 : }
231 :
232 0 : pub async fn tenant_time_travel_remote_storage(
233 0 : &self,
234 0 : tenant_shard_id: TenantShardId,
235 0 : timestamp: &str,
236 0 : done_if_after: &str,
237 0 : ) -> Result<()> {
238 0 : let uri = format!(
239 0 : "{}/v1/tenant/{tenant_shard_id}/time_travel_remote_storage?travel_to={timestamp}&done_if_after={done_if_after}",
240 0 : self.mgmt_api_endpoint
241 0 : );
242 0 : self.request(Method::PUT, &uri, ()).await?;
243 0 : Ok(())
244 0 : }
245 :
246 0 : pub async fn tenant_scan_remote_storage(
247 0 : &self,
248 0 : tenant_id: TenantId,
249 0 : ) -> Result<TenantScanRemoteStorageResponse> {
250 0 : let uri = format!(
251 0 : "{}/v1/tenant/{tenant_id}/scan_remote_storage",
252 0 : self.mgmt_api_endpoint
253 0 : );
254 0 : let response = self.request(Method::GET, &uri, ()).await?;
255 0 : let body = response.json().await.map_err(Error::ReceiveBody)?;
256 0 : Ok(body)
257 0 : }
258 :
259 0 : pub async fn tenant_config(&self, req: &TenantConfigRequest) -> Result<()> {
260 0 : let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
261 0 : self.request(Method::PUT, &uri, req).await?;
262 0 : Ok(())
263 0 : }
264 :
265 0 : pub async fn tenant_secondary_download(
266 0 : &self,
267 0 : tenant_id: TenantShardId,
268 0 : wait: Option<std::time::Duration>,
269 0 : ) -> Result<(StatusCode, SecondaryProgress)> {
270 0 : let mut path = reqwest::Url::parse(&format!(
271 0 : "{}/v1/tenant/{}/secondary/download",
272 0 : self.mgmt_api_endpoint, tenant_id
273 0 : ))
274 0 : .expect("Cannot build URL");
275 :
276 0 : if let Some(wait) = wait {
277 0 : path.query_pairs_mut()
278 0 : .append_pair("wait_ms", &format!("{}", wait.as_millis()));
279 0 : }
280 :
281 0 : let response = self.request(Method::POST, path, ()).await?;
282 0 : let status = response.status();
283 0 : let progress: SecondaryProgress = response.json().await.map_err(Error::ReceiveBody)?;
284 0 : Ok((status, progress))
285 0 : }
286 :
287 0 : pub async fn tenant_secondary_status(
288 0 : &self,
289 0 : tenant_shard_id: TenantShardId,
290 0 : ) -> Result<SecondaryProgress> {
291 0 : let path = reqwest::Url::parse(&format!(
292 0 : "{}/v1/tenant/{}/secondary/status",
293 0 : self.mgmt_api_endpoint, tenant_shard_id
294 0 : ))
295 0 : .expect("Cannot build URL");
296 0 :
297 0 : self.request(Method::GET, path, ())
298 0 : .await?
299 0 : .json()
300 0 : .await
301 0 : .map_err(Error::ReceiveBody)
302 0 : }
303 :
304 0 : pub async fn tenant_heatmap_upload(&self, tenant_id: TenantShardId) -> Result<()> {
305 0 : let path = reqwest::Url::parse(&format!(
306 0 : "{}/v1/tenant/{}/heatmap_upload",
307 0 : self.mgmt_api_endpoint, tenant_id
308 0 : ))
309 0 : .expect("Cannot build URL");
310 0 :
311 0 : self.request(Method::POST, path, ()).await?;
312 0 : Ok(())
313 0 : }
314 :
315 0 : pub async fn location_config(
316 0 : &self,
317 0 : tenant_shard_id: TenantShardId,
318 0 : config: LocationConfig,
319 0 : flush_ms: Option<std::time::Duration>,
320 0 : lazy: bool,
321 0 : ) -> Result<()> {
322 0 : let req_body = TenantLocationConfigRequest { config };
323 0 :
324 0 : let mut path = reqwest::Url::parse(&format!(
325 0 : "{}/v1/tenant/{}/location_config",
326 0 : self.mgmt_api_endpoint, tenant_shard_id
327 0 : ))
328 0 : // Should always work: mgmt_api_endpoint is configuration, not user input.
329 0 : .expect("Cannot build URL");
330 0 :
331 0 : if lazy {
332 0 : path.query_pairs_mut().append_pair("lazy", "true");
333 0 : }
334 :
335 0 : if let Some(flush_ms) = flush_ms {
336 0 : path.query_pairs_mut()
337 0 : .append_pair("flush_ms", &format!("{}", flush_ms.as_millis()));
338 0 : }
339 :
340 0 : self.request(Method::PUT, path, &req_body).await?;
341 0 : Ok(())
342 0 : }
343 :
344 0 : pub async fn list_location_config(&self) -> Result<LocationConfigListResponse> {
345 0 : let path = format!("{}/v1/location_config", self.mgmt_api_endpoint);
346 0 : self.request(Method::GET, &path, ())
347 0 : .await?
348 0 : .json()
349 0 : .await
350 0 : .map_err(Error::ReceiveBody)
351 0 : }
352 :
353 0 : pub async fn get_location_config(
354 0 : &self,
355 0 : tenant_shard_id: TenantShardId,
356 0 : ) -> Result<Option<LocationConfig>> {
357 0 : let path = format!(
358 0 : "{}/v1/location_config/{tenant_shard_id}",
359 0 : self.mgmt_api_endpoint
360 0 : );
361 0 : self.request(Method::GET, &path, ())
362 0 : .await?
363 0 : .json()
364 0 : .await
365 0 : .map_err(Error::ReceiveBody)
366 0 : }
367 :
368 0 : pub async fn timeline_create(
369 0 : &self,
370 0 : tenant_shard_id: TenantShardId,
371 0 : req: &TimelineCreateRequest,
372 0 : ) -> Result<TimelineInfo> {
373 0 : let uri = format!(
374 0 : "{}/v1/tenant/{}/timeline",
375 0 : self.mgmt_api_endpoint, tenant_shard_id
376 0 : );
377 0 : self.request(Method::POST, &uri, req)
378 0 : .await?
379 0 : .json()
380 0 : .await
381 0 : .map_err(Error::ReceiveBody)
382 0 : }
383 :
384 : /// The timeline deletion API can return 201 if deletion is incomplete, or
385 : /// 403 if it is complete. Callers are responsible for checking the status
386 : /// code and retrying. Error codes other than 403 will return Err().
387 0 : pub async fn timeline_delete(
388 0 : &self,
389 0 : tenant_shard_id: TenantShardId,
390 0 : timeline_id: TimelineId,
391 0 : ) -> Result<StatusCode> {
392 0 : let uri = format!(
393 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
394 0 : self.mgmt_api_endpoint
395 0 : );
396 0 :
397 0 : match self.request(Method::DELETE, &uri, ()).await {
398 0 : Err(Error::ApiError(status_code, msg)) => {
399 0 : if status_code == StatusCode::NOT_FOUND {
400 0 : Ok(StatusCode::NOT_FOUND)
401 : } else {
402 0 : Err(Error::ApiError(status_code, msg))
403 : }
404 : }
405 0 : Err(e) => Err(e),
406 0 : Ok(response) => Ok(response.status()),
407 : }
408 0 : }
409 :
410 0 : pub async fn tenant_reset(&self, tenant_shard_id: TenantShardId) -> Result<()> {
411 0 : let uri = format!(
412 0 : "{}/v1/tenant/{}/reset",
413 0 : self.mgmt_api_endpoint, tenant_shard_id
414 0 : );
415 0 : self.request(Method::POST, &uri, ())
416 0 : .await?
417 0 : .json()
418 0 : .await
419 0 : .map_err(Error::ReceiveBody)
420 0 : }
421 :
422 0 : pub async fn tenant_shard_split(
423 0 : &self,
424 0 : tenant_shard_id: TenantShardId,
425 0 : req: TenantShardSplitRequest,
426 0 : ) -> Result<TenantShardSplitResponse> {
427 0 : let uri = format!(
428 0 : "{}/v1/tenant/{}/shard_split",
429 0 : self.mgmt_api_endpoint, tenant_shard_id
430 0 : );
431 0 : self.request(Method::PUT, &uri, req)
432 0 : .await?
433 0 : .json()
434 0 : .await
435 0 : .map_err(Error::ReceiveBody)
436 0 : }
437 :
438 0 : pub async fn timeline_list(
439 0 : &self,
440 0 : tenant_shard_id: &TenantShardId,
441 0 : ) -> Result<Vec<TimelineInfo>> {
442 0 : let uri = format!(
443 0 : "{}/v1/tenant/{}/timeline",
444 0 : self.mgmt_api_endpoint, tenant_shard_id
445 0 : );
446 0 : self.get(&uri)
447 0 : .await?
448 0 : .json()
449 0 : .await
450 0 : .map_err(Error::ReceiveBody)
451 0 : }
452 :
453 0 : pub async fn tenant_synthetic_size(
454 0 : &self,
455 0 : tenant_shard_id: TenantShardId,
456 0 : ) -> Result<TenantHistorySize> {
457 0 : let uri = format!(
458 0 : "{}/v1/tenant/{}/synthetic_size",
459 0 : self.mgmt_api_endpoint, tenant_shard_id
460 0 : );
461 0 : self.get(&uri)
462 0 : .await?
463 0 : .json()
464 0 : .await
465 0 : .map_err(Error::ReceiveBody)
466 0 : }
467 :
468 0 : pub async fn put_io_engine(
469 0 : &self,
470 0 : engine: &pageserver_api::models::virtual_file::IoEngineKind,
471 0 : ) -> Result<()> {
472 0 : let uri = format!("{}/v1/io_engine", self.mgmt_api_endpoint);
473 0 : self.request(Method::PUT, uri, engine)
474 0 : .await?
475 0 : .json()
476 0 : .await
477 0 : .map_err(Error::ReceiveBody)
478 0 : }
479 :
480 0 : pub async fn get_utilization(&self) -> Result<PageserverUtilization> {
481 0 : let uri = format!("{}/v1/utilization", self.mgmt_api_endpoint);
482 0 : self.get(uri)
483 0 : .await?
484 0 : .json()
485 0 : .await
486 0 : .map_err(Error::ReceiveBody)
487 0 : }
488 :
489 0 : pub async fn layer_map_info(
490 0 : &self,
491 0 : tenant_shard_id: TenantShardId,
492 0 : timeline_id: TimelineId,
493 0 : ) -> Result<LayerMapInfo> {
494 0 : let uri = format!(
495 0 : "{}/v1/tenant/{}/timeline/{}/layer",
496 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id,
497 0 : );
498 0 : self.get(&uri)
499 0 : .await?
500 0 : .json()
501 0 : .await
502 0 : .map_err(Error::ReceiveBody)
503 0 : }
504 :
505 0 : pub async fn layer_evict(
506 0 : &self,
507 0 : tenant_shard_id: TenantShardId,
508 0 : timeline_id: TimelineId,
509 0 : layer_file_name: &str,
510 0 : ) -> Result<bool> {
511 0 : let uri = format!(
512 0 : "{}/v1/tenant/{}/timeline/{}/layer/{}",
513 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id, layer_file_name
514 0 : );
515 0 : let resp = self.request_noerror(Method::DELETE, &uri, ()).await?;
516 0 : match resp.status() {
517 0 : StatusCode::OK => Ok(true),
518 0 : StatusCode::NOT_MODIFIED => Ok(false),
519 : // TODO: dedupe this pattern / introduce separate error variant?
520 0 : status => Err(match resp.json::<HttpErrorBody>().await {
521 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
522 : Err(_) => {
523 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
524 : }
525 : }),
526 : }
527 0 : }
528 :
529 0 : pub async fn layer_ondemand_download(
530 0 : &self,
531 0 : tenant_shard_id: TenantShardId,
532 0 : timeline_id: TimelineId,
533 0 : layer_file_name: &str,
534 0 : ) -> Result<bool> {
535 0 : let uri = format!(
536 0 : "{}/v1/tenant/{}/timeline/{}/layer/{}",
537 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id, layer_file_name
538 0 : );
539 0 : let resp = self.request_noerror(Method::GET, &uri, ()).await?;
540 0 : match resp.status() {
541 0 : StatusCode::OK => Ok(true),
542 0 : StatusCode::NOT_MODIFIED => Ok(false),
543 : // TODO: dedupe this pattern / introduce separate error variant?
544 0 : status => Err(match resp.json::<HttpErrorBody>().await {
545 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
546 : Err(_) => {
547 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
548 : }
549 : }),
550 : }
551 0 : }
552 : }
|