Line data Source code
1 : use std::collections::HashMap;
2 :
3 : use bytes::Bytes;
4 : use detach_ancestor::AncestorDetached;
5 : use pageserver_api::{models::*, shard::TenantShardId};
6 : use reqwest::{IntoUrl, Method, StatusCode};
7 : use utils::{
8 : http::error::HttpErrorBody,
9 : id::{TenantId, TimelineId},
10 : lsn::Lsn,
11 : };
12 :
13 : pub use reqwest::Body as ReqwestBody;
14 :
15 : pub mod util;
16 :
17 : #[derive(Debug, Clone)]
18 : pub struct Client {
19 : mgmt_api_endpoint: String,
20 : authorization_header: Option<String>,
21 : client: reqwest::Client,
22 : }
23 :
24 0 : #[derive(thiserror::Error, Debug)]
25 : pub enum Error {
26 : #[error("send request: {0}")]
27 : SendRequest(reqwest::Error),
28 :
29 : #[error("receive body: {0}")]
30 : ReceiveBody(reqwest::Error),
31 :
32 : #[error("receive error body: {0}")]
33 : ReceiveErrorBody(String),
34 :
35 : #[error("pageserver API: {1}")]
36 : ApiError(StatusCode, String),
37 :
38 : #[error("Cancelled")]
39 : Cancelled,
40 : }
41 :
42 : pub type Result<T> = std::result::Result<T, Error>;
43 :
44 : pub trait ResponseErrorMessageExt: Sized {
45 : fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
46 : }
47 :
48 : impl ResponseErrorMessageExt for reqwest::Response {
49 0 : async fn error_from_body(self) -> Result<Self> {
50 0 : let status = self.status();
51 0 : if !(status.is_client_error() || status.is_server_error()) {
52 0 : return Ok(self);
53 0 : }
54 0 :
55 0 : let url = self.url().to_owned();
56 0 : Err(match self.json::<HttpErrorBody>().await {
57 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
58 : Err(_) => {
59 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), url))
60 : }
61 : })
62 0 : }
63 : }
64 :
/// Selects whether [`Client::timeline_info`] appends the
/// `force-await-logical-size=true` query parameter to its request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ForceAwaitLogicalSize {
    /// Append `?force-await-logical-size=true` to the request URL.
    Yes,
    /// Send the request without the query parameter.
    No,
}
69 :
70 : impl Client {
71 0 : pub fn new(mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
72 0 : Self::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt)
73 0 : }
74 :
75 0 : pub fn from_client(
76 0 : client: reqwest::Client,
77 0 : mgmt_api_endpoint: String,
78 0 : jwt: Option<&str>,
79 0 : ) -> Self {
80 0 : Self {
81 0 : mgmt_api_endpoint,
82 0 : authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),
83 0 : client,
84 0 : }
85 0 : }
86 :
87 0 : pub async fn list_tenants(&self) -> Result<Vec<pageserver_api::models::TenantInfo>> {
88 0 : let uri = format!("{}/v1/tenant", self.mgmt_api_endpoint);
89 0 : let resp = self.get(&uri).await?;
90 0 : resp.json().await.map_err(Error::ReceiveBody)
91 0 : }
92 :
93 : /// Get an arbitrary path and returning a streaming Response. This function is suitable
94 : /// for pass-through/proxy use cases where we don't care what the response content looks
95 : /// like.
96 : ///
97 : /// Use/add one of the properly typed methods below if you know aren't proxying, and
98 : /// know what kind of response you expect.
99 0 : pub async fn get_raw(&self, path: String) -> Result<reqwest::Response> {
100 0 : debug_assert!(path.starts_with('/'));
101 0 : let uri = format!("{}{}", self.mgmt_api_endpoint, path);
102 0 :
103 0 : let req = self.client.request(Method::GET, uri);
104 0 : let req = if let Some(value) = &self.authorization_header {
105 0 : req.header(reqwest::header::AUTHORIZATION, value)
106 : } else {
107 0 : req
108 : };
109 0 : req.send().await.map_err(Error::ReceiveBody)
110 0 : }
111 :
112 0 : pub async fn tenant_details(
113 0 : &self,
114 0 : tenant_shard_id: TenantShardId,
115 0 : ) -> Result<pageserver_api::models::TenantDetails> {
116 0 : let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
117 0 : self.get(uri)
118 0 : .await?
119 0 : .json()
120 0 : .await
121 0 : .map_err(Error::ReceiveBody)
122 0 : }
123 :
124 0 : pub async fn list_timelines(
125 0 : &self,
126 0 : tenant_shard_id: TenantShardId,
127 0 : ) -> Result<Vec<pageserver_api::models::TimelineInfo>> {
128 0 : let uri = format!(
129 0 : "{}/v1/tenant/{tenant_shard_id}/timeline",
130 0 : self.mgmt_api_endpoint
131 0 : );
132 0 : self.get(&uri)
133 0 : .await?
134 0 : .json()
135 0 : .await
136 0 : .map_err(Error::ReceiveBody)
137 0 : }
138 :
139 0 : pub async fn timeline_info(
140 0 : &self,
141 0 : tenant_shard_id: TenantShardId,
142 0 : timeline_id: TimelineId,
143 0 : force_await_logical_size: ForceAwaitLogicalSize,
144 0 : ) -> Result<pageserver_api::models::TimelineInfo> {
145 0 : let uri = format!(
146 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
147 0 : self.mgmt_api_endpoint
148 0 : );
149 :
150 0 : let uri = match force_await_logical_size {
151 0 : ForceAwaitLogicalSize::Yes => format!("{}?force-await-logical-size={}", uri, true),
152 0 : ForceAwaitLogicalSize::No => uri,
153 : };
154 :
155 0 : self.get(&uri)
156 0 : .await?
157 0 : .json()
158 0 : .await
159 0 : .map_err(Error::ReceiveBody)
160 0 : }
161 :
162 0 : pub async fn keyspace(
163 0 : &self,
164 0 : tenant_shard_id: TenantShardId,
165 0 : timeline_id: TimelineId,
166 0 : ) -> Result<pageserver_api::models::partitioning::Partitioning> {
167 0 : let uri = format!(
168 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/keyspace",
169 0 : self.mgmt_api_endpoint
170 0 : );
171 0 : self.get(&uri)
172 0 : .await?
173 0 : .json()
174 0 : .await
175 0 : .map_err(Error::ReceiveBody)
176 0 : }
177 :
178 0 : async fn get<U: IntoUrl>(&self, uri: U) -> Result<reqwest::Response> {
179 0 : self.request(Method::GET, uri, ()).await
180 0 : }
181 :
182 0 : fn start_request<U: reqwest::IntoUrl>(
183 0 : &self,
184 0 : method: Method,
185 0 : uri: U,
186 0 : ) -> reqwest::RequestBuilder {
187 0 : let req = self.client.request(method, uri);
188 0 : if let Some(value) = &self.authorization_header {
189 0 : req.header(reqwest::header::AUTHORIZATION, value)
190 : } else {
191 0 : req
192 : }
193 0 : }
194 :
195 0 : async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
196 0 : &self,
197 0 : method: Method,
198 0 : uri: U,
199 0 : body: B,
200 0 : ) -> Result<reqwest::Response> {
201 0 : self.start_request(method, uri)
202 0 : .json(&body)
203 0 : .send()
204 0 : .await
205 0 : .map_err(Error::ReceiveBody)
206 0 : }
207 :
208 0 : async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
209 0 : &self,
210 0 : method: Method,
211 0 : uri: U,
212 0 : body: B,
213 0 : ) -> Result<reqwest::Response> {
214 0 : let res = self.request_noerror(method, uri, body).await?;
215 0 : let response = res.error_from_body().await?;
216 0 : Ok(response)
217 0 : }
218 :
219 0 : pub async fn status(&self) -> Result<()> {
220 0 : let uri = format!("{}/v1/status", self.mgmt_api_endpoint);
221 0 : self.get(&uri).await?;
222 0 : Ok(())
223 0 : }
224 :
225 : /// The tenant deletion API can return 202 if deletion is incomplete, or
226 : /// 404 if it is complete. Callers are responsible for checking the status
227 : /// code and retrying. Error codes other than 404 will return Err().
228 0 : pub async fn tenant_delete(&self, tenant_shard_id: TenantShardId) -> Result<StatusCode> {
229 0 : let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
230 0 :
231 0 : match self.request(Method::DELETE, &uri, ()).await {
232 0 : Err(Error::ApiError(status_code, msg)) => {
233 0 : if status_code == StatusCode::NOT_FOUND {
234 0 : Ok(StatusCode::NOT_FOUND)
235 : } else {
236 0 : Err(Error::ApiError(status_code, msg))
237 : }
238 : }
239 0 : Err(e) => Err(e),
240 0 : Ok(response) => Ok(response.status()),
241 : }
242 0 : }
243 :
244 0 : pub async fn tenant_time_travel_remote_storage(
245 0 : &self,
246 0 : tenant_shard_id: TenantShardId,
247 0 : timestamp: &str,
248 0 : done_if_after: &str,
249 0 : ) -> Result<()> {
250 0 : let uri = format!(
251 0 : "{}/v1/tenant/{tenant_shard_id}/time_travel_remote_storage?travel_to={timestamp}&done_if_after={done_if_after}",
252 0 : self.mgmt_api_endpoint
253 0 : );
254 0 : self.request(Method::PUT, &uri, ()).await?;
255 0 : Ok(())
256 0 : }
257 :
258 0 : pub async fn tenant_scan_remote_storage(
259 0 : &self,
260 0 : tenant_id: TenantId,
261 0 : ) -> Result<TenantScanRemoteStorageResponse> {
262 0 : let uri = format!(
263 0 : "{}/v1/tenant/{tenant_id}/scan_remote_storage",
264 0 : self.mgmt_api_endpoint
265 0 : );
266 0 : let response = self.request(Method::GET, &uri, ()).await?;
267 0 : let body = response.json().await.map_err(Error::ReceiveBody)?;
268 0 : Ok(body)
269 0 : }
270 :
271 0 : pub async fn tenant_config(&self, req: &TenantConfigRequest) -> Result<()> {
272 0 : let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
273 0 : self.request(Method::PUT, &uri, req).await?;
274 0 : Ok(())
275 0 : }
276 :
277 0 : pub async fn tenant_secondary_download(
278 0 : &self,
279 0 : tenant_id: TenantShardId,
280 0 : wait: Option<std::time::Duration>,
281 0 : ) -> Result<(StatusCode, SecondaryProgress)> {
282 0 : let mut path = reqwest::Url::parse(&format!(
283 0 : "{}/v1/tenant/{}/secondary/download",
284 0 : self.mgmt_api_endpoint, tenant_id
285 0 : ))
286 0 : .expect("Cannot build URL");
287 :
288 0 : if let Some(wait) = wait {
289 0 : path.query_pairs_mut()
290 0 : .append_pair("wait_ms", &format!("{}", wait.as_millis()));
291 0 : }
292 :
293 0 : let response = self.request(Method::POST, path, ()).await?;
294 0 : let status = response.status();
295 0 : let progress: SecondaryProgress = response.json().await.map_err(Error::ReceiveBody)?;
296 0 : Ok((status, progress))
297 0 : }
298 :
299 0 : pub async fn tenant_secondary_status(
300 0 : &self,
301 0 : tenant_shard_id: TenantShardId,
302 0 : ) -> Result<SecondaryProgress> {
303 0 : let path = reqwest::Url::parse(&format!(
304 0 : "{}/v1/tenant/{}/secondary/status",
305 0 : self.mgmt_api_endpoint, tenant_shard_id
306 0 : ))
307 0 : .expect("Cannot build URL");
308 0 :
309 0 : self.request(Method::GET, path, ())
310 0 : .await?
311 0 : .json()
312 0 : .await
313 0 : .map_err(Error::ReceiveBody)
314 0 : }
315 :
316 0 : pub async fn tenant_heatmap_upload(&self, tenant_id: TenantShardId) -> Result<()> {
317 0 : let path = reqwest::Url::parse(&format!(
318 0 : "{}/v1/tenant/{}/heatmap_upload",
319 0 : self.mgmt_api_endpoint, tenant_id
320 0 : ))
321 0 : .expect("Cannot build URL");
322 0 :
323 0 : self.request(Method::POST, path, ()).await?;
324 0 : Ok(())
325 0 : }
326 :
327 0 : pub async fn location_config(
328 0 : &self,
329 0 : tenant_shard_id: TenantShardId,
330 0 : config: LocationConfig,
331 0 : flush_ms: Option<std::time::Duration>,
332 0 : lazy: bool,
333 0 : ) -> Result<()> {
334 0 : let req_body = TenantLocationConfigRequest { config };
335 0 :
336 0 : let mut path = reqwest::Url::parse(&format!(
337 0 : "{}/v1/tenant/{}/location_config",
338 0 : self.mgmt_api_endpoint, tenant_shard_id
339 0 : ))
340 0 : // Should always work: mgmt_api_endpoint is configuration, not user input.
341 0 : .expect("Cannot build URL");
342 0 :
343 0 : if lazy {
344 0 : path.query_pairs_mut().append_pair("lazy", "true");
345 0 : }
346 :
347 0 : if let Some(flush_ms) = flush_ms {
348 0 : path.query_pairs_mut()
349 0 : .append_pair("flush_ms", &format!("{}", flush_ms.as_millis()));
350 0 : }
351 :
352 0 : self.request(Method::PUT, path, &req_body).await?;
353 0 : Ok(())
354 0 : }
355 :
356 0 : pub async fn list_location_config(&self) -> Result<LocationConfigListResponse> {
357 0 : let path = format!("{}/v1/location_config", self.mgmt_api_endpoint);
358 0 : self.request(Method::GET, &path, ())
359 0 : .await?
360 0 : .json()
361 0 : .await
362 0 : .map_err(Error::ReceiveBody)
363 0 : }
364 :
365 0 : pub async fn get_location_config(
366 0 : &self,
367 0 : tenant_shard_id: TenantShardId,
368 0 : ) -> Result<Option<LocationConfig>> {
369 0 : let path = format!(
370 0 : "{}/v1/location_config/{tenant_shard_id}",
371 0 : self.mgmt_api_endpoint
372 0 : );
373 0 : self.request(Method::GET, &path, ())
374 0 : .await?
375 0 : .json()
376 0 : .await
377 0 : .map_err(Error::ReceiveBody)
378 0 : }
379 :
380 0 : pub async fn timeline_create(
381 0 : &self,
382 0 : tenant_shard_id: TenantShardId,
383 0 : req: &TimelineCreateRequest,
384 0 : ) -> Result<TimelineInfo> {
385 0 : let uri = format!(
386 0 : "{}/v1/tenant/{}/timeline",
387 0 : self.mgmt_api_endpoint, tenant_shard_id
388 0 : );
389 0 : self.request(Method::POST, &uri, req)
390 0 : .await?
391 0 : .json()
392 0 : .await
393 0 : .map_err(Error::ReceiveBody)
394 0 : }
395 :
396 : /// The timeline deletion API can return 201 if deletion is incomplete, or
397 : /// 403 if it is complete. Callers are responsible for checking the status
398 : /// code and retrying. Error codes other than 403 will return Err().
399 0 : pub async fn timeline_delete(
400 0 : &self,
401 0 : tenant_shard_id: TenantShardId,
402 0 : timeline_id: TimelineId,
403 0 : ) -> Result<StatusCode> {
404 0 : let uri = format!(
405 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
406 0 : self.mgmt_api_endpoint
407 0 : );
408 0 :
409 0 : match self.request(Method::DELETE, &uri, ()).await {
410 0 : Err(Error::ApiError(status_code, msg)) => {
411 0 : if status_code == StatusCode::NOT_FOUND {
412 0 : Ok(StatusCode::NOT_FOUND)
413 : } else {
414 0 : Err(Error::ApiError(status_code, msg))
415 : }
416 : }
417 0 : Err(e) => Err(e),
418 0 : Ok(response) => Ok(response.status()),
419 : }
420 0 : }
421 :
422 0 : pub async fn timeline_detach_ancestor(
423 0 : &self,
424 0 : tenant_shard_id: TenantShardId,
425 0 : timeline_id: TimelineId,
426 0 : ) -> Result<AncestorDetached> {
427 0 : let uri = format!(
428 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/detach_ancestor",
429 0 : self.mgmt_api_endpoint
430 0 : );
431 0 :
432 0 : self.request(Method::PUT, &uri, ())
433 0 : .await?
434 0 : .json()
435 0 : .await
436 0 : .map_err(Error::ReceiveBody)
437 0 : }
438 :
439 0 : pub async fn tenant_reset(&self, tenant_shard_id: TenantShardId) -> Result<()> {
440 0 : let uri = format!(
441 0 : "{}/v1/tenant/{}/reset",
442 0 : self.mgmt_api_endpoint, tenant_shard_id
443 0 : );
444 0 : self.request(Method::POST, &uri, ())
445 0 : .await?
446 0 : .json()
447 0 : .await
448 0 : .map_err(Error::ReceiveBody)
449 0 : }
450 :
451 0 : pub async fn tenant_shard_split(
452 0 : &self,
453 0 : tenant_shard_id: TenantShardId,
454 0 : req: TenantShardSplitRequest,
455 0 : ) -> Result<TenantShardSplitResponse> {
456 0 : let uri = format!(
457 0 : "{}/v1/tenant/{}/shard_split",
458 0 : self.mgmt_api_endpoint, tenant_shard_id
459 0 : );
460 0 : self.request(Method::PUT, &uri, req)
461 0 : .await?
462 0 : .json()
463 0 : .await
464 0 : .map_err(Error::ReceiveBody)
465 0 : }
466 :
467 0 : pub async fn timeline_list(
468 0 : &self,
469 0 : tenant_shard_id: &TenantShardId,
470 0 : ) -> Result<Vec<TimelineInfo>> {
471 0 : let uri = format!(
472 0 : "{}/v1/tenant/{}/timeline",
473 0 : self.mgmt_api_endpoint, tenant_shard_id
474 0 : );
475 0 : self.get(&uri)
476 0 : .await?
477 0 : .json()
478 0 : .await
479 0 : .map_err(Error::ReceiveBody)
480 0 : }
481 :
482 0 : pub async fn tenant_synthetic_size(
483 0 : &self,
484 0 : tenant_shard_id: TenantShardId,
485 0 : ) -> Result<TenantHistorySize> {
486 0 : let uri = format!(
487 0 : "{}/v1/tenant/{}/synthetic_size",
488 0 : self.mgmt_api_endpoint, tenant_shard_id
489 0 : );
490 0 : self.get(&uri)
491 0 : .await?
492 0 : .json()
493 0 : .await
494 0 : .map_err(Error::ReceiveBody)
495 0 : }
496 :
497 0 : pub async fn put_io_engine(
498 0 : &self,
499 0 : engine: &pageserver_api::models::virtual_file::IoEngineKind,
500 0 : ) -> Result<()> {
501 0 : let uri = format!("{}/v1/io_engine", self.mgmt_api_endpoint);
502 0 : self.request(Method::PUT, uri, engine)
503 0 : .await?
504 0 : .json()
505 0 : .await
506 0 : .map_err(Error::ReceiveBody)
507 0 : }
508 :
509 0 : pub async fn get_utilization(&self) -> Result<PageserverUtilization> {
510 0 : let uri = format!("{}/v1/utilization", self.mgmt_api_endpoint);
511 0 : self.get(uri)
512 0 : .await?
513 0 : .json()
514 0 : .await
515 0 : .map_err(Error::ReceiveBody)
516 0 : }
517 :
518 0 : pub async fn top_tenant_shards(
519 0 : &self,
520 0 : request: TopTenantShardsRequest,
521 0 : ) -> Result<TopTenantShardsResponse> {
522 0 : let uri = format!("{}/v1/top_tenants", self.mgmt_api_endpoint);
523 0 : self.request(Method::POST, uri, request)
524 0 : .await?
525 0 : .json()
526 0 : .await
527 0 : .map_err(Error::ReceiveBody)
528 0 : }
529 :
530 0 : pub async fn layer_map_info(
531 0 : &self,
532 0 : tenant_shard_id: TenantShardId,
533 0 : timeline_id: TimelineId,
534 0 : ) -> Result<LayerMapInfo> {
535 0 : let uri = format!(
536 0 : "{}/v1/tenant/{}/timeline/{}/layer",
537 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id,
538 0 : );
539 0 : self.get(&uri)
540 0 : .await?
541 0 : .json()
542 0 : .await
543 0 : .map_err(Error::ReceiveBody)
544 0 : }
545 :
546 0 : pub async fn layer_evict(
547 0 : &self,
548 0 : tenant_shard_id: TenantShardId,
549 0 : timeline_id: TimelineId,
550 0 : layer_file_name: &str,
551 0 : ) -> Result<bool> {
552 0 : let uri = format!(
553 0 : "{}/v1/tenant/{}/timeline/{}/layer/{}",
554 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id, layer_file_name
555 0 : );
556 0 : let resp = self.request_noerror(Method::DELETE, &uri, ()).await?;
557 0 : match resp.status() {
558 0 : StatusCode::OK => Ok(true),
559 0 : StatusCode::NOT_MODIFIED => Ok(false),
560 : // TODO: dedupe this pattern / introduce separate error variant?
561 0 : status => Err(match resp.json::<HttpErrorBody>().await {
562 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
563 : Err(_) => {
564 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
565 : }
566 : }),
567 : }
568 0 : }
569 :
570 0 : pub async fn layer_ondemand_download(
571 0 : &self,
572 0 : tenant_shard_id: TenantShardId,
573 0 : timeline_id: TimelineId,
574 0 : layer_file_name: &str,
575 0 : ) -> Result<bool> {
576 0 : let uri = format!(
577 0 : "{}/v1/tenant/{}/timeline/{}/layer/{}",
578 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id, layer_file_name
579 0 : );
580 0 : let resp = self.request_noerror(Method::GET, &uri, ()).await?;
581 0 : match resp.status() {
582 0 : StatusCode::OK => Ok(true),
583 0 : StatusCode::NOT_MODIFIED => Ok(false),
584 : // TODO: dedupe this pattern / introduce separate error variant?
585 0 : status => Err(match resp.json::<HttpErrorBody>().await {
586 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
587 : Err(_) => {
588 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
589 : }
590 : }),
591 : }
592 0 : }
593 :
594 0 : pub async fn ingest_aux_files(
595 0 : &self,
596 0 : tenant_shard_id: TenantShardId,
597 0 : timeline_id: TimelineId,
598 0 : aux_files: HashMap<String, String>,
599 0 : ) -> Result<bool> {
600 0 : let uri = format!(
601 0 : "{}/v1/tenant/{}/timeline/{}/ingest_aux_files",
602 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id
603 0 : );
604 0 : let resp = self
605 0 : .request_noerror(Method::POST, &uri, IngestAuxFilesRequest { aux_files })
606 0 : .await?;
607 0 : match resp.status() {
608 0 : StatusCode::OK => Ok(true),
609 0 : status => Err(match resp.json::<HttpErrorBody>().await {
610 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
611 : Err(_) => {
612 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
613 : }
614 : }),
615 : }
616 0 : }
617 :
618 0 : pub async fn list_aux_files(
619 0 : &self,
620 0 : tenant_shard_id: TenantShardId,
621 0 : timeline_id: TimelineId,
622 0 : lsn: Lsn,
623 0 : ) -> Result<HashMap<String, Bytes>> {
624 0 : let uri = format!(
625 0 : "{}/v1/tenant/{}/timeline/{}/list_aux_files",
626 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id
627 0 : );
628 0 : let resp = self
629 0 : .request_noerror(Method::POST, &uri, ListAuxFilesRequest { lsn })
630 0 : .await?;
631 0 : match resp.status() {
632 : StatusCode::OK => {
633 0 : let resp: HashMap<String, Bytes> = resp.json().await.map_err(|e| {
634 0 : Error::ApiError(StatusCode::INTERNAL_SERVER_ERROR, format!("{e}"))
635 0 : })?;
636 0 : Ok(resp)
637 : }
638 0 : status => Err(match resp.json::<HttpErrorBody>().await {
639 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
640 : Err(_) => {
641 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
642 : }
643 : }),
644 : }
645 0 : }
646 :
647 0 : pub async fn import_basebackup(
648 0 : &self,
649 0 : tenant_id: TenantId,
650 0 : timeline_id: TimelineId,
651 0 : base_lsn: Lsn,
652 0 : end_lsn: Lsn,
653 0 : pg_version: u32,
654 0 : basebackup_tarball: ReqwestBody,
655 0 : ) -> Result<()> {
656 0 : let uri = format!(
657 0 : "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/import_basebackup?base_lsn={base_lsn}&end_lsn={end_lsn}&pg_version={pg_version}",
658 0 : self.mgmt_api_endpoint,
659 0 : );
660 0 : self.start_request(Method::PUT, uri)
661 0 : .body(basebackup_tarball)
662 0 : .send()
663 0 : .await
664 0 : .map_err(Error::SendRequest)?
665 0 : .error_from_body()
666 0 : .await?
667 0 : .json()
668 0 : .await
669 0 : .map_err(Error::ReceiveBody)
670 0 : }
671 :
672 0 : pub async fn import_wal(
673 0 : &self,
674 0 : tenant_id: TenantId,
675 0 : timeline_id: TimelineId,
676 0 : start_lsn: Lsn,
677 0 : end_lsn: Lsn,
678 0 : wal_tarball: ReqwestBody,
679 0 : ) -> Result<()> {
680 0 : let uri = format!(
681 0 : "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/import_wal?start_lsn={start_lsn}&end_lsn={end_lsn}",
682 0 : self.mgmt_api_endpoint,
683 0 : );
684 0 : self.start_request(Method::PUT, uri)
685 0 : .body(wal_tarball)
686 0 : .send()
687 0 : .await
688 0 : .map_err(Error::SendRequest)?
689 0 : .error_from_body()
690 0 : .await?
691 0 : .json()
692 0 : .await
693 0 : .map_err(Error::ReceiveBody)
694 0 : }
695 : }
|