Line data Source code
1 : use std::{collections::HashMap, error::Error as _};
2 :
3 : use bytes::Bytes;
4 : use reqwest::{IntoUrl, Method, StatusCode};
5 :
6 : use detach_ancestor::AncestorDetached;
7 : use http_utils::error::HttpErrorBody;
8 : use pageserver_api::{models::*, shard::TenantShardId};
9 : use utils::{
10 : id::{TenantId, TimelineId},
11 : lsn::Lsn,
12 : };
13 :
14 : pub use reqwest::Body as ReqwestBody;
15 :
16 : use crate::BlockUnblock;
17 :
18 : pub mod util;
19 :
20 : #[derive(Debug, Clone)]
21 : pub struct Client {
22 : mgmt_api_endpoint: String,
23 : authorization_header: Option<String>,
24 : client: reqwest::Client,
25 : }
26 :
27 : #[derive(thiserror::Error, Debug)]
28 : pub enum Error {
29 0 : #[error("send request: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
30 : SendRequest(reqwest::Error),
31 :
32 0 : #[error("receive body: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
33 : ReceiveBody(reqwest::Error),
34 :
35 : #[error("receive error body: {0}")]
36 : ReceiveErrorBody(String),
37 :
38 : #[error("pageserver API: {1}")]
39 : ApiError(StatusCode, String),
40 :
41 : #[error("Cancelled")]
42 : Cancelled,
43 : }
44 :
45 : pub type Result<T> = std::result::Result<T, Error>;
46 :
47 : pub trait ResponseErrorMessageExt: Sized {
48 : fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
49 : }
50 :
51 : impl ResponseErrorMessageExt for reqwest::Response {
52 0 : async fn error_from_body(self) -> Result<Self> {
53 0 : let status = self.status();
54 0 : if !(status.is_client_error() || status.is_server_error()) {
55 0 : return Ok(self);
56 0 : }
57 0 :
58 0 : let url = self.url().to_owned();
59 0 : Err(match self.json::<HttpErrorBody>().await {
60 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
61 : Err(_) => {
62 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), url))
63 : }
64 : })
65 0 : }
66 : }
67 :
/// Selects whether `timeline_info` adds `force-await-logical-size=true` to its query.
pub enum ForceAwaitLogicalSize {
    /// Add the query parameter.
    Yes,
    /// Leave the URL without the query parameter.
    No,
}
72 :
73 : impl Client {
74 0 : pub fn new(mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
75 0 : Self::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt)
76 0 : }
77 :
78 0 : pub fn from_client(
79 0 : client: reqwest::Client,
80 0 : mgmt_api_endpoint: String,
81 0 : jwt: Option<&str>,
82 0 : ) -> Self {
83 0 : Self {
84 0 : mgmt_api_endpoint,
85 0 : authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),
86 0 : client,
87 0 : }
88 0 : }
89 :
90 0 : pub async fn list_tenants(&self) -> Result<Vec<pageserver_api::models::TenantInfo>> {
91 0 : let uri = format!("{}/v1/tenant", self.mgmt_api_endpoint);
92 0 : let resp = self.get(&uri).await?;
93 0 : resp.json().await.map_err(Error::ReceiveBody)
94 0 : }
95 :
96 : /// Get an arbitrary path and returning a streaming Response. This function is suitable
97 : /// for pass-through/proxy use cases where we don't care what the response content looks
98 : /// like.
99 : ///
100 : /// Use/add one of the properly typed methods below if you know aren't proxying, and
101 : /// know what kind of response you expect.
102 0 : pub async fn get_raw(&self, path: String) -> Result<reqwest::Response> {
103 0 : debug_assert!(path.starts_with('/'));
104 0 : let uri = format!("{}{}", self.mgmt_api_endpoint, path);
105 0 :
106 0 : let req = self.client.request(Method::GET, uri);
107 0 : let req = if let Some(value) = &self.authorization_header {
108 0 : req.header(reqwest::header::AUTHORIZATION, value)
109 : } else {
110 0 : req
111 : };
112 0 : req.send().await.map_err(Error::ReceiveBody)
113 0 : }
114 :
115 0 : pub async fn tenant_details(
116 0 : &self,
117 0 : tenant_shard_id: TenantShardId,
118 0 : ) -> Result<pageserver_api::models::TenantDetails> {
119 0 : let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
120 0 : self.get(uri)
121 0 : .await?
122 0 : .json()
123 0 : .await
124 0 : .map_err(Error::ReceiveBody)
125 0 : }
126 :
127 0 : pub async fn list_timelines(
128 0 : &self,
129 0 : tenant_shard_id: TenantShardId,
130 0 : ) -> Result<Vec<pageserver_api::models::TimelineInfo>> {
131 0 : let uri = format!(
132 0 : "{}/v1/tenant/{tenant_shard_id}/timeline",
133 0 : self.mgmt_api_endpoint
134 0 : );
135 0 : self.get(&uri)
136 0 : .await?
137 0 : .json()
138 0 : .await
139 0 : .map_err(Error::ReceiveBody)
140 0 : }
141 :
142 0 : pub async fn timeline_info(
143 0 : &self,
144 0 : tenant_shard_id: TenantShardId,
145 0 : timeline_id: TimelineId,
146 0 : force_await_logical_size: ForceAwaitLogicalSize,
147 0 : ) -> Result<pageserver_api::models::TimelineInfo> {
148 0 : let uri = format!(
149 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
150 0 : self.mgmt_api_endpoint
151 0 : );
152 :
153 0 : let uri = match force_await_logical_size {
154 0 : ForceAwaitLogicalSize::Yes => format!("{}?force-await-logical-size={}", uri, true),
155 0 : ForceAwaitLogicalSize::No => uri,
156 : };
157 :
158 0 : self.get(&uri)
159 0 : .await?
160 0 : .json()
161 0 : .await
162 0 : .map_err(Error::ReceiveBody)
163 0 : }
164 :
165 0 : pub async fn keyspace(
166 0 : &self,
167 0 : tenant_shard_id: TenantShardId,
168 0 : timeline_id: TimelineId,
169 0 : ) -> Result<pageserver_api::models::partitioning::Partitioning> {
170 0 : let uri = format!(
171 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/keyspace",
172 0 : self.mgmt_api_endpoint
173 0 : );
174 0 : self.get(&uri)
175 0 : .await?
176 0 : .json()
177 0 : .await
178 0 : .map_err(Error::ReceiveBody)
179 0 : }
180 :
181 0 : async fn get<U: IntoUrl>(&self, uri: U) -> Result<reqwest::Response> {
182 0 : self.request(Method::GET, uri, ()).await
183 0 : }
184 :
185 0 : fn start_request<U: reqwest::IntoUrl>(
186 0 : &self,
187 0 : method: Method,
188 0 : uri: U,
189 0 : ) -> reqwest::RequestBuilder {
190 0 : let req = self.client.request(method, uri);
191 0 : if let Some(value) = &self.authorization_header {
192 0 : req.header(reqwest::header::AUTHORIZATION, value)
193 : } else {
194 0 : req
195 : }
196 0 : }
197 :
198 0 : async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
199 0 : &self,
200 0 : method: Method,
201 0 : uri: U,
202 0 : body: B,
203 0 : ) -> Result<reqwest::Response> {
204 0 : self.start_request(method, uri)
205 0 : .json(&body)
206 0 : .send()
207 0 : .await
208 0 : .map_err(Error::ReceiveBody)
209 0 : }
210 :
211 0 : async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
212 0 : &self,
213 0 : method: Method,
214 0 : uri: U,
215 0 : body: B,
216 0 : ) -> Result<reqwest::Response> {
217 0 : let res = self.request_noerror(method, uri, body).await?;
218 0 : let response = res.error_from_body().await?;
219 0 : Ok(response)
220 0 : }
221 :
222 0 : pub async fn status(&self) -> Result<()> {
223 0 : let uri = format!("{}/v1/status", self.mgmt_api_endpoint);
224 0 : self.get(&uri).await?;
225 0 : Ok(())
226 0 : }
227 :
228 : /// The tenant deletion API can return 202 if deletion is incomplete, or
229 : /// 404 if it is complete. Callers are responsible for checking the status
230 : /// code and retrying. Error codes other than 404 will return Err().
231 0 : pub async fn tenant_delete(&self, tenant_shard_id: TenantShardId) -> Result<StatusCode> {
232 0 : let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
233 0 :
234 0 : match self.request(Method::DELETE, &uri, ()).await {
235 0 : Err(Error::ApiError(status_code, msg)) => {
236 0 : if status_code == StatusCode::NOT_FOUND {
237 0 : Ok(StatusCode::NOT_FOUND)
238 : } else {
239 0 : Err(Error::ApiError(status_code, msg))
240 : }
241 : }
242 0 : Err(e) => Err(e),
243 0 : Ok(response) => Ok(response.status()),
244 : }
245 0 : }
246 :
247 0 : pub async fn tenant_time_travel_remote_storage(
248 0 : &self,
249 0 : tenant_shard_id: TenantShardId,
250 0 : timestamp: &str,
251 0 : done_if_after: &str,
252 0 : ) -> Result<()> {
253 0 : let uri = format!(
254 0 : "{}/v1/tenant/{tenant_shard_id}/time_travel_remote_storage?travel_to={timestamp}&done_if_after={done_if_after}",
255 0 : self.mgmt_api_endpoint
256 0 : );
257 0 : self.request(Method::PUT, &uri, ()).await?;
258 0 : Ok(())
259 0 : }
260 :
261 0 : pub async fn tenant_scan_remote_storage(
262 0 : &self,
263 0 : tenant_id: TenantId,
264 0 : ) -> Result<TenantScanRemoteStorageResponse> {
265 0 : let uri = format!(
266 0 : "{}/v1/tenant/{tenant_id}/scan_remote_storage",
267 0 : self.mgmt_api_endpoint
268 0 : );
269 0 : let response = self.request(Method::GET, &uri, ()).await?;
270 0 : let body = response.json().await.map_err(Error::ReceiveBody)?;
271 0 : Ok(body)
272 0 : }
273 :
274 0 : pub async fn set_tenant_config(&self, req: &TenantConfigRequest) -> Result<()> {
275 0 : let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
276 0 : self.request(Method::PUT, &uri, req).await?;
277 0 : Ok(())
278 0 : }
279 :
280 0 : pub async fn patch_tenant_config(&self, req: &TenantConfigPatchRequest) -> Result<()> {
281 0 : let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
282 0 : self.request(Method::PATCH, &uri, req).await?;
283 0 : Ok(())
284 0 : }
285 :
286 0 : pub async fn tenant_secondary_download(
287 0 : &self,
288 0 : tenant_id: TenantShardId,
289 0 : wait: Option<std::time::Duration>,
290 0 : ) -> Result<(StatusCode, SecondaryProgress)> {
291 0 : let mut path = reqwest::Url::parse(&format!(
292 0 : "{}/v1/tenant/{}/secondary/download",
293 0 : self.mgmt_api_endpoint, tenant_id
294 0 : ))
295 0 : .expect("Cannot build URL");
296 :
297 0 : if let Some(wait) = wait {
298 0 : path.query_pairs_mut()
299 0 : .append_pair("wait_ms", &format!("{}", wait.as_millis()));
300 0 : }
301 :
302 0 : let response = self.request(Method::POST, path, ()).await?;
303 0 : let status = response.status();
304 0 : let progress: SecondaryProgress = response.json().await.map_err(Error::ReceiveBody)?;
305 0 : Ok((status, progress))
306 0 : }
307 :
308 0 : pub async fn tenant_secondary_status(
309 0 : &self,
310 0 : tenant_shard_id: TenantShardId,
311 0 : ) -> Result<SecondaryProgress> {
312 0 : let path = reqwest::Url::parse(&format!(
313 0 : "{}/v1/tenant/{}/secondary/status",
314 0 : self.mgmt_api_endpoint, tenant_shard_id
315 0 : ))
316 0 : .expect("Cannot build URL");
317 0 :
318 0 : self.request(Method::GET, path, ())
319 0 : .await?
320 0 : .json()
321 0 : .await
322 0 : .map_err(Error::ReceiveBody)
323 0 : }
324 :
325 0 : pub async fn tenant_heatmap_upload(&self, tenant_id: TenantShardId) -> Result<()> {
326 0 : let path = reqwest::Url::parse(&format!(
327 0 : "{}/v1/tenant/{}/heatmap_upload",
328 0 : self.mgmt_api_endpoint, tenant_id
329 0 : ))
330 0 : .expect("Cannot build URL");
331 0 :
332 0 : self.request(Method::POST, path, ()).await?;
333 0 : Ok(())
334 0 : }
335 :
336 0 : pub async fn location_config(
337 0 : &self,
338 0 : tenant_shard_id: TenantShardId,
339 0 : config: LocationConfig,
340 0 : flush_ms: Option<std::time::Duration>,
341 0 : lazy: bool,
342 0 : ) -> Result<()> {
343 0 : let req_body = TenantLocationConfigRequest { config };
344 0 :
345 0 : let mut path = reqwest::Url::parse(&format!(
346 0 : "{}/v1/tenant/{}/location_config",
347 0 : self.mgmt_api_endpoint, tenant_shard_id
348 0 : ))
349 0 : // Should always work: mgmt_api_endpoint is configuration, not user input.
350 0 : .expect("Cannot build URL");
351 0 :
352 0 : if lazy {
353 0 : path.query_pairs_mut().append_pair("lazy", "true");
354 0 : }
355 :
356 0 : if let Some(flush_ms) = flush_ms {
357 0 : path.query_pairs_mut()
358 0 : .append_pair("flush_ms", &format!("{}", flush_ms.as_millis()));
359 0 : }
360 :
361 0 : self.request(Method::PUT, path, &req_body).await?;
362 0 : Ok(())
363 0 : }
364 :
365 0 : pub async fn list_location_config(&self) -> Result<LocationConfigListResponse> {
366 0 : let path = format!("{}/v1/location_config", self.mgmt_api_endpoint);
367 0 : self.request(Method::GET, &path, ())
368 0 : .await?
369 0 : .json()
370 0 : .await
371 0 : .map_err(Error::ReceiveBody)
372 0 : }
373 :
374 0 : pub async fn get_location_config(
375 0 : &self,
376 0 : tenant_shard_id: TenantShardId,
377 0 : ) -> Result<Option<LocationConfig>> {
378 0 : let path = format!(
379 0 : "{}/v1/location_config/{tenant_shard_id}",
380 0 : self.mgmt_api_endpoint
381 0 : );
382 0 : self.request(Method::GET, &path, ())
383 0 : .await?
384 0 : .json()
385 0 : .await
386 0 : .map_err(Error::ReceiveBody)
387 0 : }
388 :
389 0 : pub async fn timeline_create(
390 0 : &self,
391 0 : tenant_shard_id: TenantShardId,
392 0 : req: &TimelineCreateRequest,
393 0 : ) -> Result<TimelineInfo> {
394 0 : let uri = format!(
395 0 : "{}/v1/tenant/{}/timeline",
396 0 : self.mgmt_api_endpoint, tenant_shard_id
397 0 : );
398 0 : self.request(Method::POST, &uri, req)
399 0 : .await?
400 0 : .json()
401 0 : .await
402 0 : .map_err(Error::ReceiveBody)
403 0 : }
404 :
405 : /// The timeline deletion API can return 201 if deletion is incomplete, or
406 : /// 403 if it is complete. Callers are responsible for checking the status
407 : /// code and retrying. Error codes other than 403 will return Err().
408 0 : pub async fn timeline_delete(
409 0 : &self,
410 0 : tenant_shard_id: TenantShardId,
411 0 : timeline_id: TimelineId,
412 0 : ) -> Result<StatusCode> {
413 0 : let uri = format!(
414 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
415 0 : self.mgmt_api_endpoint
416 0 : );
417 0 :
418 0 : match self.request(Method::DELETE, &uri, ()).await {
419 0 : Err(Error::ApiError(status_code, msg)) => {
420 0 : if status_code == StatusCode::NOT_FOUND {
421 0 : Ok(StatusCode::NOT_FOUND)
422 : } else {
423 0 : Err(Error::ApiError(status_code, msg))
424 : }
425 : }
426 0 : Err(e) => Err(e),
427 0 : Ok(response) => Ok(response.status()),
428 : }
429 0 : }
430 :
431 0 : pub async fn timeline_archival_config(
432 0 : &self,
433 0 : tenant_shard_id: TenantShardId,
434 0 : timeline_id: TimelineId,
435 0 : req: &TimelineArchivalConfigRequest,
436 0 : ) -> Result<()> {
437 0 : let uri = format!(
438 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/archival_config",
439 0 : self.mgmt_api_endpoint
440 0 : );
441 0 :
442 0 : self.request(Method::PUT, &uri, req)
443 0 : .await?
444 0 : .json()
445 0 : .await
446 0 : .map_err(Error::ReceiveBody)
447 0 : }
448 :
449 0 : pub async fn timeline_detach_ancestor(
450 0 : &self,
451 0 : tenant_shard_id: TenantShardId,
452 0 : timeline_id: TimelineId,
453 0 : ) -> Result<AncestorDetached> {
454 0 : let uri = format!(
455 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/detach_ancestor",
456 0 : self.mgmt_api_endpoint
457 0 : );
458 0 :
459 0 : self.request(Method::PUT, &uri, ())
460 0 : .await?
461 0 : .json()
462 0 : .await
463 0 : .map_err(Error::ReceiveBody)
464 0 : }
465 :
466 0 : pub async fn timeline_block_unblock_gc(
467 0 : &self,
468 0 : tenant_shard_id: TenantShardId,
469 0 : timeline_id: TimelineId,
470 0 : dir: BlockUnblock,
471 0 : ) -> Result<()> {
472 0 : let uri = format!(
473 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/{dir}_gc",
474 0 : self.mgmt_api_endpoint,
475 0 : );
476 0 :
477 0 : self.request(Method::POST, &uri, ()).await.map(|_| ())
478 0 : }
479 :
480 0 : pub async fn timeline_download_heatmap_layers(
481 0 : &self,
482 0 : tenant_shard_id: TenantShardId,
483 0 : timeline_id: TimelineId,
484 0 : concurrency: Option<usize>,
485 0 : ) -> Result<()> {
486 0 : let mut path = reqwest::Url::parse(&format!(
487 0 : "{}/v1/tenant/{}/timeline/{}/download_heatmap_layers",
488 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id
489 0 : ))
490 0 : .expect("Cannot build URL");
491 :
492 0 : if let Some(concurrency) = concurrency {
493 0 : path.query_pairs_mut()
494 0 : .append_pair("concurrency", &format!("{}", concurrency));
495 0 : }
496 :
497 0 : self.request(Method::POST, path, ()).await.map(|_| ())
498 0 : }
499 :
500 0 : pub async fn tenant_reset(&self, tenant_shard_id: TenantShardId) -> Result<()> {
501 0 : let uri = format!(
502 0 : "{}/v1/tenant/{}/reset",
503 0 : self.mgmt_api_endpoint, tenant_shard_id
504 0 : );
505 0 : self.request(Method::POST, &uri, ())
506 0 : .await?
507 0 : .json()
508 0 : .await
509 0 : .map_err(Error::ReceiveBody)
510 0 : }
511 :
512 0 : pub async fn tenant_shard_split(
513 0 : &self,
514 0 : tenant_shard_id: TenantShardId,
515 0 : req: TenantShardSplitRequest,
516 0 : ) -> Result<TenantShardSplitResponse> {
517 0 : let uri = format!(
518 0 : "{}/v1/tenant/{}/shard_split",
519 0 : self.mgmt_api_endpoint, tenant_shard_id
520 0 : );
521 0 : self.request(Method::PUT, &uri, req)
522 0 : .await?
523 0 : .json()
524 0 : .await
525 0 : .map_err(Error::ReceiveBody)
526 0 : }
527 :
528 0 : pub async fn timeline_list(
529 0 : &self,
530 0 : tenant_shard_id: &TenantShardId,
531 0 : ) -> Result<Vec<TimelineInfo>> {
532 0 : let uri = format!(
533 0 : "{}/v1/tenant/{}/timeline",
534 0 : self.mgmt_api_endpoint, tenant_shard_id
535 0 : );
536 0 : self.get(&uri)
537 0 : .await?
538 0 : .json()
539 0 : .await
540 0 : .map_err(Error::ReceiveBody)
541 0 : }
542 :
543 0 : pub async fn tenant_synthetic_size(
544 0 : &self,
545 0 : tenant_shard_id: TenantShardId,
546 0 : ) -> Result<TenantHistorySize> {
547 0 : let uri = format!(
548 0 : "{}/v1/tenant/{}/synthetic_size",
549 0 : self.mgmt_api_endpoint, tenant_shard_id
550 0 : );
551 0 : self.get(&uri)
552 0 : .await?
553 0 : .json()
554 0 : .await
555 0 : .map_err(Error::ReceiveBody)
556 0 : }
557 :
558 0 : pub async fn put_io_engine(
559 0 : &self,
560 0 : engine: &pageserver_api::models::virtual_file::IoEngineKind,
561 0 : ) -> Result<()> {
562 0 : let uri = format!("{}/v1/io_engine", self.mgmt_api_endpoint);
563 0 : self.request(Method::PUT, uri, engine)
564 0 : .await?
565 0 : .json()
566 0 : .await
567 0 : .map_err(Error::ReceiveBody)
568 0 : }
569 :
570 : /// Configs io mode at runtime.
571 0 : pub async fn put_io_mode(
572 0 : &self,
573 0 : mode: &pageserver_api::models::virtual_file::IoMode,
574 0 : ) -> Result<()> {
575 0 : let uri = format!("{}/v1/io_mode", self.mgmt_api_endpoint);
576 0 : self.request(Method::PUT, uri, mode)
577 0 : .await?
578 0 : .json()
579 0 : .await
580 0 : .map_err(Error::ReceiveBody)
581 0 : }
582 :
583 0 : pub async fn get_utilization(&self) -> Result<PageserverUtilization> {
584 0 : let uri = format!("{}/v1/utilization", self.mgmt_api_endpoint);
585 0 : self.get(uri)
586 0 : .await?
587 0 : .json()
588 0 : .await
589 0 : .map_err(Error::ReceiveBody)
590 0 : }
591 :
592 0 : pub async fn top_tenant_shards(
593 0 : &self,
594 0 : request: TopTenantShardsRequest,
595 0 : ) -> Result<TopTenantShardsResponse> {
596 0 : let uri = format!("{}/v1/top_tenants", self.mgmt_api_endpoint);
597 0 : self.request(Method::POST, uri, request)
598 0 : .await?
599 0 : .json()
600 0 : .await
601 0 : .map_err(Error::ReceiveBody)
602 0 : }
603 :
604 0 : pub async fn layer_map_info(
605 0 : &self,
606 0 : tenant_shard_id: TenantShardId,
607 0 : timeline_id: TimelineId,
608 0 : ) -> Result<LayerMapInfo> {
609 0 : let uri = format!(
610 0 : "{}/v1/tenant/{}/timeline/{}/layer",
611 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id,
612 0 : );
613 0 : self.get(&uri)
614 0 : .await?
615 0 : .json()
616 0 : .await
617 0 : .map_err(Error::ReceiveBody)
618 0 : }
619 :
620 0 : pub async fn layer_evict(
621 0 : &self,
622 0 : tenant_shard_id: TenantShardId,
623 0 : timeline_id: TimelineId,
624 0 : layer_file_name: &str,
625 0 : ) -> Result<bool> {
626 0 : let uri = format!(
627 0 : "{}/v1/tenant/{}/timeline/{}/layer/{}",
628 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id, layer_file_name
629 0 : );
630 0 : let resp = self.request_noerror(Method::DELETE, &uri, ()).await?;
631 0 : match resp.status() {
632 0 : StatusCode::OK => Ok(true),
633 0 : StatusCode::NOT_MODIFIED => Ok(false),
634 : // TODO: dedupe this pattern / introduce separate error variant?
635 0 : status => Err(match resp.json::<HttpErrorBody>().await {
636 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
637 : Err(_) => {
638 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
639 : }
640 : }),
641 : }
642 0 : }
643 :
644 0 : pub async fn layer_ondemand_download(
645 0 : &self,
646 0 : tenant_shard_id: TenantShardId,
647 0 : timeline_id: TimelineId,
648 0 : layer_file_name: &str,
649 0 : ) -> Result<bool> {
650 0 : let uri = format!(
651 0 : "{}/v1/tenant/{}/timeline/{}/layer/{}",
652 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id, layer_file_name
653 0 : );
654 0 : let resp = self.request_noerror(Method::GET, &uri, ()).await?;
655 0 : match resp.status() {
656 0 : StatusCode::OK => Ok(true),
657 0 : StatusCode::NOT_MODIFIED => Ok(false),
658 : // TODO: dedupe this pattern / introduce separate error variant?
659 0 : status => Err(match resp.json::<HttpErrorBody>().await {
660 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
661 : Err(_) => {
662 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
663 : }
664 : }),
665 : }
666 0 : }
667 :
668 0 : pub async fn ingest_aux_files(
669 0 : &self,
670 0 : tenant_shard_id: TenantShardId,
671 0 : timeline_id: TimelineId,
672 0 : aux_files: HashMap<String, String>,
673 0 : ) -> Result<bool> {
674 0 : let uri = format!(
675 0 : "{}/v1/tenant/{}/timeline/{}/ingest_aux_files",
676 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id
677 0 : );
678 0 : let resp = self
679 0 : .request_noerror(Method::POST, &uri, IngestAuxFilesRequest { aux_files })
680 0 : .await?;
681 0 : match resp.status() {
682 0 : StatusCode::OK => Ok(true),
683 0 : status => Err(match resp.json::<HttpErrorBody>().await {
684 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
685 : Err(_) => {
686 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
687 : }
688 : }),
689 : }
690 0 : }
691 :
692 0 : pub async fn list_aux_files(
693 0 : &self,
694 0 : tenant_shard_id: TenantShardId,
695 0 : timeline_id: TimelineId,
696 0 : lsn: Lsn,
697 0 : ) -> Result<HashMap<String, Bytes>> {
698 0 : let uri = format!(
699 0 : "{}/v1/tenant/{}/timeline/{}/list_aux_files",
700 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id
701 0 : );
702 0 : let resp = self
703 0 : .request_noerror(Method::POST, &uri, ListAuxFilesRequest { lsn })
704 0 : .await?;
705 0 : match resp.status() {
706 : StatusCode::OK => {
707 0 : let resp: HashMap<String, Bytes> = resp.json().await.map_err(|e| {
708 0 : Error::ApiError(StatusCode::INTERNAL_SERVER_ERROR, format!("{e}"))
709 0 : })?;
710 0 : Ok(resp)
711 : }
712 0 : status => Err(match resp.json::<HttpErrorBody>().await {
713 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
714 : Err(_) => {
715 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
716 : }
717 : }),
718 : }
719 0 : }
720 :
721 0 : pub async fn import_basebackup(
722 0 : &self,
723 0 : tenant_id: TenantId,
724 0 : timeline_id: TimelineId,
725 0 : base_lsn: Lsn,
726 0 : end_lsn: Lsn,
727 0 : pg_version: u32,
728 0 : basebackup_tarball: ReqwestBody,
729 0 : ) -> Result<()> {
730 0 : let uri = format!(
731 0 : "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/import_basebackup?base_lsn={base_lsn}&end_lsn={end_lsn}&pg_version={pg_version}",
732 0 : self.mgmt_api_endpoint,
733 0 : );
734 0 : self.start_request(Method::PUT, uri)
735 0 : .body(basebackup_tarball)
736 0 : .send()
737 0 : .await
738 0 : .map_err(Error::SendRequest)?
739 0 : .error_from_body()
740 0 : .await?
741 0 : .json()
742 0 : .await
743 0 : .map_err(Error::ReceiveBody)
744 0 : }
745 :
746 0 : pub async fn import_wal(
747 0 : &self,
748 0 : tenant_id: TenantId,
749 0 : timeline_id: TimelineId,
750 0 : start_lsn: Lsn,
751 0 : end_lsn: Lsn,
752 0 : wal_tarball: ReqwestBody,
753 0 : ) -> Result<()> {
754 0 : let uri = format!(
755 0 : "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/import_wal?start_lsn={start_lsn}&end_lsn={end_lsn}",
756 0 : self.mgmt_api_endpoint,
757 0 : );
758 0 : self.start_request(Method::PUT, uri)
759 0 : .body(wal_tarball)
760 0 : .send()
761 0 : .await
762 0 : .map_err(Error::SendRequest)?
763 0 : .error_from_body()
764 0 : .await?
765 0 : .json()
766 0 : .await
767 0 : .map_err(Error::ReceiveBody)
768 0 : }
769 :
770 0 : pub async fn timeline_init_lsn_lease(
771 0 : &self,
772 0 : tenant_shard_id: TenantShardId,
773 0 : timeline_id: TimelineId,
774 0 : lsn: Lsn,
775 0 : ) -> Result<LsnLease> {
776 0 : let uri = format!(
777 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/lsn_lease",
778 0 : self.mgmt_api_endpoint,
779 0 : );
780 0 :
781 0 : self.request(Method::POST, &uri, LsnLeaseRequest { lsn })
782 0 : .await?
783 0 : .json()
784 0 : .await
785 0 : .map_err(Error::ReceiveBody)
786 0 : }
787 :
788 0 : pub async fn wait_lsn(
789 0 : &self,
790 0 : tenant_shard_id: TenantShardId,
791 0 : request: TenantWaitLsnRequest,
792 0 : ) -> Result<StatusCode> {
793 0 : let uri = format!(
794 0 : "{}/v1/tenant/{tenant_shard_id}/wait_lsn",
795 0 : self.mgmt_api_endpoint,
796 0 : );
797 0 :
798 0 : self.request_noerror(Method::POST, uri, request)
799 0 : .await
800 0 : .map(|resp| resp.status())
801 0 : }
802 : }
|