Line data Source code
1 : use std::collections::HashMap;
2 :
3 : use bytes::Bytes;
4 : use pageserver_api::{models::*, shard::TenantShardId};
5 : use reqwest::{IntoUrl, Method, StatusCode};
6 : use utils::{
7 : http::error::HttpErrorBody,
8 : id::{TenantId, TimelineId},
9 : lsn::Lsn,
10 : };
11 :
12 : pub mod util;
13 :
14 : #[derive(Debug, Clone)]
15 : pub struct Client {
16 : mgmt_api_endpoint: String,
17 : authorization_header: Option<String>,
18 : client: reqwest::Client,
19 : }
20 :
21 0 : #[derive(thiserror::Error, Debug)]
22 : pub enum Error {
23 : #[error("receive body: {0}")]
24 : ReceiveBody(reqwest::Error),
25 :
26 : #[error("receive error body: {0}")]
27 : ReceiveErrorBody(String),
28 :
29 : #[error("pageserver API: {1}")]
30 : ApiError(StatusCode, String),
31 :
32 : #[error("Cancelled")]
33 : Cancelled,
34 : }
35 :
36 : pub type Result<T> = std::result::Result<T, Error>;
37 :
38 : pub trait ResponseErrorMessageExt: Sized {
39 : fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
40 : }
41 :
42 : impl ResponseErrorMessageExt for reqwest::Response {
43 0 : async fn error_from_body(self) -> Result<Self> {
44 0 : let status = self.status();
45 0 : if !(status.is_client_error() || status.is_server_error()) {
46 0 : return Ok(self);
47 0 : }
48 0 :
49 0 : let url = self.url().to_owned();
50 0 : Err(match self.json::<HttpErrorBody>().await {
51 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
52 : Err(_) => {
53 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), url))
54 : }
55 : })
56 0 : }
57 : }
58 :
59 : pub enum ForceAwaitLogicalSize {
60 : Yes,
61 : No,
62 : }
63 :
64 : impl Client {
65 0 : pub fn new(mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
66 0 : Self::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt)
67 0 : }
68 :
69 0 : pub fn from_client(
70 0 : client: reqwest::Client,
71 0 : mgmt_api_endpoint: String,
72 0 : jwt: Option<&str>,
73 0 : ) -> Self {
74 0 : Self {
75 0 : mgmt_api_endpoint,
76 0 : authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),
77 0 : client,
78 0 : }
79 0 : }
80 :
81 0 : pub async fn list_tenants(&self) -> Result<Vec<pageserver_api::models::TenantInfo>> {
82 0 : let uri = format!("{}/v1/tenant", self.mgmt_api_endpoint);
83 0 : let resp = self.get(&uri).await?;
84 0 : resp.json().await.map_err(Error::ReceiveBody)
85 0 : }
86 :
87 : /// Get an arbitrary path and returning a streaming Response. This function is suitable
88 : /// for pass-through/proxy use cases where we don't care what the response content looks
89 : /// like.
90 : ///
91 : /// Use/add one of the properly typed methods below if you know aren't proxying, and
92 : /// know what kind of response you expect.
93 0 : pub async fn get_raw(&self, path: String) -> Result<reqwest::Response> {
94 0 : debug_assert!(path.starts_with('/'));
95 0 : let uri = format!("{}{}", self.mgmt_api_endpoint, path);
96 0 :
97 0 : let req = self.client.request(Method::GET, uri);
98 0 : let req = if let Some(value) = &self.authorization_header {
99 0 : req.header(reqwest::header::AUTHORIZATION, value)
100 : } else {
101 0 : req
102 : };
103 0 : req.send().await.map_err(Error::ReceiveBody)
104 0 : }
105 :
106 0 : pub async fn tenant_details(
107 0 : &self,
108 0 : tenant_shard_id: TenantShardId,
109 0 : ) -> Result<pageserver_api::models::TenantDetails> {
110 0 : let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
111 0 : self.get(uri)
112 0 : .await?
113 0 : .json()
114 0 : .await
115 0 : .map_err(Error::ReceiveBody)
116 0 : }
117 :
118 0 : pub async fn list_timelines(
119 0 : &self,
120 0 : tenant_shard_id: TenantShardId,
121 0 : ) -> Result<Vec<pageserver_api::models::TimelineInfo>> {
122 0 : let uri = format!(
123 0 : "{}/v1/tenant/{tenant_shard_id}/timeline",
124 0 : self.mgmt_api_endpoint
125 0 : );
126 0 : self.get(&uri)
127 0 : .await?
128 0 : .json()
129 0 : .await
130 0 : .map_err(Error::ReceiveBody)
131 0 : }
132 :
133 0 : pub async fn timeline_info(
134 0 : &self,
135 0 : tenant_shard_id: TenantShardId,
136 0 : timeline_id: TimelineId,
137 0 : force_await_logical_size: ForceAwaitLogicalSize,
138 0 : ) -> Result<pageserver_api::models::TimelineInfo> {
139 0 : let uri = format!(
140 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
141 0 : self.mgmt_api_endpoint
142 0 : );
143 :
144 0 : let uri = match force_await_logical_size {
145 0 : ForceAwaitLogicalSize::Yes => format!("{}?force-await-logical-size={}", uri, true),
146 0 : ForceAwaitLogicalSize::No => uri,
147 : };
148 :
149 0 : self.get(&uri)
150 0 : .await?
151 0 : .json()
152 0 : .await
153 0 : .map_err(Error::ReceiveBody)
154 0 : }
155 :
156 0 : pub async fn keyspace(
157 0 : &self,
158 0 : tenant_shard_id: TenantShardId,
159 0 : timeline_id: TimelineId,
160 0 : ) -> Result<pageserver_api::models::partitioning::Partitioning> {
161 0 : let uri = format!(
162 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/keyspace",
163 0 : self.mgmt_api_endpoint
164 0 : );
165 0 : self.get(&uri)
166 0 : .await?
167 0 : .json()
168 0 : .await
169 0 : .map_err(Error::ReceiveBody)
170 0 : }
171 :
172 0 : async fn get<U: IntoUrl>(&self, uri: U) -> Result<reqwest::Response> {
173 0 : self.request(Method::GET, uri, ()).await
174 0 : }
175 :
176 0 : async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
177 0 : &self,
178 0 : method: Method,
179 0 : uri: U,
180 0 : body: B,
181 0 : ) -> Result<reqwest::Response> {
182 0 : let req = self.client.request(method, uri);
183 0 : let req = if let Some(value) = &self.authorization_header {
184 0 : req.header(reqwest::header::AUTHORIZATION, value)
185 : } else {
186 0 : req
187 : };
188 0 : req.json(&body).send().await.map_err(Error::ReceiveBody)
189 0 : }
190 :
191 0 : async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
192 0 : &self,
193 0 : method: Method,
194 0 : uri: U,
195 0 : body: B,
196 0 : ) -> Result<reqwest::Response> {
197 0 : let res = self.request_noerror(method, uri, body).await?;
198 0 : let response = res.error_from_body().await?;
199 0 : Ok(response)
200 0 : }
201 :
202 0 : pub async fn status(&self) -> Result<()> {
203 0 : let uri = format!("{}/v1/status", self.mgmt_api_endpoint);
204 0 : self.get(&uri).await?;
205 0 : Ok(())
206 0 : }
207 :
208 0 : pub async fn tenant_create(&self, req: &TenantCreateRequest) -> Result<TenantId> {
209 0 : let uri = format!("{}/v1/tenant", self.mgmt_api_endpoint);
210 0 : self.request(Method::POST, &uri, req)
211 0 : .await?
212 0 : .json()
213 0 : .await
214 0 : .map_err(Error::ReceiveBody)
215 0 : }
216 :
217 : /// The tenant deletion API can return 202 if deletion is incomplete, or
218 : /// 404 if it is complete. Callers are responsible for checking the status
219 : /// code and retrying. Error codes other than 404 will return Err().
220 0 : pub async fn tenant_delete(&self, tenant_shard_id: TenantShardId) -> Result<StatusCode> {
221 0 : let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
222 0 :
223 0 : match self.request(Method::DELETE, &uri, ()).await {
224 0 : Err(Error::ApiError(status_code, msg)) => {
225 0 : if status_code == StatusCode::NOT_FOUND {
226 0 : Ok(StatusCode::NOT_FOUND)
227 : } else {
228 0 : Err(Error::ApiError(status_code, msg))
229 : }
230 : }
231 0 : Err(e) => Err(e),
232 0 : Ok(response) => Ok(response.status()),
233 : }
234 0 : }
235 :
236 0 : pub async fn tenant_time_travel_remote_storage(
237 0 : &self,
238 0 : tenant_shard_id: TenantShardId,
239 0 : timestamp: &str,
240 0 : done_if_after: &str,
241 0 : ) -> Result<()> {
242 0 : let uri = format!(
243 0 : "{}/v1/tenant/{tenant_shard_id}/time_travel_remote_storage?travel_to={timestamp}&done_if_after={done_if_after}",
244 0 : self.mgmt_api_endpoint
245 0 : );
246 0 : self.request(Method::PUT, &uri, ()).await?;
247 0 : Ok(())
248 0 : }
249 :
250 0 : pub async fn tenant_scan_remote_storage(
251 0 : &self,
252 0 : tenant_id: TenantId,
253 0 : ) -> Result<TenantScanRemoteStorageResponse> {
254 0 : let uri = format!(
255 0 : "{}/v1/tenant/{tenant_id}/scan_remote_storage",
256 0 : self.mgmt_api_endpoint
257 0 : );
258 0 : let response = self.request(Method::GET, &uri, ()).await?;
259 0 : let body = response.json().await.map_err(Error::ReceiveBody)?;
260 0 : Ok(body)
261 0 : }
262 :
263 0 : pub async fn tenant_config(&self, req: &TenantConfigRequest) -> Result<()> {
264 0 : let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
265 0 : self.request(Method::PUT, &uri, req).await?;
266 0 : Ok(())
267 0 : }
268 :
269 0 : pub async fn tenant_secondary_download(
270 0 : &self,
271 0 : tenant_id: TenantShardId,
272 0 : wait: Option<std::time::Duration>,
273 0 : ) -> Result<(StatusCode, SecondaryProgress)> {
274 0 : let mut path = reqwest::Url::parse(&format!(
275 0 : "{}/v1/tenant/{}/secondary/download",
276 0 : self.mgmt_api_endpoint, tenant_id
277 0 : ))
278 0 : .expect("Cannot build URL");
279 :
280 0 : if let Some(wait) = wait {
281 0 : path.query_pairs_mut()
282 0 : .append_pair("wait_ms", &format!("{}", wait.as_millis()));
283 0 : }
284 :
285 0 : let response = self.request(Method::POST, path, ()).await?;
286 0 : let status = response.status();
287 0 : let progress: SecondaryProgress = response.json().await.map_err(Error::ReceiveBody)?;
288 0 : Ok((status, progress))
289 0 : }
290 :
291 0 : pub async fn tenant_secondary_status(
292 0 : &self,
293 0 : tenant_shard_id: TenantShardId,
294 0 : ) -> Result<SecondaryProgress> {
295 0 : let path = reqwest::Url::parse(&format!(
296 0 : "{}/v1/tenant/{}/secondary/status",
297 0 : self.mgmt_api_endpoint, tenant_shard_id
298 0 : ))
299 0 : .expect("Cannot build URL");
300 0 :
301 0 : self.request(Method::GET, path, ())
302 0 : .await?
303 0 : .json()
304 0 : .await
305 0 : .map_err(Error::ReceiveBody)
306 0 : }
307 :
308 0 : pub async fn tenant_heatmap_upload(&self, tenant_id: TenantShardId) -> Result<()> {
309 0 : let path = reqwest::Url::parse(&format!(
310 0 : "{}/v1/tenant/{}/heatmap_upload",
311 0 : self.mgmt_api_endpoint, tenant_id
312 0 : ))
313 0 : .expect("Cannot build URL");
314 0 :
315 0 : self.request(Method::POST, path, ()).await?;
316 0 : Ok(())
317 0 : }
318 :
319 0 : pub async fn location_config(
320 0 : &self,
321 0 : tenant_shard_id: TenantShardId,
322 0 : config: LocationConfig,
323 0 : flush_ms: Option<std::time::Duration>,
324 0 : lazy: bool,
325 0 : ) -> Result<()> {
326 0 : let req_body = TenantLocationConfigRequest { config };
327 0 :
328 0 : let mut path = reqwest::Url::parse(&format!(
329 0 : "{}/v1/tenant/{}/location_config",
330 0 : self.mgmt_api_endpoint, tenant_shard_id
331 0 : ))
332 0 : // Should always work: mgmt_api_endpoint is configuration, not user input.
333 0 : .expect("Cannot build URL");
334 0 :
335 0 : if lazy {
336 0 : path.query_pairs_mut().append_pair("lazy", "true");
337 0 : }
338 :
339 0 : if let Some(flush_ms) = flush_ms {
340 0 : path.query_pairs_mut()
341 0 : .append_pair("flush_ms", &format!("{}", flush_ms.as_millis()));
342 0 : }
343 :
344 0 : self.request(Method::PUT, path, &req_body).await?;
345 0 : Ok(())
346 0 : }
347 :
348 0 : pub async fn list_location_config(&self) -> Result<LocationConfigListResponse> {
349 0 : let path = format!("{}/v1/location_config", self.mgmt_api_endpoint);
350 0 : self.request(Method::GET, &path, ())
351 0 : .await?
352 0 : .json()
353 0 : .await
354 0 : .map_err(Error::ReceiveBody)
355 0 : }
356 :
357 0 : pub async fn get_location_config(
358 0 : &self,
359 0 : tenant_shard_id: TenantShardId,
360 0 : ) -> Result<Option<LocationConfig>> {
361 0 : let path = format!(
362 0 : "{}/v1/location_config/{tenant_shard_id}",
363 0 : self.mgmt_api_endpoint
364 0 : );
365 0 : self.request(Method::GET, &path, ())
366 0 : .await?
367 0 : .json()
368 0 : .await
369 0 : .map_err(Error::ReceiveBody)
370 0 : }
371 :
372 0 : pub async fn timeline_create(
373 0 : &self,
374 0 : tenant_shard_id: TenantShardId,
375 0 : req: &TimelineCreateRequest,
376 0 : ) -> Result<TimelineInfo> {
377 0 : let uri = format!(
378 0 : "{}/v1/tenant/{}/timeline",
379 0 : self.mgmt_api_endpoint, tenant_shard_id
380 0 : );
381 0 : self.request(Method::POST, &uri, req)
382 0 : .await?
383 0 : .json()
384 0 : .await
385 0 : .map_err(Error::ReceiveBody)
386 0 : }
387 :
388 : /// The timeline deletion API can return 201 if deletion is incomplete, or
389 : /// 403 if it is complete. Callers are responsible for checking the status
390 : /// code and retrying. Error codes other than 403 will return Err().
391 0 : pub async fn timeline_delete(
392 0 : &self,
393 0 : tenant_shard_id: TenantShardId,
394 0 : timeline_id: TimelineId,
395 0 : ) -> Result<StatusCode> {
396 0 : let uri = format!(
397 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
398 0 : self.mgmt_api_endpoint
399 0 : );
400 0 :
401 0 : match self.request(Method::DELETE, &uri, ()).await {
402 0 : Err(Error::ApiError(status_code, msg)) => {
403 0 : if status_code == StatusCode::NOT_FOUND {
404 0 : Ok(StatusCode::NOT_FOUND)
405 : } else {
406 0 : Err(Error::ApiError(status_code, msg))
407 : }
408 : }
409 0 : Err(e) => Err(e),
410 0 : Ok(response) => Ok(response.status()),
411 : }
412 0 : }
413 :
414 0 : pub async fn tenant_reset(&self, tenant_shard_id: TenantShardId) -> Result<()> {
415 0 : let uri = format!(
416 0 : "{}/v1/tenant/{}/reset",
417 0 : self.mgmt_api_endpoint, tenant_shard_id
418 0 : );
419 0 : self.request(Method::POST, &uri, ())
420 0 : .await?
421 0 : .json()
422 0 : .await
423 0 : .map_err(Error::ReceiveBody)
424 0 : }
425 :
426 0 : pub async fn tenant_shard_split(
427 0 : &self,
428 0 : tenant_shard_id: TenantShardId,
429 0 : req: TenantShardSplitRequest,
430 0 : ) -> Result<TenantShardSplitResponse> {
431 0 : let uri = format!(
432 0 : "{}/v1/tenant/{}/shard_split",
433 0 : self.mgmt_api_endpoint, tenant_shard_id
434 0 : );
435 0 : self.request(Method::PUT, &uri, req)
436 0 : .await?
437 0 : .json()
438 0 : .await
439 0 : .map_err(Error::ReceiveBody)
440 0 : }
441 :
442 0 : pub async fn timeline_list(
443 0 : &self,
444 0 : tenant_shard_id: &TenantShardId,
445 0 : ) -> Result<Vec<TimelineInfo>> {
446 0 : let uri = format!(
447 0 : "{}/v1/tenant/{}/timeline",
448 0 : self.mgmt_api_endpoint, tenant_shard_id
449 0 : );
450 0 : self.get(&uri)
451 0 : .await?
452 0 : .json()
453 0 : .await
454 0 : .map_err(Error::ReceiveBody)
455 0 : }
456 :
457 0 : pub async fn tenant_synthetic_size(
458 0 : &self,
459 0 : tenant_shard_id: TenantShardId,
460 0 : ) -> Result<TenantHistorySize> {
461 0 : let uri = format!(
462 0 : "{}/v1/tenant/{}/synthetic_size",
463 0 : self.mgmt_api_endpoint, tenant_shard_id
464 0 : );
465 0 : self.get(&uri)
466 0 : .await?
467 0 : .json()
468 0 : .await
469 0 : .map_err(Error::ReceiveBody)
470 0 : }
471 :
472 0 : pub async fn put_io_engine(
473 0 : &self,
474 0 : engine: &pageserver_api::models::virtual_file::IoEngineKind,
475 0 : ) -> Result<()> {
476 0 : let uri = format!("{}/v1/io_engine", self.mgmt_api_endpoint);
477 0 : self.request(Method::PUT, uri, engine)
478 0 : .await?
479 0 : .json()
480 0 : .await
481 0 : .map_err(Error::ReceiveBody)
482 0 : }
483 :
484 0 : pub async fn get_utilization(&self) -> Result<PageserverUtilization> {
485 0 : let uri = format!("{}/v1/utilization", self.mgmt_api_endpoint);
486 0 : self.get(uri)
487 0 : .await?
488 0 : .json()
489 0 : .await
490 0 : .map_err(Error::ReceiveBody)
491 0 : }
492 :
493 0 : pub async fn top_tenant_shards(
494 0 : &self,
495 0 : request: TopTenantShardsRequest,
496 0 : ) -> Result<TopTenantShardsResponse> {
497 0 : let uri = format!("{}/v1/top_tenants", self.mgmt_api_endpoint);
498 0 : self.request(Method::POST, uri, request)
499 0 : .await?
500 0 : .json()
501 0 : .await
502 0 : .map_err(Error::ReceiveBody)
503 0 : }
504 :
505 0 : pub async fn layer_map_info(
506 0 : &self,
507 0 : tenant_shard_id: TenantShardId,
508 0 : timeline_id: TimelineId,
509 0 : ) -> Result<LayerMapInfo> {
510 0 : let uri = format!(
511 0 : "{}/v1/tenant/{}/timeline/{}/layer",
512 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id,
513 0 : );
514 0 : self.get(&uri)
515 0 : .await?
516 0 : .json()
517 0 : .await
518 0 : .map_err(Error::ReceiveBody)
519 0 : }
520 :
521 0 : pub async fn layer_evict(
522 0 : &self,
523 0 : tenant_shard_id: TenantShardId,
524 0 : timeline_id: TimelineId,
525 0 : layer_file_name: &str,
526 0 : ) -> Result<bool> {
527 0 : let uri = format!(
528 0 : "{}/v1/tenant/{}/timeline/{}/layer/{}",
529 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id, layer_file_name
530 0 : );
531 0 : let resp = self.request_noerror(Method::DELETE, &uri, ()).await?;
532 0 : match resp.status() {
533 0 : StatusCode::OK => Ok(true),
534 0 : StatusCode::NOT_MODIFIED => Ok(false),
535 : // TODO: dedupe this pattern / introduce separate error variant?
536 0 : status => Err(match resp.json::<HttpErrorBody>().await {
537 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
538 : Err(_) => {
539 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
540 : }
541 : }),
542 : }
543 0 : }
544 :
545 0 : pub async fn layer_ondemand_download(
546 0 : &self,
547 0 : tenant_shard_id: TenantShardId,
548 0 : timeline_id: TimelineId,
549 0 : layer_file_name: &str,
550 0 : ) -> Result<bool> {
551 0 : let uri = format!(
552 0 : "{}/v1/tenant/{}/timeline/{}/layer/{}",
553 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id, layer_file_name
554 0 : );
555 0 : let resp = self.request_noerror(Method::GET, &uri, ()).await?;
556 0 : match resp.status() {
557 0 : StatusCode::OK => Ok(true),
558 0 : StatusCode::NOT_MODIFIED => Ok(false),
559 : // TODO: dedupe this pattern / introduce separate error variant?
560 0 : status => Err(match resp.json::<HttpErrorBody>().await {
561 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
562 : Err(_) => {
563 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
564 : }
565 : }),
566 : }
567 0 : }
568 :
569 0 : pub async fn ingest_aux_files(
570 0 : &self,
571 0 : tenant_shard_id: TenantShardId,
572 0 : timeline_id: TimelineId,
573 0 : aux_files: HashMap<String, String>,
574 0 : ) -> Result<bool> {
575 0 : let uri = format!(
576 0 : "{}/v1/tenant/{}/timeline/{}/ingest_aux_files",
577 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id
578 0 : );
579 0 : let resp = self
580 0 : .request_noerror(Method::POST, &uri, IngestAuxFilesRequest { aux_files })
581 0 : .await?;
582 0 : match resp.status() {
583 0 : StatusCode::OK => Ok(true),
584 0 : status => Err(match resp.json::<HttpErrorBody>().await {
585 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
586 : Err(_) => {
587 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
588 : }
589 : }),
590 : }
591 0 : }
592 :
593 0 : pub async fn list_aux_files(
594 0 : &self,
595 0 : tenant_shard_id: TenantShardId,
596 0 : timeline_id: TimelineId,
597 0 : lsn: Lsn,
598 0 : ) -> Result<HashMap<String, Bytes>> {
599 0 : let uri = format!(
600 0 : "{}/v1/tenant/{}/timeline/{}/list_aux_files",
601 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id
602 0 : );
603 0 : let resp = self
604 0 : .request_noerror(Method::POST, &uri, ListAuxFilesRequest { lsn })
605 0 : .await?;
606 0 : match resp.status() {
607 : StatusCode::OK => {
608 0 : let resp: HashMap<String, Bytes> = resp.json().await.map_err(|e| {
609 0 : Error::ApiError(StatusCode::INTERNAL_SERVER_ERROR, format!("{e}"))
610 0 : })?;
611 0 : Ok(resp)
612 : }
613 0 : status => Err(match resp.json::<HttpErrorBody>().await {
614 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
615 : Err(_) => {
616 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
617 : }
618 : }),
619 : }
620 0 : }
621 : }
|