Line data Source code
1 : use std::collections::HashMap;
2 : use std::error::Error as _;
3 :
4 : use bytes::Bytes;
5 : use detach_ancestor::AncestorDetached;
6 : use http_utils::error::HttpErrorBody;
7 : use pageserver_api::models::*;
8 : use pageserver_api::shard::TenantShardId;
9 : pub use reqwest::Body as ReqwestBody;
10 : use reqwest::{IntoUrl, Method, StatusCode};
11 : use utils::id::{TenantId, TimelineId};
12 : use utils::lsn::Lsn;
13 :
14 : use crate::BlockUnblock;
15 :
16 : pub mod util;
17 :
18 : #[derive(Debug, Clone)]
19 : pub struct Client {
20 : mgmt_api_endpoint: String,
21 : authorization_header: Option<String>,
22 : client: reqwest::Client,
23 : }
24 :
25 : #[derive(thiserror::Error, Debug)]
26 : pub enum Error {
27 0 : #[error("send request: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
28 : SendRequest(reqwest::Error),
29 :
30 0 : #[error("receive body: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
31 : ReceiveBody(reqwest::Error),
32 :
33 : #[error("receive error body: {0}")]
34 : ReceiveErrorBody(String),
35 :
36 : #[error("pageserver API: {1}")]
37 : ApiError(StatusCode, String),
38 :
39 : #[error("Cancelled")]
40 : Cancelled,
41 : }
42 :
43 : pub type Result<T> = std::result::Result<T, Error>;
44 :
45 : pub trait ResponseErrorMessageExt: Sized {
46 : fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
47 : }
48 :
49 : impl ResponseErrorMessageExt for reqwest::Response {
50 0 : async fn error_from_body(self) -> Result<Self> {
51 0 : let status = self.status();
52 0 : if !(status.is_client_error() || status.is_server_error()) {
53 0 : return Ok(self);
54 0 : }
55 0 :
56 0 : let url = self.url().to_owned();
57 0 : Err(match self.json::<HttpErrorBody>().await {
58 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
59 : Err(_) => {
60 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), url))
61 : }
62 : })
63 0 : }
64 : }
65 :
/// Selects whether [`Client::timeline_info`] adds the
/// `force-await-logical-size=true` query parameter to its request.
pub enum ForceAwaitLogicalSize {
    /// Add the query parameter.
    Yes,
    /// Send the request without the query parameter.
    No,
}
70 :
71 : impl Client {
72 0 : pub fn new(mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
73 0 : Self::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt)
74 0 : }
75 :
76 0 : pub fn from_client(
77 0 : client: reqwest::Client,
78 0 : mgmt_api_endpoint: String,
79 0 : jwt: Option<&str>,
80 0 : ) -> Self {
81 0 : Self {
82 0 : mgmt_api_endpoint,
83 0 : authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),
84 0 : client,
85 0 : }
86 0 : }
87 :
88 0 : pub async fn list_tenants(&self) -> Result<Vec<pageserver_api::models::TenantInfo>> {
89 0 : let uri = format!("{}/v1/tenant", self.mgmt_api_endpoint);
90 0 : let resp = self.get(&uri).await?;
91 0 : resp.json().await.map_err(Error::ReceiveBody)
92 0 : }
93 :
94 : /// Get an arbitrary path and returning a streaming Response. This function is suitable
95 : /// for pass-through/proxy use cases where we don't care what the response content looks
96 : /// like.
97 : ///
98 : /// Use/add one of the properly typed methods below if you know aren't proxying, and
99 : /// know what kind of response you expect.
100 0 : pub async fn get_raw(&self, path: String) -> Result<reqwest::Response> {
101 0 : debug_assert!(path.starts_with('/'));
102 0 : let uri = format!("{}{}", self.mgmt_api_endpoint, path);
103 0 :
104 0 : let req = self.client.request(Method::GET, uri);
105 0 : let req = if let Some(value) = &self.authorization_header {
106 0 : req.header(reqwest::header::AUTHORIZATION, value)
107 : } else {
108 0 : req
109 : };
110 0 : req.send().await.map_err(Error::ReceiveBody)
111 0 : }
112 :
113 0 : pub async fn tenant_details(
114 0 : &self,
115 0 : tenant_shard_id: TenantShardId,
116 0 : ) -> Result<pageserver_api::models::TenantDetails> {
117 0 : let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
118 0 : self.get(uri)
119 0 : .await?
120 0 : .json()
121 0 : .await
122 0 : .map_err(Error::ReceiveBody)
123 0 : }
124 :
125 0 : pub async fn list_timelines(
126 0 : &self,
127 0 : tenant_shard_id: TenantShardId,
128 0 : ) -> Result<Vec<pageserver_api::models::TimelineInfo>> {
129 0 : let uri = format!(
130 0 : "{}/v1/tenant/{tenant_shard_id}/timeline",
131 0 : self.mgmt_api_endpoint
132 0 : );
133 0 : self.get(&uri)
134 0 : .await?
135 0 : .json()
136 0 : .await
137 0 : .map_err(Error::ReceiveBody)
138 0 : }
139 :
140 0 : pub async fn timeline_info(
141 0 : &self,
142 0 : tenant_shard_id: TenantShardId,
143 0 : timeline_id: TimelineId,
144 0 : force_await_logical_size: ForceAwaitLogicalSize,
145 0 : ) -> Result<pageserver_api::models::TimelineInfo> {
146 0 : let uri = format!(
147 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
148 0 : self.mgmt_api_endpoint
149 0 : );
150 :
151 0 : let uri = match force_await_logical_size {
152 0 : ForceAwaitLogicalSize::Yes => format!("{}?force-await-logical-size={}", uri, true),
153 0 : ForceAwaitLogicalSize::No => uri,
154 : };
155 :
156 0 : self.get(&uri)
157 0 : .await?
158 0 : .json()
159 0 : .await
160 0 : .map_err(Error::ReceiveBody)
161 0 : }
162 :
163 0 : pub async fn keyspace(
164 0 : &self,
165 0 : tenant_shard_id: TenantShardId,
166 0 : timeline_id: TimelineId,
167 0 : ) -> Result<pageserver_api::models::partitioning::Partitioning> {
168 0 : let uri = format!(
169 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/keyspace",
170 0 : self.mgmt_api_endpoint
171 0 : );
172 0 : self.get(&uri)
173 0 : .await?
174 0 : .json()
175 0 : .await
176 0 : .map_err(Error::ReceiveBody)
177 0 : }
178 :
179 0 : async fn get<U: IntoUrl>(&self, uri: U) -> Result<reqwest::Response> {
180 0 : self.request(Method::GET, uri, ()).await
181 0 : }
182 :
183 0 : fn start_request<U: reqwest::IntoUrl>(
184 0 : &self,
185 0 : method: Method,
186 0 : uri: U,
187 0 : ) -> reqwest::RequestBuilder {
188 0 : let req = self.client.request(method, uri);
189 0 : if let Some(value) = &self.authorization_header {
190 0 : req.header(reqwest::header::AUTHORIZATION, value)
191 : } else {
192 0 : req
193 : }
194 0 : }
195 :
196 0 : async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
197 0 : &self,
198 0 : method: Method,
199 0 : uri: U,
200 0 : body: B,
201 0 : ) -> Result<reqwest::Response> {
202 0 : self.start_request(method, uri)
203 0 : .json(&body)
204 0 : .send()
205 0 : .await
206 0 : .map_err(Error::ReceiveBody)
207 0 : }
208 :
209 0 : async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
210 0 : &self,
211 0 : method: Method,
212 0 : uri: U,
213 0 : body: B,
214 0 : ) -> Result<reqwest::Response> {
215 0 : let res = self.request_noerror(method, uri, body).await?;
216 0 : let response = res.error_from_body().await?;
217 0 : Ok(response)
218 0 : }
219 :
220 0 : pub async fn status(&self) -> Result<()> {
221 0 : let uri = format!("{}/v1/status", self.mgmt_api_endpoint);
222 0 : self.get(&uri).await?;
223 0 : Ok(())
224 0 : }
225 :
226 : /// The tenant deletion API can return 202 if deletion is incomplete, or
227 : /// 404 if it is complete. Callers are responsible for checking the status
228 : /// code and retrying. Error codes other than 404 will return Err().
229 0 : pub async fn tenant_delete(&self, tenant_shard_id: TenantShardId) -> Result<StatusCode> {
230 0 : let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
231 0 :
232 0 : match self.request(Method::DELETE, &uri, ()).await {
233 0 : Err(Error::ApiError(status_code, msg)) => {
234 0 : if status_code == StatusCode::NOT_FOUND {
235 0 : Ok(StatusCode::NOT_FOUND)
236 : } else {
237 0 : Err(Error::ApiError(status_code, msg))
238 : }
239 : }
240 0 : Err(e) => Err(e),
241 0 : Ok(response) => Ok(response.status()),
242 : }
243 0 : }
244 :
245 0 : pub async fn tenant_time_travel_remote_storage(
246 0 : &self,
247 0 : tenant_shard_id: TenantShardId,
248 0 : timestamp: &str,
249 0 : done_if_after: &str,
250 0 : ) -> Result<()> {
251 0 : let uri = format!(
252 0 : "{}/v1/tenant/{tenant_shard_id}/time_travel_remote_storage?travel_to={timestamp}&done_if_after={done_if_after}",
253 0 : self.mgmt_api_endpoint
254 0 : );
255 0 : self.request(Method::PUT, &uri, ()).await?;
256 0 : Ok(())
257 0 : }
258 :
259 0 : pub async fn tenant_scan_remote_storage(
260 0 : &self,
261 0 : tenant_id: TenantId,
262 0 : ) -> Result<TenantScanRemoteStorageResponse> {
263 0 : let uri = format!(
264 0 : "{}/v1/tenant/{tenant_id}/scan_remote_storage",
265 0 : self.mgmt_api_endpoint
266 0 : );
267 0 : let response = self.request(Method::GET, &uri, ()).await?;
268 0 : let body = response.json().await.map_err(Error::ReceiveBody)?;
269 0 : Ok(body)
270 0 : }
271 :
272 0 : pub async fn set_tenant_config(&self, req: &TenantConfigRequest) -> Result<()> {
273 0 : let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
274 0 : self.request(Method::PUT, &uri, req).await?;
275 0 : Ok(())
276 0 : }
277 :
278 0 : pub async fn patch_tenant_config(&self, req: &TenantConfigPatchRequest) -> Result<()> {
279 0 : let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
280 0 : self.request(Method::PATCH, &uri, req).await?;
281 0 : Ok(())
282 0 : }
283 :
284 0 : pub async fn tenant_secondary_download(
285 0 : &self,
286 0 : tenant_id: TenantShardId,
287 0 : wait: Option<std::time::Duration>,
288 0 : ) -> Result<(StatusCode, SecondaryProgress)> {
289 0 : let mut path = reqwest::Url::parse(&format!(
290 0 : "{}/v1/tenant/{}/secondary/download",
291 0 : self.mgmt_api_endpoint, tenant_id
292 0 : ))
293 0 : .expect("Cannot build URL");
294 :
295 0 : if let Some(wait) = wait {
296 0 : path.query_pairs_mut()
297 0 : .append_pair("wait_ms", &format!("{}", wait.as_millis()));
298 0 : }
299 :
300 0 : let response = self.request(Method::POST, path, ()).await?;
301 0 : let status = response.status();
302 0 : let progress: SecondaryProgress = response.json().await.map_err(Error::ReceiveBody)?;
303 0 : Ok((status, progress))
304 0 : }
305 :
306 0 : pub async fn tenant_secondary_status(
307 0 : &self,
308 0 : tenant_shard_id: TenantShardId,
309 0 : ) -> Result<SecondaryProgress> {
310 0 : let path = reqwest::Url::parse(&format!(
311 0 : "{}/v1/tenant/{}/secondary/status",
312 0 : self.mgmt_api_endpoint, tenant_shard_id
313 0 : ))
314 0 : .expect("Cannot build URL");
315 0 :
316 0 : self.request(Method::GET, path, ())
317 0 : .await?
318 0 : .json()
319 0 : .await
320 0 : .map_err(Error::ReceiveBody)
321 0 : }
322 :
323 0 : pub async fn tenant_heatmap_upload(&self, tenant_id: TenantShardId) -> Result<()> {
324 0 : let path = reqwest::Url::parse(&format!(
325 0 : "{}/v1/tenant/{}/heatmap_upload",
326 0 : self.mgmt_api_endpoint, tenant_id
327 0 : ))
328 0 : .expect("Cannot build URL");
329 0 :
330 0 : self.request(Method::POST, path, ()).await?;
331 0 : Ok(())
332 0 : }
333 :
334 0 : pub async fn location_config(
335 0 : &self,
336 0 : tenant_shard_id: TenantShardId,
337 0 : config: LocationConfig,
338 0 : flush_ms: Option<std::time::Duration>,
339 0 : lazy: bool,
340 0 : ) -> Result<()> {
341 0 : let req_body = TenantLocationConfigRequest { config };
342 0 :
343 0 : let mut path = reqwest::Url::parse(&format!(
344 0 : "{}/v1/tenant/{}/location_config",
345 0 : self.mgmt_api_endpoint, tenant_shard_id
346 0 : ))
347 0 : // Should always work: mgmt_api_endpoint is configuration, not user input.
348 0 : .expect("Cannot build URL");
349 0 :
350 0 : if lazy {
351 0 : path.query_pairs_mut().append_pair("lazy", "true");
352 0 : }
353 :
354 0 : if let Some(flush_ms) = flush_ms {
355 0 : path.query_pairs_mut()
356 0 : .append_pair("flush_ms", &format!("{}", flush_ms.as_millis()));
357 0 : }
358 :
359 0 : self.request(Method::PUT, path, &req_body).await?;
360 0 : Ok(())
361 0 : }
362 :
363 0 : pub async fn list_location_config(&self) -> Result<LocationConfigListResponse> {
364 0 : let path = format!("{}/v1/location_config", self.mgmt_api_endpoint);
365 0 : self.request(Method::GET, &path, ())
366 0 : .await?
367 0 : .json()
368 0 : .await
369 0 : .map_err(Error::ReceiveBody)
370 0 : }
371 :
372 0 : pub async fn get_location_config(
373 0 : &self,
374 0 : tenant_shard_id: TenantShardId,
375 0 : ) -> Result<Option<LocationConfig>> {
376 0 : let path = format!(
377 0 : "{}/v1/location_config/{tenant_shard_id}",
378 0 : self.mgmt_api_endpoint
379 0 : );
380 0 : self.request(Method::GET, &path, ())
381 0 : .await?
382 0 : .json()
383 0 : .await
384 0 : .map_err(Error::ReceiveBody)
385 0 : }
386 :
387 0 : pub async fn timeline_create(
388 0 : &self,
389 0 : tenant_shard_id: TenantShardId,
390 0 : req: &TimelineCreateRequest,
391 0 : ) -> Result<TimelineInfo> {
392 0 : let uri = format!(
393 0 : "{}/v1/tenant/{}/timeline",
394 0 : self.mgmt_api_endpoint, tenant_shard_id
395 0 : );
396 0 : self.request(Method::POST, &uri, req)
397 0 : .await?
398 0 : .json()
399 0 : .await
400 0 : .map_err(Error::ReceiveBody)
401 0 : }
402 :
403 : /// The timeline deletion API can return 201 if deletion is incomplete, or
404 : /// 403 if it is complete. Callers are responsible for checking the status
405 : /// code and retrying. Error codes other than 403 will return Err().
406 0 : pub async fn timeline_delete(
407 0 : &self,
408 0 : tenant_shard_id: TenantShardId,
409 0 : timeline_id: TimelineId,
410 0 : ) -> Result<StatusCode> {
411 0 : let uri = format!(
412 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
413 0 : self.mgmt_api_endpoint
414 0 : );
415 0 :
416 0 : match self.request(Method::DELETE, &uri, ()).await {
417 0 : Err(Error::ApiError(status_code, msg)) => {
418 0 : if status_code == StatusCode::NOT_FOUND {
419 0 : Ok(StatusCode::NOT_FOUND)
420 : } else {
421 0 : Err(Error::ApiError(status_code, msg))
422 : }
423 : }
424 0 : Err(e) => Err(e),
425 0 : Ok(response) => Ok(response.status()),
426 : }
427 0 : }
428 :
429 0 : pub async fn timeline_archival_config(
430 0 : &self,
431 0 : tenant_shard_id: TenantShardId,
432 0 : timeline_id: TimelineId,
433 0 : req: &TimelineArchivalConfigRequest,
434 0 : ) -> Result<()> {
435 0 : let uri = format!(
436 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/archival_config",
437 0 : self.mgmt_api_endpoint
438 0 : );
439 0 :
440 0 : self.request(Method::PUT, &uri, req)
441 0 : .await?
442 0 : .json()
443 0 : .await
444 0 : .map_err(Error::ReceiveBody)
445 0 : }
446 :
447 0 : pub async fn timeline_detach_ancestor(
448 0 : &self,
449 0 : tenant_shard_id: TenantShardId,
450 0 : timeline_id: TimelineId,
451 0 : ) -> Result<AncestorDetached> {
452 0 : let uri = format!(
453 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/detach_ancestor",
454 0 : self.mgmt_api_endpoint
455 0 : );
456 0 :
457 0 : self.request(Method::PUT, &uri, ())
458 0 : .await?
459 0 : .json()
460 0 : .await
461 0 : .map_err(Error::ReceiveBody)
462 0 : }
463 :
464 0 : pub async fn timeline_block_unblock_gc(
465 0 : &self,
466 0 : tenant_shard_id: TenantShardId,
467 0 : timeline_id: TimelineId,
468 0 : dir: BlockUnblock,
469 0 : ) -> Result<()> {
470 0 : let uri = format!(
471 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/{dir}_gc",
472 0 : self.mgmt_api_endpoint,
473 0 : );
474 0 :
475 0 : self.request(Method::POST, &uri, ()).await.map(|_| ())
476 0 : }
477 :
478 0 : pub async fn timeline_download_heatmap_layers(
479 0 : &self,
480 0 : tenant_shard_id: TenantShardId,
481 0 : timeline_id: TimelineId,
482 0 : concurrency: Option<usize>,
483 0 : recurse: bool,
484 0 : ) -> Result<()> {
485 0 : let mut path = reqwest::Url::parse(&format!(
486 0 : "{}/v1/tenant/{}/timeline/{}/download_heatmap_layers",
487 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id
488 0 : ))
489 0 : .expect("Cannot build URL");
490 0 :
491 0 : path.query_pairs_mut()
492 0 : .append_pair("recurse", &format!("{}", recurse));
493 :
494 0 : if let Some(concurrency) = concurrency {
495 0 : path.query_pairs_mut()
496 0 : .append_pair("concurrency", &format!("{}", concurrency));
497 0 : }
498 :
499 0 : self.request(Method::POST, path, ()).await.map(|_| ())
500 0 : }
501 :
502 0 : pub async fn tenant_reset(&self, tenant_shard_id: TenantShardId) -> Result<()> {
503 0 : let uri = format!(
504 0 : "{}/v1/tenant/{}/reset",
505 0 : self.mgmt_api_endpoint, tenant_shard_id
506 0 : );
507 0 : self.request(Method::POST, &uri, ())
508 0 : .await?
509 0 : .json()
510 0 : .await
511 0 : .map_err(Error::ReceiveBody)
512 0 : }
513 :
514 0 : pub async fn tenant_shard_split(
515 0 : &self,
516 0 : tenant_shard_id: TenantShardId,
517 0 : req: TenantShardSplitRequest,
518 0 : ) -> Result<TenantShardSplitResponse> {
519 0 : let uri = format!(
520 0 : "{}/v1/tenant/{}/shard_split",
521 0 : self.mgmt_api_endpoint, tenant_shard_id
522 0 : );
523 0 : self.request(Method::PUT, &uri, req)
524 0 : .await?
525 0 : .json()
526 0 : .await
527 0 : .map_err(Error::ReceiveBody)
528 0 : }
529 :
530 0 : pub async fn timeline_list(
531 0 : &self,
532 0 : tenant_shard_id: &TenantShardId,
533 0 : ) -> Result<Vec<TimelineInfo>> {
534 0 : let uri = format!(
535 0 : "{}/v1/tenant/{}/timeline",
536 0 : self.mgmt_api_endpoint, tenant_shard_id
537 0 : );
538 0 : self.get(&uri)
539 0 : .await?
540 0 : .json()
541 0 : .await
542 0 : .map_err(Error::ReceiveBody)
543 0 : }
544 :
545 0 : pub async fn tenant_synthetic_size(
546 0 : &self,
547 0 : tenant_shard_id: TenantShardId,
548 0 : ) -> Result<TenantHistorySize> {
549 0 : let uri = format!(
550 0 : "{}/v1/tenant/{}/synthetic_size",
551 0 : self.mgmt_api_endpoint, tenant_shard_id
552 0 : );
553 0 : self.get(&uri)
554 0 : .await?
555 0 : .json()
556 0 : .await
557 0 : .map_err(Error::ReceiveBody)
558 0 : }
559 :
560 0 : pub async fn put_io_engine(
561 0 : &self,
562 0 : engine: &pageserver_api::models::virtual_file::IoEngineKind,
563 0 : ) -> Result<()> {
564 0 : let uri = format!("{}/v1/io_engine", self.mgmt_api_endpoint);
565 0 : self.request(Method::PUT, uri, engine)
566 0 : .await?
567 0 : .json()
568 0 : .await
569 0 : .map_err(Error::ReceiveBody)
570 0 : }
571 :
572 : /// Configs io mode at runtime.
573 0 : pub async fn put_io_mode(
574 0 : &self,
575 0 : mode: &pageserver_api::models::virtual_file::IoMode,
576 0 : ) -> Result<()> {
577 0 : let uri = format!("{}/v1/io_mode", self.mgmt_api_endpoint);
578 0 : self.request(Method::PUT, uri, mode)
579 0 : .await?
580 0 : .json()
581 0 : .await
582 0 : .map_err(Error::ReceiveBody)
583 0 : }
584 :
585 0 : pub async fn get_utilization(&self) -> Result<PageserverUtilization> {
586 0 : let uri = format!("{}/v1/utilization", self.mgmt_api_endpoint);
587 0 : self.get(uri)
588 0 : .await?
589 0 : .json()
590 0 : .await
591 0 : .map_err(Error::ReceiveBody)
592 0 : }
593 :
594 0 : pub async fn top_tenant_shards(
595 0 : &self,
596 0 : request: TopTenantShardsRequest,
597 0 : ) -> Result<TopTenantShardsResponse> {
598 0 : let uri = format!("{}/v1/top_tenants", self.mgmt_api_endpoint);
599 0 : self.request(Method::POST, uri, request)
600 0 : .await?
601 0 : .json()
602 0 : .await
603 0 : .map_err(Error::ReceiveBody)
604 0 : }
605 :
606 0 : pub async fn layer_map_info(
607 0 : &self,
608 0 : tenant_shard_id: TenantShardId,
609 0 : timeline_id: TimelineId,
610 0 : ) -> Result<LayerMapInfo> {
611 0 : let uri = format!(
612 0 : "{}/v1/tenant/{}/timeline/{}/layer",
613 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id,
614 0 : );
615 0 : self.get(&uri)
616 0 : .await?
617 0 : .json()
618 0 : .await
619 0 : .map_err(Error::ReceiveBody)
620 0 : }
621 :
622 0 : pub async fn layer_evict(
623 0 : &self,
624 0 : tenant_shard_id: TenantShardId,
625 0 : timeline_id: TimelineId,
626 0 : layer_file_name: &str,
627 0 : ) -> Result<bool> {
628 0 : let uri = format!(
629 0 : "{}/v1/tenant/{}/timeline/{}/layer/{}",
630 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id, layer_file_name
631 0 : );
632 0 : let resp = self.request_noerror(Method::DELETE, &uri, ()).await?;
633 0 : match resp.status() {
634 0 : StatusCode::OK => Ok(true),
635 0 : StatusCode::NOT_MODIFIED => Ok(false),
636 : // TODO: dedupe this pattern / introduce separate error variant?
637 0 : status => Err(match resp.json::<HttpErrorBody>().await {
638 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
639 : Err(_) => {
640 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
641 : }
642 : }),
643 : }
644 0 : }
645 :
646 0 : pub async fn layer_ondemand_download(
647 0 : &self,
648 0 : tenant_shard_id: TenantShardId,
649 0 : timeline_id: TimelineId,
650 0 : layer_file_name: &str,
651 0 : ) -> Result<bool> {
652 0 : let uri = format!(
653 0 : "{}/v1/tenant/{}/timeline/{}/layer/{}",
654 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id, layer_file_name
655 0 : );
656 0 : let resp = self.request_noerror(Method::GET, &uri, ()).await?;
657 0 : match resp.status() {
658 0 : StatusCode::OK => Ok(true),
659 0 : StatusCode::NOT_MODIFIED => Ok(false),
660 : // TODO: dedupe this pattern / introduce separate error variant?
661 0 : status => Err(match resp.json::<HttpErrorBody>().await {
662 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
663 : Err(_) => {
664 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
665 : }
666 : }),
667 : }
668 0 : }
669 :
670 0 : pub async fn ingest_aux_files(
671 0 : &self,
672 0 : tenant_shard_id: TenantShardId,
673 0 : timeline_id: TimelineId,
674 0 : aux_files: HashMap<String, String>,
675 0 : ) -> Result<bool> {
676 0 : let uri = format!(
677 0 : "{}/v1/tenant/{}/timeline/{}/ingest_aux_files",
678 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id
679 0 : );
680 0 : let resp = self
681 0 : .request_noerror(Method::POST, &uri, IngestAuxFilesRequest { aux_files })
682 0 : .await?;
683 0 : match resp.status() {
684 0 : StatusCode::OK => Ok(true),
685 0 : status => Err(match resp.json::<HttpErrorBody>().await {
686 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
687 : Err(_) => {
688 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
689 : }
690 : }),
691 : }
692 0 : }
693 :
694 0 : pub async fn list_aux_files(
695 0 : &self,
696 0 : tenant_shard_id: TenantShardId,
697 0 : timeline_id: TimelineId,
698 0 : lsn: Lsn,
699 0 : ) -> Result<HashMap<String, Bytes>> {
700 0 : let uri = format!(
701 0 : "{}/v1/tenant/{}/timeline/{}/list_aux_files",
702 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id
703 0 : );
704 0 : let resp = self
705 0 : .request_noerror(Method::POST, &uri, ListAuxFilesRequest { lsn })
706 0 : .await?;
707 0 : match resp.status() {
708 : StatusCode::OK => {
709 0 : let resp: HashMap<String, Bytes> = resp.json().await.map_err(|e| {
710 0 : Error::ApiError(StatusCode::INTERNAL_SERVER_ERROR, format!("{e}"))
711 0 : })?;
712 0 : Ok(resp)
713 : }
714 0 : status => Err(match resp.json::<HttpErrorBody>().await {
715 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
716 : Err(_) => {
717 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
718 : }
719 : }),
720 : }
721 0 : }
722 :
723 0 : pub async fn import_basebackup(
724 0 : &self,
725 0 : tenant_id: TenantId,
726 0 : timeline_id: TimelineId,
727 0 : base_lsn: Lsn,
728 0 : end_lsn: Lsn,
729 0 : pg_version: u32,
730 0 : basebackup_tarball: ReqwestBody,
731 0 : ) -> Result<()> {
732 0 : let uri = format!(
733 0 : "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/import_basebackup?base_lsn={base_lsn}&end_lsn={end_lsn}&pg_version={pg_version}",
734 0 : self.mgmt_api_endpoint,
735 0 : );
736 0 : self.start_request(Method::PUT, uri)
737 0 : .body(basebackup_tarball)
738 0 : .send()
739 0 : .await
740 0 : .map_err(Error::SendRequest)?
741 0 : .error_from_body()
742 0 : .await?
743 0 : .json()
744 0 : .await
745 0 : .map_err(Error::ReceiveBody)
746 0 : }
747 :
748 0 : pub async fn import_wal(
749 0 : &self,
750 0 : tenant_id: TenantId,
751 0 : timeline_id: TimelineId,
752 0 : start_lsn: Lsn,
753 0 : end_lsn: Lsn,
754 0 : wal_tarball: ReqwestBody,
755 0 : ) -> Result<()> {
756 0 : let uri = format!(
757 0 : "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/import_wal?start_lsn={start_lsn}&end_lsn={end_lsn}",
758 0 : self.mgmt_api_endpoint,
759 0 : );
760 0 : self.start_request(Method::PUT, uri)
761 0 : .body(wal_tarball)
762 0 : .send()
763 0 : .await
764 0 : .map_err(Error::SendRequest)?
765 0 : .error_from_body()
766 0 : .await?
767 0 : .json()
768 0 : .await
769 0 : .map_err(Error::ReceiveBody)
770 0 : }
771 :
772 0 : pub async fn timeline_init_lsn_lease(
773 0 : &self,
774 0 : tenant_shard_id: TenantShardId,
775 0 : timeline_id: TimelineId,
776 0 : lsn: Lsn,
777 0 : ) -> Result<LsnLease> {
778 0 : let uri = format!(
779 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/lsn_lease",
780 0 : self.mgmt_api_endpoint,
781 0 : );
782 0 :
783 0 : self.request(Method::POST, &uri, LsnLeaseRequest { lsn })
784 0 : .await?
785 0 : .json()
786 0 : .await
787 0 : .map_err(Error::ReceiveBody)
788 0 : }
789 :
790 0 : pub async fn wait_lsn(
791 0 : &self,
792 0 : tenant_shard_id: TenantShardId,
793 0 : request: TenantWaitLsnRequest,
794 0 : ) -> Result<StatusCode> {
795 0 : let uri = format!(
796 0 : "{}/v1/tenant/{tenant_shard_id}/wait_lsn",
797 0 : self.mgmt_api_endpoint,
798 0 : );
799 0 :
800 0 : self.request_noerror(Method::POST, uri, request)
801 0 : .await
802 0 : .map(|resp| resp.status())
803 0 : }
804 : }
|