Line data Source code
1 : use std::collections::HashMap;
2 : use std::error::Error as _;
3 :
4 : use bytes::Bytes;
5 : use detach_ancestor::AncestorDetached;
6 : use http_utils::error::HttpErrorBody;
7 : use pageserver_api::models::*;
8 : use pageserver_api::shard::TenantShardId;
9 : pub use reqwest::Body as ReqwestBody;
10 : use reqwest::{Certificate, IntoUrl, Method, StatusCode, Url};
11 : use utils::id::{TenantId, TimelineId};
12 : use utils::lsn::Lsn;
13 :
14 : use crate::BlockUnblock;
15 :
16 : pub mod util;
17 :
18 : #[derive(Debug, Clone)]
19 : pub struct Client {
20 : mgmt_api_endpoint: String,
21 : authorization_header: Option<String>,
22 : client: reqwest::Client,
23 : }
24 :
25 : #[derive(thiserror::Error, Debug)]
26 : pub enum Error {
27 0 : #[error("send request: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
28 : SendRequest(reqwest::Error),
29 :
30 0 : #[error("receive body: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
31 : ReceiveBody(reqwest::Error),
32 :
33 : #[error("receive error body: {0}")]
34 : ReceiveErrorBody(String),
35 :
36 : #[error("pageserver API: {1}")]
37 : ApiError(StatusCode, String),
38 :
39 : #[error("Cancelled")]
40 : Cancelled,
41 :
42 0 : #[error("create client: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
43 : CreateClient(reqwest::Error),
44 : }
45 :
46 : pub type Result<T> = std::result::Result<T, Error>;
47 :
48 : pub trait ResponseErrorMessageExt: Sized {
49 : fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
50 : }
51 :
52 : impl ResponseErrorMessageExt for reqwest::Response {
53 0 : async fn error_from_body(self) -> Result<Self> {
54 0 : let status = self.status();
55 0 : if !(status.is_client_error() || status.is_server_error()) {
56 0 : return Ok(self);
57 0 : }
58 0 :
59 0 : let url = self.url().to_owned();
60 0 : Err(match self.json::<HttpErrorBody>().await {
61 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
62 : Err(_) => {
63 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), url))
64 : }
65 : })
66 0 : }
67 : }
68 :
/// Selects whether [`Client::timeline_info`] appends the
/// `force-await-logical-size=true` query parameter to its request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ForceAwaitLogicalSize {
    /// Append `?force-await-logical-size=true` to the request URL.
    Yes,
    /// Send the request without the extra query parameter.
    No,
}
73 :
74 : impl Client {
75 0 : pub fn new(
76 0 : mgmt_api_endpoint: String,
77 0 : jwt: Option<&str>,
78 0 : ssl_ca_cert: Option<Certificate>,
79 0 : ) -> Result<Self> {
80 0 : let mut http_client = reqwest::Client::builder();
81 0 : if let Some(ssl_ca_cert) = ssl_ca_cert {
82 0 : http_client = http_client.add_root_certificate(ssl_ca_cert);
83 0 : }
84 0 : let http_client = http_client.build().map_err(Error::CreateClient)?;
85 0 : Ok(Self::from_client(http_client, mgmt_api_endpoint, jwt))
86 0 : }
87 :
88 0 : pub fn from_client(
89 0 : client: reqwest::Client,
90 0 : mgmt_api_endpoint: String,
91 0 : jwt: Option<&str>,
92 0 : ) -> Self {
93 0 : Self {
94 0 : mgmt_api_endpoint,
95 0 : authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),
96 0 : client,
97 0 : }
98 0 : }
99 :
100 0 : pub async fn list_tenants(&self) -> Result<Vec<pageserver_api::models::TenantInfo>> {
101 0 : let uri = format!("{}/v1/tenant", self.mgmt_api_endpoint);
102 0 : let resp = self.get(&uri).await?;
103 0 : resp.json().await.map_err(Error::ReceiveBody)
104 0 : }
105 :
106 : /// Get an arbitrary path and returning a streaming Response. This function is suitable
107 : /// for pass-through/proxy use cases where we don't care what the response content looks
108 : /// like.
109 : ///
110 : /// Use/add one of the properly typed methods below if you know aren't proxying, and
111 : /// know what kind of response you expect.
112 0 : pub async fn get_raw(&self, path: String) -> Result<reqwest::Response> {
113 0 : debug_assert!(path.starts_with('/'));
114 0 : let uri = format!("{}{}", self.mgmt_api_endpoint, path);
115 0 :
116 0 : let mut req = self.client.request(Method::GET, uri);
117 0 : if let Some(value) = &self.authorization_header {
118 0 : req = req.header(reqwest::header::AUTHORIZATION, value);
119 0 : }
120 0 : req.send().await.map_err(Error::ReceiveBody)
121 0 : }
122 :
123 0 : pub async fn tenant_details(
124 0 : &self,
125 0 : tenant_shard_id: TenantShardId,
126 0 : ) -> Result<pageserver_api::models::TenantDetails> {
127 0 : let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
128 0 : self.get(uri)
129 0 : .await?
130 0 : .json()
131 0 : .await
132 0 : .map_err(Error::ReceiveBody)
133 0 : }
134 :
135 0 : pub async fn list_timelines(
136 0 : &self,
137 0 : tenant_shard_id: TenantShardId,
138 0 : ) -> Result<Vec<pageserver_api::models::TimelineInfo>> {
139 0 : let uri = format!(
140 0 : "{}/v1/tenant/{tenant_shard_id}/timeline",
141 0 : self.mgmt_api_endpoint
142 0 : );
143 0 : self.get(&uri)
144 0 : .await?
145 0 : .json()
146 0 : .await
147 0 : .map_err(Error::ReceiveBody)
148 0 : }
149 :
150 0 : pub async fn timeline_info(
151 0 : &self,
152 0 : tenant_shard_id: TenantShardId,
153 0 : timeline_id: TimelineId,
154 0 : force_await_logical_size: ForceAwaitLogicalSize,
155 0 : ) -> Result<pageserver_api::models::TimelineInfo> {
156 0 : let uri = format!(
157 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
158 0 : self.mgmt_api_endpoint
159 0 : );
160 :
161 0 : let uri = match force_await_logical_size {
162 0 : ForceAwaitLogicalSize::Yes => format!("{}?force-await-logical-size={}", uri, true),
163 0 : ForceAwaitLogicalSize::No => uri,
164 : };
165 :
166 0 : self.get(&uri)
167 0 : .await?
168 0 : .json()
169 0 : .await
170 0 : .map_err(Error::ReceiveBody)
171 0 : }
172 :
173 0 : pub async fn keyspace(
174 0 : &self,
175 0 : tenant_shard_id: TenantShardId,
176 0 : timeline_id: TimelineId,
177 0 : ) -> Result<pageserver_api::models::partitioning::Partitioning> {
178 0 : let uri = format!(
179 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/keyspace",
180 0 : self.mgmt_api_endpoint
181 0 : );
182 0 : self.get(&uri)
183 0 : .await?
184 0 : .json()
185 0 : .await
186 0 : .map_err(Error::ReceiveBody)
187 0 : }
188 :
189 0 : async fn get<U: IntoUrl>(&self, uri: U) -> Result<reqwest::Response> {
190 0 : self.request(Method::GET, uri, ()).await
191 0 : }
192 :
193 0 : fn start_request<U: reqwest::IntoUrl>(
194 0 : &self,
195 0 : method: Method,
196 0 : uri: U,
197 0 : ) -> reqwest::RequestBuilder {
198 0 : let req = self.client.request(method, uri);
199 0 : if let Some(value) = &self.authorization_header {
200 0 : req.header(reqwest::header::AUTHORIZATION, value)
201 : } else {
202 0 : req
203 : }
204 0 : }
205 :
206 0 : async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
207 0 : &self,
208 0 : method: Method,
209 0 : uri: U,
210 0 : body: B,
211 0 : ) -> Result<reqwest::Response> {
212 0 : self.start_request(method, uri)
213 0 : .json(&body)
214 0 : .send()
215 0 : .await
216 0 : .map_err(Error::ReceiveBody)
217 0 : }
218 :
219 0 : async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
220 0 : &self,
221 0 : method: Method,
222 0 : uri: U,
223 0 : body: B,
224 0 : ) -> Result<reqwest::Response> {
225 0 : let res = self.request_noerror(method, uri, body).await?;
226 0 : let response = res.error_from_body().await?;
227 0 : Ok(response)
228 0 : }
229 :
230 0 : pub async fn status(&self) -> Result<()> {
231 0 : let uri = format!("{}/v1/status", self.mgmt_api_endpoint);
232 0 : self.get(&uri).await?;
233 0 : Ok(())
234 0 : }
235 :
236 : /// The tenant deletion API can return 202 if deletion is incomplete, or
237 : /// 404 if it is complete. Callers are responsible for checking the status
238 : /// code and retrying. Error codes other than 404 will return Err().
239 0 : pub async fn tenant_delete(&self, tenant_shard_id: TenantShardId) -> Result<StatusCode> {
240 0 : let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
241 0 :
242 0 : match self.request(Method::DELETE, &uri, ()).await {
243 0 : Err(Error::ApiError(status_code, msg)) => {
244 0 : if status_code == StatusCode::NOT_FOUND {
245 0 : Ok(StatusCode::NOT_FOUND)
246 : } else {
247 0 : Err(Error::ApiError(status_code, msg))
248 : }
249 : }
250 0 : Err(e) => Err(e),
251 0 : Ok(response) => Ok(response.status()),
252 : }
253 0 : }
254 :
255 0 : pub async fn tenant_time_travel_remote_storage(
256 0 : &self,
257 0 : tenant_shard_id: TenantShardId,
258 0 : timestamp: &str,
259 0 : done_if_after: &str,
260 0 : ) -> Result<()> {
261 0 : let uri = format!(
262 0 : "{}/v1/tenant/{tenant_shard_id}/time_travel_remote_storage?travel_to={timestamp}&done_if_after={done_if_after}",
263 0 : self.mgmt_api_endpoint
264 0 : );
265 0 : self.request(Method::PUT, &uri, ()).await?;
266 0 : Ok(())
267 0 : }
268 :
269 0 : pub async fn tenant_scan_remote_storage(
270 0 : &self,
271 0 : tenant_id: TenantId,
272 0 : ) -> Result<TenantScanRemoteStorageResponse> {
273 0 : let uri = format!(
274 0 : "{}/v1/tenant/{tenant_id}/scan_remote_storage",
275 0 : self.mgmt_api_endpoint
276 0 : );
277 0 : let response = self.request(Method::GET, &uri, ()).await?;
278 0 : let body = response.json().await.map_err(Error::ReceiveBody)?;
279 0 : Ok(body)
280 0 : }
281 :
282 0 : pub async fn set_tenant_config(&self, req: &TenantConfigRequest) -> Result<()> {
283 0 : let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
284 0 : self.request(Method::PUT, &uri, req).await?;
285 0 : Ok(())
286 0 : }
287 :
288 0 : pub async fn patch_tenant_config(&self, req: &TenantConfigPatchRequest) -> Result<()> {
289 0 : let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
290 0 : self.request(Method::PATCH, &uri, req).await?;
291 0 : Ok(())
292 0 : }
293 :
294 0 : pub async fn tenant_secondary_download(
295 0 : &self,
296 0 : tenant_id: TenantShardId,
297 0 : wait: Option<std::time::Duration>,
298 0 : ) -> Result<(StatusCode, SecondaryProgress)> {
299 0 : let mut path = reqwest::Url::parse(&format!(
300 0 : "{}/v1/tenant/{}/secondary/download",
301 0 : self.mgmt_api_endpoint, tenant_id
302 0 : ))
303 0 : .expect("Cannot build URL");
304 :
305 0 : if let Some(wait) = wait {
306 0 : path.query_pairs_mut()
307 0 : .append_pair("wait_ms", &format!("{}", wait.as_millis()));
308 0 : }
309 :
310 0 : let response = self.request(Method::POST, path, ()).await?;
311 0 : let status = response.status();
312 0 : let progress: SecondaryProgress = response.json().await.map_err(Error::ReceiveBody)?;
313 0 : Ok((status, progress))
314 0 : }
315 :
316 0 : pub async fn tenant_secondary_status(
317 0 : &self,
318 0 : tenant_shard_id: TenantShardId,
319 0 : ) -> Result<SecondaryProgress> {
320 0 : let path = reqwest::Url::parse(&format!(
321 0 : "{}/v1/tenant/{}/secondary/status",
322 0 : self.mgmt_api_endpoint, tenant_shard_id
323 0 : ))
324 0 : .expect("Cannot build URL");
325 0 :
326 0 : self.request(Method::GET, path, ())
327 0 : .await?
328 0 : .json()
329 0 : .await
330 0 : .map_err(Error::ReceiveBody)
331 0 : }
332 :
333 0 : pub async fn tenant_heatmap_upload(&self, tenant_id: TenantShardId) -> Result<()> {
334 0 : let path = reqwest::Url::parse(&format!(
335 0 : "{}/v1/tenant/{}/heatmap_upload",
336 0 : self.mgmt_api_endpoint, tenant_id
337 0 : ))
338 0 : .expect("Cannot build URL");
339 0 :
340 0 : self.request(Method::POST, path, ()).await?;
341 0 : Ok(())
342 0 : }
343 :
344 0 : pub async fn location_config(
345 0 : &self,
346 0 : tenant_shard_id: TenantShardId,
347 0 : config: LocationConfig,
348 0 : flush_ms: Option<std::time::Duration>,
349 0 : lazy: bool,
350 0 : ) -> Result<()> {
351 0 : let req_body = TenantLocationConfigRequest { config };
352 0 :
353 0 : let mut path = reqwest::Url::parse(&format!(
354 0 : "{}/v1/tenant/{}/location_config",
355 0 : self.mgmt_api_endpoint, tenant_shard_id
356 0 : ))
357 0 : // Should always work: mgmt_api_endpoint is configuration, not user input.
358 0 : .expect("Cannot build URL");
359 0 :
360 0 : if lazy {
361 0 : path.query_pairs_mut().append_pair("lazy", "true");
362 0 : }
363 :
364 0 : if let Some(flush_ms) = flush_ms {
365 0 : path.query_pairs_mut()
366 0 : .append_pair("flush_ms", &format!("{}", flush_ms.as_millis()));
367 0 : }
368 :
369 0 : self.request(Method::PUT, path, &req_body).await?;
370 0 : Ok(())
371 0 : }
372 :
373 0 : pub async fn list_location_config(&self) -> Result<LocationConfigListResponse> {
374 0 : let path = format!("{}/v1/location_config", self.mgmt_api_endpoint);
375 0 : self.request(Method::GET, &path, ())
376 0 : .await?
377 0 : .json()
378 0 : .await
379 0 : .map_err(Error::ReceiveBody)
380 0 : }
381 :
382 0 : pub async fn get_location_config(
383 0 : &self,
384 0 : tenant_shard_id: TenantShardId,
385 0 : ) -> Result<Option<LocationConfig>> {
386 0 : let path = format!(
387 0 : "{}/v1/location_config/{tenant_shard_id}",
388 0 : self.mgmt_api_endpoint
389 0 : );
390 0 : self.request(Method::GET, &path, ())
391 0 : .await?
392 0 : .json()
393 0 : .await
394 0 : .map_err(Error::ReceiveBody)
395 0 : }
396 :
397 0 : pub async fn timeline_create(
398 0 : &self,
399 0 : tenant_shard_id: TenantShardId,
400 0 : req: &TimelineCreateRequest,
401 0 : ) -> Result<TimelineInfo> {
402 0 : let uri = format!(
403 0 : "{}/v1/tenant/{}/timeline",
404 0 : self.mgmt_api_endpoint, tenant_shard_id
405 0 : );
406 0 : self.request(Method::POST, &uri, req)
407 0 : .await?
408 0 : .json()
409 0 : .await
410 0 : .map_err(Error::ReceiveBody)
411 0 : }
412 :
413 : /// The timeline deletion API can return 201 if deletion is incomplete, or
414 : /// 403 if it is complete. Callers are responsible for checking the status
415 : /// code and retrying. Error codes other than 403 will return Err().
416 0 : pub async fn timeline_delete(
417 0 : &self,
418 0 : tenant_shard_id: TenantShardId,
419 0 : timeline_id: TimelineId,
420 0 : ) -> Result<StatusCode> {
421 0 : let uri = format!(
422 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
423 0 : self.mgmt_api_endpoint
424 0 : );
425 0 :
426 0 : match self.request(Method::DELETE, &uri, ()).await {
427 0 : Err(Error::ApiError(status_code, msg)) => {
428 0 : if status_code == StatusCode::NOT_FOUND {
429 0 : Ok(StatusCode::NOT_FOUND)
430 : } else {
431 0 : Err(Error::ApiError(status_code, msg))
432 : }
433 : }
434 0 : Err(e) => Err(e),
435 0 : Ok(response) => Ok(response.status()),
436 : }
437 0 : }
438 :
439 0 : pub async fn timeline_archival_config(
440 0 : &self,
441 0 : tenant_shard_id: TenantShardId,
442 0 : timeline_id: TimelineId,
443 0 : req: &TimelineArchivalConfigRequest,
444 0 : ) -> Result<()> {
445 0 : let uri = format!(
446 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/archival_config",
447 0 : self.mgmt_api_endpoint
448 0 : );
449 0 :
450 0 : self.request(Method::PUT, &uri, req)
451 0 : .await?
452 0 : .json()
453 0 : .await
454 0 : .map_err(Error::ReceiveBody)
455 0 : }
456 :
457 0 : pub async fn timeline_detach_ancestor(
458 0 : &self,
459 0 : tenant_shard_id: TenantShardId,
460 0 : timeline_id: TimelineId,
461 0 : behavior: Option<DetachBehavior>,
462 0 : ) -> Result<AncestorDetached> {
463 0 : let uri = format!(
464 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/detach_ancestor",
465 0 : self.mgmt_api_endpoint
466 0 : );
467 0 : let mut uri = Url::parse(&uri)
468 0 : .map_err(|e| Error::ApiError(StatusCode::INTERNAL_SERVER_ERROR, format!("{e}")))?;
469 :
470 0 : if let Some(behavior) = behavior {
471 0 : uri.query_pairs_mut()
472 0 : .append_pair("detach_behavior", &behavior.to_string());
473 0 : }
474 :
475 0 : self.request(Method::PUT, uri, ())
476 0 : .await?
477 0 : .json()
478 0 : .await
479 0 : .map_err(Error::ReceiveBody)
480 0 : }
481 :
482 0 : pub async fn timeline_block_unblock_gc(
483 0 : &self,
484 0 : tenant_shard_id: TenantShardId,
485 0 : timeline_id: TimelineId,
486 0 : dir: BlockUnblock,
487 0 : ) -> Result<()> {
488 0 : let uri = format!(
489 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/{dir}_gc",
490 0 : self.mgmt_api_endpoint,
491 0 : );
492 0 :
493 0 : self.request(Method::POST, &uri, ()).await.map(|_| ())
494 0 : }
495 :
496 0 : pub async fn timeline_download_heatmap_layers(
497 0 : &self,
498 0 : tenant_shard_id: TenantShardId,
499 0 : timeline_id: TimelineId,
500 0 : concurrency: Option<usize>,
501 0 : recurse: bool,
502 0 : ) -> Result<()> {
503 0 : let mut path = reqwest::Url::parse(&format!(
504 0 : "{}/v1/tenant/{}/timeline/{}/download_heatmap_layers",
505 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id
506 0 : ))
507 0 : .expect("Cannot build URL");
508 0 :
509 0 : path.query_pairs_mut()
510 0 : .append_pair("recurse", &format!("{}", recurse));
511 :
512 0 : if let Some(concurrency) = concurrency {
513 0 : path.query_pairs_mut()
514 0 : .append_pair("concurrency", &format!("{}", concurrency));
515 0 : }
516 :
517 0 : self.request(Method::POST, path, ()).await.map(|_| ())
518 0 : }
519 :
520 0 : pub async fn tenant_reset(&self, tenant_shard_id: TenantShardId) -> Result<()> {
521 0 : let uri = format!(
522 0 : "{}/v1/tenant/{}/reset",
523 0 : self.mgmt_api_endpoint, tenant_shard_id
524 0 : );
525 0 : self.request(Method::POST, &uri, ())
526 0 : .await?
527 0 : .json()
528 0 : .await
529 0 : .map_err(Error::ReceiveBody)
530 0 : }
531 :
532 0 : pub async fn tenant_shard_split(
533 0 : &self,
534 0 : tenant_shard_id: TenantShardId,
535 0 : req: TenantShardSplitRequest,
536 0 : ) -> Result<TenantShardSplitResponse> {
537 0 : let uri = format!(
538 0 : "{}/v1/tenant/{}/shard_split",
539 0 : self.mgmt_api_endpoint, tenant_shard_id
540 0 : );
541 0 : self.request(Method::PUT, &uri, req)
542 0 : .await?
543 0 : .json()
544 0 : .await
545 0 : .map_err(Error::ReceiveBody)
546 0 : }
547 :
548 0 : pub async fn timeline_list(
549 0 : &self,
550 0 : tenant_shard_id: &TenantShardId,
551 0 : ) -> Result<Vec<TimelineInfo>> {
552 0 : let uri = format!(
553 0 : "{}/v1/tenant/{}/timeline",
554 0 : self.mgmt_api_endpoint, tenant_shard_id
555 0 : );
556 0 : self.get(&uri)
557 0 : .await?
558 0 : .json()
559 0 : .await
560 0 : .map_err(Error::ReceiveBody)
561 0 : }
562 :
563 0 : pub async fn tenant_synthetic_size(
564 0 : &self,
565 0 : tenant_shard_id: TenantShardId,
566 0 : ) -> Result<TenantHistorySize> {
567 0 : let uri = format!(
568 0 : "{}/v1/tenant/{}/synthetic_size",
569 0 : self.mgmt_api_endpoint, tenant_shard_id
570 0 : );
571 0 : self.get(&uri)
572 0 : .await?
573 0 : .json()
574 0 : .await
575 0 : .map_err(Error::ReceiveBody)
576 0 : }
577 :
578 0 : pub async fn put_io_engine(
579 0 : &self,
580 0 : engine: &pageserver_api::models::virtual_file::IoEngineKind,
581 0 : ) -> Result<()> {
582 0 : let uri = format!("{}/v1/io_engine", self.mgmt_api_endpoint);
583 0 : self.request(Method::PUT, uri, engine)
584 0 : .await?
585 0 : .json()
586 0 : .await
587 0 : .map_err(Error::ReceiveBody)
588 0 : }
589 :
590 : /// Configs io mode at runtime.
591 0 : pub async fn put_io_mode(
592 0 : &self,
593 0 : mode: &pageserver_api::models::virtual_file::IoMode,
594 0 : ) -> Result<()> {
595 0 : let uri = format!("{}/v1/io_mode", self.mgmt_api_endpoint);
596 0 : self.request(Method::PUT, uri, mode)
597 0 : .await?
598 0 : .json()
599 0 : .await
600 0 : .map_err(Error::ReceiveBody)
601 0 : }
602 :
603 0 : pub async fn get_utilization(&self) -> Result<PageserverUtilization> {
604 0 : let uri = format!("{}/v1/utilization", self.mgmt_api_endpoint);
605 0 : self.get(uri)
606 0 : .await?
607 0 : .json()
608 0 : .await
609 0 : .map_err(Error::ReceiveBody)
610 0 : }
611 :
612 0 : pub async fn top_tenant_shards(
613 0 : &self,
614 0 : request: TopTenantShardsRequest,
615 0 : ) -> Result<TopTenantShardsResponse> {
616 0 : let uri = format!("{}/v1/top_tenants", self.mgmt_api_endpoint);
617 0 : self.request(Method::POST, uri, request)
618 0 : .await?
619 0 : .json()
620 0 : .await
621 0 : .map_err(Error::ReceiveBody)
622 0 : }
623 :
624 0 : pub async fn layer_map_info(
625 0 : &self,
626 0 : tenant_shard_id: TenantShardId,
627 0 : timeline_id: TimelineId,
628 0 : ) -> Result<LayerMapInfo> {
629 0 : let uri = format!(
630 0 : "{}/v1/tenant/{}/timeline/{}/layer",
631 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id,
632 0 : );
633 0 : self.get(&uri)
634 0 : .await?
635 0 : .json()
636 0 : .await
637 0 : .map_err(Error::ReceiveBody)
638 0 : }
639 :
640 0 : pub async fn layer_evict(
641 0 : &self,
642 0 : tenant_shard_id: TenantShardId,
643 0 : timeline_id: TimelineId,
644 0 : layer_file_name: &str,
645 0 : ) -> Result<bool> {
646 0 : let uri = format!(
647 0 : "{}/v1/tenant/{}/timeline/{}/layer/{}",
648 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id, layer_file_name
649 0 : );
650 0 : let resp = self.request_noerror(Method::DELETE, &uri, ()).await?;
651 0 : match resp.status() {
652 0 : StatusCode::OK => Ok(true),
653 0 : StatusCode::NOT_MODIFIED => Ok(false),
654 : // TODO: dedupe this pattern / introduce separate error variant?
655 0 : status => Err(match resp.json::<HttpErrorBody>().await {
656 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
657 : Err(_) => {
658 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
659 : }
660 : }),
661 : }
662 0 : }
663 :
664 0 : pub async fn layer_ondemand_download(
665 0 : &self,
666 0 : tenant_shard_id: TenantShardId,
667 0 : timeline_id: TimelineId,
668 0 : layer_file_name: &str,
669 0 : ) -> Result<bool> {
670 0 : let uri = format!(
671 0 : "{}/v1/tenant/{}/timeline/{}/layer/{}",
672 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id, layer_file_name
673 0 : );
674 0 : let resp = self.request_noerror(Method::GET, &uri, ()).await?;
675 0 : match resp.status() {
676 0 : StatusCode::OK => Ok(true),
677 0 : StatusCode::NOT_MODIFIED => Ok(false),
678 : // TODO: dedupe this pattern / introduce separate error variant?
679 0 : status => Err(match resp.json::<HttpErrorBody>().await {
680 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
681 : Err(_) => {
682 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
683 : }
684 : }),
685 : }
686 0 : }
687 :
688 0 : pub async fn ingest_aux_files(
689 0 : &self,
690 0 : tenant_shard_id: TenantShardId,
691 0 : timeline_id: TimelineId,
692 0 : aux_files: HashMap<String, String>,
693 0 : ) -> Result<bool> {
694 0 : let uri = format!(
695 0 : "{}/v1/tenant/{}/timeline/{}/ingest_aux_files",
696 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id
697 0 : );
698 0 : let resp = self
699 0 : .request_noerror(Method::POST, &uri, IngestAuxFilesRequest { aux_files })
700 0 : .await?;
701 0 : match resp.status() {
702 0 : StatusCode::OK => Ok(true),
703 0 : status => Err(match resp.json::<HttpErrorBody>().await {
704 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
705 : Err(_) => {
706 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
707 : }
708 : }),
709 : }
710 0 : }
711 :
712 0 : pub async fn list_aux_files(
713 0 : &self,
714 0 : tenant_shard_id: TenantShardId,
715 0 : timeline_id: TimelineId,
716 0 : lsn: Lsn,
717 0 : ) -> Result<HashMap<String, Bytes>> {
718 0 : let uri = format!(
719 0 : "{}/v1/tenant/{}/timeline/{}/list_aux_files",
720 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id
721 0 : );
722 0 : let resp = self
723 0 : .request_noerror(Method::POST, &uri, ListAuxFilesRequest { lsn })
724 0 : .await?;
725 0 : match resp.status() {
726 : StatusCode::OK => {
727 0 : let resp: HashMap<String, Bytes> = resp.json().await.map_err(|e| {
728 0 : Error::ApiError(StatusCode::INTERNAL_SERVER_ERROR, format!("{e}"))
729 0 : })?;
730 0 : Ok(resp)
731 : }
732 0 : status => Err(match resp.json::<HttpErrorBody>().await {
733 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
734 : Err(_) => {
735 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
736 : }
737 : }),
738 : }
739 0 : }
740 :
741 0 : pub async fn import_basebackup(
742 0 : &self,
743 0 : tenant_id: TenantId,
744 0 : timeline_id: TimelineId,
745 0 : base_lsn: Lsn,
746 0 : end_lsn: Lsn,
747 0 : pg_version: u32,
748 0 : basebackup_tarball: ReqwestBody,
749 0 : ) -> Result<()> {
750 0 : let uri = format!(
751 0 : "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/import_basebackup?base_lsn={base_lsn}&end_lsn={end_lsn}&pg_version={pg_version}",
752 0 : self.mgmt_api_endpoint,
753 0 : );
754 0 : self.start_request(Method::PUT, uri)
755 0 : .body(basebackup_tarball)
756 0 : .send()
757 0 : .await
758 0 : .map_err(Error::SendRequest)?
759 0 : .error_from_body()
760 0 : .await?
761 0 : .json()
762 0 : .await
763 0 : .map_err(Error::ReceiveBody)
764 0 : }
765 :
766 0 : pub async fn import_wal(
767 0 : &self,
768 0 : tenant_id: TenantId,
769 0 : timeline_id: TimelineId,
770 0 : start_lsn: Lsn,
771 0 : end_lsn: Lsn,
772 0 : wal_tarball: ReqwestBody,
773 0 : ) -> Result<()> {
774 0 : let uri = format!(
775 0 : "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/import_wal?start_lsn={start_lsn}&end_lsn={end_lsn}",
776 0 : self.mgmt_api_endpoint,
777 0 : );
778 0 : self.start_request(Method::PUT, uri)
779 0 : .body(wal_tarball)
780 0 : .send()
781 0 : .await
782 0 : .map_err(Error::SendRequest)?
783 0 : .error_from_body()
784 0 : .await?
785 0 : .json()
786 0 : .await
787 0 : .map_err(Error::ReceiveBody)
788 0 : }
789 :
790 0 : pub async fn timeline_init_lsn_lease(
791 0 : &self,
792 0 : tenant_shard_id: TenantShardId,
793 0 : timeline_id: TimelineId,
794 0 : lsn: Lsn,
795 0 : ) -> Result<LsnLease> {
796 0 : let uri = format!(
797 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/lsn_lease",
798 0 : self.mgmt_api_endpoint,
799 0 : );
800 0 :
801 0 : self.request(Method::POST, &uri, LsnLeaseRequest { lsn })
802 0 : .await?
803 0 : .json()
804 0 : .await
805 0 : .map_err(Error::ReceiveBody)
806 0 : }
807 :
808 0 : pub async fn wait_lsn(
809 0 : &self,
810 0 : tenant_shard_id: TenantShardId,
811 0 : request: TenantWaitLsnRequest,
812 0 : ) -> Result<StatusCode> {
813 0 : let uri = format!(
814 0 : "{}/v1/tenant/{tenant_shard_id}/wait_lsn",
815 0 : self.mgmt_api_endpoint,
816 0 : );
817 0 :
818 0 : self.request_noerror(Method::POST, uri, request)
819 0 : .await
820 0 : .map(|resp| resp.status())
821 0 : }
822 : }
|