Line data Source code
1 : use std::collections::HashMap;
2 :
3 : use bytes::Bytes;
4 : use detach_ancestor::AncestorDetached;
5 : use pageserver_api::{models::*, shard::TenantShardId};
6 : use reqwest::{IntoUrl, Method, StatusCode};
7 : use utils::{
8 : http::error::HttpErrorBody,
9 : id::{TenantId, TimelineId},
10 : lsn::Lsn,
11 : };
12 :
13 : pub use reqwest::Body as ReqwestBody;
14 :
15 : use crate::BlockUnblock;
16 :
17 : pub mod util;
18 :
19 : #[derive(Debug, Clone)]
20 : pub struct Client {
21 : mgmt_api_endpoint: String,
22 : authorization_header: Option<String>,
23 : client: reqwest::Client,
24 : }
25 :
26 0 : #[derive(thiserror::Error, Debug)]
27 : pub enum Error {
28 : #[error("send request: {0}")]
29 : SendRequest(reqwest::Error),
30 :
31 : #[error("receive body: {0}")]
32 : ReceiveBody(reqwest::Error),
33 :
34 : #[error("receive error body: {0}")]
35 : ReceiveErrorBody(String),
36 :
37 : #[error("pageserver API: {1}")]
38 : ApiError(StatusCode, String),
39 :
40 : #[error("Cancelled")]
41 : Cancelled,
42 : }
43 :
44 : pub type Result<T> = std::result::Result<T, Error>;
45 :
46 : pub trait ResponseErrorMessageExt: Sized {
47 : fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
48 : }
49 :
50 : impl ResponseErrorMessageExt for reqwest::Response {
51 0 : async fn error_from_body(self) -> Result<Self> {
52 0 : let status = self.status();
53 0 : if !(status.is_client_error() || status.is_server_error()) {
54 0 : return Ok(self);
55 0 : }
56 0 :
57 0 : let url = self.url().to_owned();
58 0 : Err(match self.json::<HttpErrorBody>().await {
59 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
60 : Err(_) => {
61 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), url))
62 : }
63 : })
64 0 : }
65 : }
66 :
/// Selects whether [`Client::timeline_info`] appends the
/// `force-await-logical-size=true` query parameter to its request.
pub enum ForceAwaitLogicalSize {
    /// Append `?force-await-logical-size=true`.
    Yes,
    /// Send the request without the query parameter.
    No,
}
71 :
72 : impl Client {
73 0 : pub fn new(mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
74 0 : Self::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt)
75 0 : }
76 :
77 0 : pub fn from_client(
78 0 : client: reqwest::Client,
79 0 : mgmt_api_endpoint: String,
80 0 : jwt: Option<&str>,
81 0 : ) -> Self {
82 0 : Self {
83 0 : mgmt_api_endpoint,
84 0 : authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),
85 0 : client,
86 0 : }
87 0 : }
88 :
89 0 : pub async fn list_tenants(&self) -> Result<Vec<pageserver_api::models::TenantInfo>> {
90 0 : let uri = format!("{}/v1/tenant", self.mgmt_api_endpoint);
91 0 : let resp = self.get(&uri).await?;
92 0 : resp.json().await.map_err(Error::ReceiveBody)
93 0 : }
94 :
95 : /// Get an arbitrary path and returning a streaming Response. This function is suitable
96 : /// for pass-through/proxy use cases where we don't care what the response content looks
97 : /// like.
98 : ///
99 : /// Use/add one of the properly typed methods below if you know aren't proxying, and
100 : /// know what kind of response you expect.
101 0 : pub async fn get_raw(&self, path: String) -> Result<reqwest::Response> {
102 0 : debug_assert!(path.starts_with('/'));
103 0 : let uri = format!("{}{}", self.mgmt_api_endpoint, path);
104 0 :
105 0 : let req = self.client.request(Method::GET, uri);
106 0 : let req = if let Some(value) = &self.authorization_header {
107 0 : req.header(reqwest::header::AUTHORIZATION, value)
108 : } else {
109 0 : req
110 : };
111 0 : req.send().await.map_err(Error::ReceiveBody)
112 0 : }
113 :
114 0 : pub async fn tenant_details(
115 0 : &self,
116 0 : tenant_shard_id: TenantShardId,
117 0 : ) -> Result<pageserver_api::models::TenantDetails> {
118 0 : let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
119 0 : self.get(uri)
120 0 : .await?
121 0 : .json()
122 0 : .await
123 0 : .map_err(Error::ReceiveBody)
124 0 : }
125 :
126 0 : pub async fn list_timelines(
127 0 : &self,
128 0 : tenant_shard_id: TenantShardId,
129 0 : ) -> Result<Vec<pageserver_api::models::TimelineInfo>> {
130 0 : let uri = format!(
131 0 : "{}/v1/tenant/{tenant_shard_id}/timeline",
132 0 : self.mgmt_api_endpoint
133 0 : );
134 0 : self.get(&uri)
135 0 : .await?
136 0 : .json()
137 0 : .await
138 0 : .map_err(Error::ReceiveBody)
139 0 : }
140 :
141 0 : pub async fn timeline_info(
142 0 : &self,
143 0 : tenant_shard_id: TenantShardId,
144 0 : timeline_id: TimelineId,
145 0 : force_await_logical_size: ForceAwaitLogicalSize,
146 0 : ) -> Result<pageserver_api::models::TimelineInfo> {
147 0 : let uri = format!(
148 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
149 0 : self.mgmt_api_endpoint
150 0 : );
151 :
152 0 : let uri = match force_await_logical_size {
153 0 : ForceAwaitLogicalSize::Yes => format!("{}?force-await-logical-size={}", uri, true),
154 0 : ForceAwaitLogicalSize::No => uri,
155 : };
156 :
157 0 : self.get(&uri)
158 0 : .await?
159 0 : .json()
160 0 : .await
161 0 : .map_err(Error::ReceiveBody)
162 0 : }
163 :
164 0 : pub async fn keyspace(
165 0 : &self,
166 0 : tenant_shard_id: TenantShardId,
167 0 : timeline_id: TimelineId,
168 0 : ) -> Result<pageserver_api::models::partitioning::Partitioning> {
169 0 : let uri = format!(
170 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/keyspace",
171 0 : self.mgmt_api_endpoint
172 0 : );
173 0 : self.get(&uri)
174 0 : .await?
175 0 : .json()
176 0 : .await
177 0 : .map_err(Error::ReceiveBody)
178 0 : }
179 :
180 0 : async fn get<U: IntoUrl>(&self, uri: U) -> Result<reqwest::Response> {
181 0 : self.request(Method::GET, uri, ()).await
182 0 : }
183 :
184 0 : fn start_request<U: reqwest::IntoUrl>(
185 0 : &self,
186 0 : method: Method,
187 0 : uri: U,
188 0 : ) -> reqwest::RequestBuilder {
189 0 : let req = self.client.request(method, uri);
190 0 : if let Some(value) = &self.authorization_header {
191 0 : req.header(reqwest::header::AUTHORIZATION, value)
192 : } else {
193 0 : req
194 : }
195 0 : }
196 :
197 0 : async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
198 0 : &self,
199 0 : method: Method,
200 0 : uri: U,
201 0 : body: B,
202 0 : ) -> Result<reqwest::Response> {
203 0 : self.start_request(method, uri)
204 0 : .json(&body)
205 0 : .send()
206 0 : .await
207 0 : .map_err(Error::ReceiveBody)
208 0 : }
209 :
210 0 : async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
211 0 : &self,
212 0 : method: Method,
213 0 : uri: U,
214 0 : body: B,
215 0 : ) -> Result<reqwest::Response> {
216 0 : let res = self.request_noerror(method, uri, body).await?;
217 0 : let response = res.error_from_body().await?;
218 0 : Ok(response)
219 0 : }
220 :
221 0 : pub async fn status(&self) -> Result<()> {
222 0 : let uri = format!("{}/v1/status", self.mgmt_api_endpoint);
223 0 : self.get(&uri).await?;
224 0 : Ok(())
225 0 : }
226 :
227 : /// The tenant deletion API can return 202 if deletion is incomplete, or
228 : /// 404 if it is complete. Callers are responsible for checking the status
229 : /// code and retrying. Error codes other than 404 will return Err().
230 0 : pub async fn tenant_delete(&self, tenant_shard_id: TenantShardId) -> Result<StatusCode> {
231 0 : let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
232 0 :
233 0 : match self.request(Method::DELETE, &uri, ()).await {
234 0 : Err(Error::ApiError(status_code, msg)) => {
235 0 : if status_code == StatusCode::NOT_FOUND {
236 0 : Ok(StatusCode::NOT_FOUND)
237 : } else {
238 0 : Err(Error::ApiError(status_code, msg))
239 : }
240 : }
241 0 : Err(e) => Err(e),
242 0 : Ok(response) => Ok(response.status()),
243 : }
244 0 : }
245 :
246 0 : pub async fn tenant_time_travel_remote_storage(
247 0 : &self,
248 0 : tenant_shard_id: TenantShardId,
249 0 : timestamp: &str,
250 0 : done_if_after: &str,
251 0 : ) -> Result<()> {
252 0 : let uri = format!(
253 0 : "{}/v1/tenant/{tenant_shard_id}/time_travel_remote_storage?travel_to={timestamp}&done_if_after={done_if_after}",
254 0 : self.mgmt_api_endpoint
255 0 : );
256 0 : self.request(Method::PUT, &uri, ()).await?;
257 0 : Ok(())
258 0 : }
259 :
260 0 : pub async fn tenant_scan_remote_storage(
261 0 : &self,
262 0 : tenant_id: TenantId,
263 0 : ) -> Result<TenantScanRemoteStorageResponse> {
264 0 : let uri = format!(
265 0 : "{}/v1/tenant/{tenant_id}/scan_remote_storage",
266 0 : self.mgmt_api_endpoint
267 0 : );
268 0 : let response = self.request(Method::GET, &uri, ()).await?;
269 0 : let body = response.json().await.map_err(Error::ReceiveBody)?;
270 0 : Ok(body)
271 0 : }
272 :
273 0 : pub async fn tenant_config(&self, req: &TenantConfigRequest) -> Result<()> {
274 0 : let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
275 0 : self.request(Method::PUT, &uri, req).await?;
276 0 : Ok(())
277 0 : }
278 :
279 0 : pub async fn tenant_secondary_download(
280 0 : &self,
281 0 : tenant_id: TenantShardId,
282 0 : wait: Option<std::time::Duration>,
283 0 : ) -> Result<(StatusCode, SecondaryProgress)> {
284 0 : let mut path = reqwest::Url::parse(&format!(
285 0 : "{}/v1/tenant/{}/secondary/download",
286 0 : self.mgmt_api_endpoint, tenant_id
287 0 : ))
288 0 : .expect("Cannot build URL");
289 :
290 0 : if let Some(wait) = wait {
291 0 : path.query_pairs_mut()
292 0 : .append_pair("wait_ms", &format!("{}", wait.as_millis()));
293 0 : }
294 :
295 0 : let response = self.request(Method::POST, path, ()).await?;
296 0 : let status = response.status();
297 0 : let progress: SecondaryProgress = response.json().await.map_err(Error::ReceiveBody)?;
298 0 : Ok((status, progress))
299 0 : }
300 :
301 0 : pub async fn tenant_secondary_status(
302 0 : &self,
303 0 : tenant_shard_id: TenantShardId,
304 0 : ) -> Result<SecondaryProgress> {
305 0 : let path = reqwest::Url::parse(&format!(
306 0 : "{}/v1/tenant/{}/secondary/status",
307 0 : self.mgmt_api_endpoint, tenant_shard_id
308 0 : ))
309 0 : .expect("Cannot build URL");
310 0 :
311 0 : self.request(Method::GET, path, ())
312 0 : .await?
313 0 : .json()
314 0 : .await
315 0 : .map_err(Error::ReceiveBody)
316 0 : }
317 :
318 0 : pub async fn tenant_heatmap_upload(&self, tenant_id: TenantShardId) -> Result<()> {
319 0 : let path = reqwest::Url::parse(&format!(
320 0 : "{}/v1/tenant/{}/heatmap_upload",
321 0 : self.mgmt_api_endpoint, tenant_id
322 0 : ))
323 0 : .expect("Cannot build URL");
324 0 :
325 0 : self.request(Method::POST, path, ()).await?;
326 0 : Ok(())
327 0 : }
328 :
329 0 : pub async fn location_config(
330 0 : &self,
331 0 : tenant_shard_id: TenantShardId,
332 0 : config: LocationConfig,
333 0 : flush_ms: Option<std::time::Duration>,
334 0 : lazy: bool,
335 0 : ) -> Result<()> {
336 0 : let req_body = TenantLocationConfigRequest { config };
337 0 :
338 0 : let mut path = reqwest::Url::parse(&format!(
339 0 : "{}/v1/tenant/{}/location_config",
340 0 : self.mgmt_api_endpoint, tenant_shard_id
341 0 : ))
342 0 : // Should always work: mgmt_api_endpoint is configuration, not user input.
343 0 : .expect("Cannot build URL");
344 0 :
345 0 : if lazy {
346 0 : path.query_pairs_mut().append_pair("lazy", "true");
347 0 : }
348 :
349 0 : if let Some(flush_ms) = flush_ms {
350 0 : path.query_pairs_mut()
351 0 : .append_pair("flush_ms", &format!("{}", flush_ms.as_millis()));
352 0 : }
353 :
354 0 : self.request(Method::PUT, path, &req_body).await?;
355 0 : Ok(())
356 0 : }
357 :
358 0 : pub async fn list_location_config(&self) -> Result<LocationConfigListResponse> {
359 0 : let path = format!("{}/v1/location_config", self.mgmt_api_endpoint);
360 0 : self.request(Method::GET, &path, ())
361 0 : .await?
362 0 : .json()
363 0 : .await
364 0 : .map_err(Error::ReceiveBody)
365 0 : }
366 :
367 0 : pub async fn get_location_config(
368 0 : &self,
369 0 : tenant_shard_id: TenantShardId,
370 0 : ) -> Result<Option<LocationConfig>> {
371 0 : let path = format!(
372 0 : "{}/v1/location_config/{tenant_shard_id}",
373 0 : self.mgmt_api_endpoint
374 0 : );
375 0 : self.request(Method::GET, &path, ())
376 0 : .await?
377 0 : .json()
378 0 : .await
379 0 : .map_err(Error::ReceiveBody)
380 0 : }
381 :
382 0 : pub async fn timeline_create(
383 0 : &self,
384 0 : tenant_shard_id: TenantShardId,
385 0 : req: &TimelineCreateRequest,
386 0 : ) -> Result<TimelineInfo> {
387 0 : let uri = format!(
388 0 : "{}/v1/tenant/{}/timeline",
389 0 : self.mgmt_api_endpoint, tenant_shard_id
390 0 : );
391 0 : self.request(Method::POST, &uri, req)
392 0 : .await?
393 0 : .json()
394 0 : .await
395 0 : .map_err(Error::ReceiveBody)
396 0 : }
397 :
398 : /// The timeline deletion API can return 201 if deletion is incomplete, or
399 : /// 403 if it is complete. Callers are responsible for checking the status
400 : /// code and retrying. Error codes other than 403 will return Err().
401 0 : pub async fn timeline_delete(
402 0 : &self,
403 0 : tenant_shard_id: TenantShardId,
404 0 : timeline_id: TimelineId,
405 0 : ) -> Result<StatusCode> {
406 0 : let uri = format!(
407 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
408 0 : self.mgmt_api_endpoint
409 0 : );
410 0 :
411 0 : match self.request(Method::DELETE, &uri, ()).await {
412 0 : Err(Error::ApiError(status_code, msg)) => {
413 0 : if status_code == StatusCode::NOT_FOUND {
414 0 : Ok(StatusCode::NOT_FOUND)
415 : } else {
416 0 : Err(Error::ApiError(status_code, msg))
417 : }
418 : }
419 0 : Err(e) => Err(e),
420 0 : Ok(response) => Ok(response.status()),
421 : }
422 0 : }
423 :
424 0 : pub async fn timeline_archival_config(
425 0 : &self,
426 0 : tenant_shard_id: TenantShardId,
427 0 : timeline_id: TimelineId,
428 0 : req: &TimelineArchivalConfigRequest,
429 0 : ) -> Result<()> {
430 0 : let uri = format!(
431 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/archival_config",
432 0 : self.mgmt_api_endpoint
433 0 : );
434 0 :
435 0 : self.request(Method::PUT, &uri, req)
436 0 : .await?
437 0 : .json()
438 0 : .await
439 0 : .map_err(Error::ReceiveBody)
440 0 : }
441 :
442 0 : pub async fn timeline_detach_ancestor(
443 0 : &self,
444 0 : tenant_shard_id: TenantShardId,
445 0 : timeline_id: TimelineId,
446 0 : ) -> Result<AncestorDetached> {
447 0 : let uri = format!(
448 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/detach_ancestor",
449 0 : self.mgmt_api_endpoint
450 0 : );
451 0 :
452 0 : self.request(Method::PUT, &uri, ())
453 0 : .await?
454 0 : .json()
455 0 : .await
456 0 : .map_err(Error::ReceiveBody)
457 0 : }
458 :
459 0 : pub async fn timeline_block_unblock_gc(
460 0 : &self,
461 0 : tenant_shard_id: TenantShardId,
462 0 : timeline_id: TimelineId,
463 0 : dir: BlockUnblock,
464 0 : ) -> Result<()> {
465 0 : let uri = format!(
466 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/{dir}_gc",
467 0 : self.mgmt_api_endpoint,
468 0 : );
469 0 :
470 0 : self.request(Method::POST, &uri, ()).await.map(|_| ())
471 0 : }
472 :
473 0 : pub async fn tenant_reset(&self, tenant_shard_id: TenantShardId) -> Result<()> {
474 0 : let uri = format!(
475 0 : "{}/v1/tenant/{}/reset",
476 0 : self.mgmt_api_endpoint, tenant_shard_id
477 0 : );
478 0 : self.request(Method::POST, &uri, ())
479 0 : .await?
480 0 : .json()
481 0 : .await
482 0 : .map_err(Error::ReceiveBody)
483 0 : }
484 :
485 0 : pub async fn tenant_shard_split(
486 0 : &self,
487 0 : tenant_shard_id: TenantShardId,
488 0 : req: TenantShardSplitRequest,
489 0 : ) -> Result<TenantShardSplitResponse> {
490 0 : let uri = format!(
491 0 : "{}/v1/tenant/{}/shard_split",
492 0 : self.mgmt_api_endpoint, tenant_shard_id
493 0 : );
494 0 : self.request(Method::PUT, &uri, req)
495 0 : .await?
496 0 : .json()
497 0 : .await
498 0 : .map_err(Error::ReceiveBody)
499 0 : }
500 :
501 0 : pub async fn timeline_list(
502 0 : &self,
503 0 : tenant_shard_id: &TenantShardId,
504 0 : ) -> Result<Vec<TimelineInfo>> {
505 0 : let uri = format!(
506 0 : "{}/v1/tenant/{}/timeline",
507 0 : self.mgmt_api_endpoint, tenant_shard_id
508 0 : );
509 0 : self.get(&uri)
510 0 : .await?
511 0 : .json()
512 0 : .await
513 0 : .map_err(Error::ReceiveBody)
514 0 : }
515 :
516 0 : pub async fn tenant_synthetic_size(
517 0 : &self,
518 0 : tenant_shard_id: TenantShardId,
519 0 : ) -> Result<TenantHistorySize> {
520 0 : let uri = format!(
521 0 : "{}/v1/tenant/{}/synthetic_size",
522 0 : self.mgmt_api_endpoint, tenant_shard_id
523 0 : );
524 0 : self.get(&uri)
525 0 : .await?
526 0 : .json()
527 0 : .await
528 0 : .map_err(Error::ReceiveBody)
529 0 : }
530 :
531 0 : pub async fn put_io_engine(
532 0 : &self,
533 0 : engine: &pageserver_api::models::virtual_file::IoEngineKind,
534 0 : ) -> Result<()> {
535 0 : let uri = format!("{}/v1/io_engine", self.mgmt_api_endpoint);
536 0 : self.request(Method::PUT, uri, engine)
537 0 : .await?
538 0 : .json()
539 0 : .await
540 0 : .map_err(Error::ReceiveBody)
541 0 : }
542 :
543 : /// Configs io mode at runtime.
544 0 : pub async fn put_io_mode(
545 0 : &self,
546 0 : mode: &pageserver_api::models::virtual_file::IoMode,
547 0 : ) -> Result<()> {
548 0 : let uri = format!("{}/v1/io_mode", self.mgmt_api_endpoint);
549 0 : self.request(Method::PUT, uri, mode)
550 0 : .await?
551 0 : .json()
552 0 : .await
553 0 : .map_err(Error::ReceiveBody)
554 0 : }
555 :
556 0 : pub async fn get_utilization(&self) -> Result<PageserverUtilization> {
557 0 : let uri = format!("{}/v1/utilization", self.mgmt_api_endpoint);
558 0 : self.get(uri)
559 0 : .await?
560 0 : .json()
561 0 : .await
562 0 : .map_err(Error::ReceiveBody)
563 0 : }
564 :
565 0 : pub async fn top_tenant_shards(
566 0 : &self,
567 0 : request: TopTenantShardsRequest,
568 0 : ) -> Result<TopTenantShardsResponse> {
569 0 : let uri = format!("{}/v1/top_tenants", self.mgmt_api_endpoint);
570 0 : self.request(Method::POST, uri, request)
571 0 : .await?
572 0 : .json()
573 0 : .await
574 0 : .map_err(Error::ReceiveBody)
575 0 : }
576 :
577 0 : pub async fn layer_map_info(
578 0 : &self,
579 0 : tenant_shard_id: TenantShardId,
580 0 : timeline_id: TimelineId,
581 0 : ) -> Result<LayerMapInfo> {
582 0 : let uri = format!(
583 0 : "{}/v1/tenant/{}/timeline/{}/layer",
584 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id,
585 0 : );
586 0 : self.get(&uri)
587 0 : .await?
588 0 : .json()
589 0 : .await
590 0 : .map_err(Error::ReceiveBody)
591 0 : }
592 :
593 0 : pub async fn layer_evict(
594 0 : &self,
595 0 : tenant_shard_id: TenantShardId,
596 0 : timeline_id: TimelineId,
597 0 : layer_file_name: &str,
598 0 : ) -> Result<bool> {
599 0 : let uri = format!(
600 0 : "{}/v1/tenant/{}/timeline/{}/layer/{}",
601 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id, layer_file_name
602 0 : );
603 0 : let resp = self.request_noerror(Method::DELETE, &uri, ()).await?;
604 0 : match resp.status() {
605 0 : StatusCode::OK => Ok(true),
606 0 : StatusCode::NOT_MODIFIED => Ok(false),
607 : // TODO: dedupe this pattern / introduce separate error variant?
608 0 : status => Err(match resp.json::<HttpErrorBody>().await {
609 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
610 : Err(_) => {
611 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
612 : }
613 : }),
614 : }
615 0 : }
616 :
617 0 : pub async fn layer_ondemand_download(
618 0 : &self,
619 0 : tenant_shard_id: TenantShardId,
620 0 : timeline_id: TimelineId,
621 0 : layer_file_name: &str,
622 0 : ) -> Result<bool> {
623 0 : let uri = format!(
624 0 : "{}/v1/tenant/{}/timeline/{}/layer/{}",
625 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id, layer_file_name
626 0 : );
627 0 : let resp = self.request_noerror(Method::GET, &uri, ()).await?;
628 0 : match resp.status() {
629 0 : StatusCode::OK => Ok(true),
630 0 : StatusCode::NOT_MODIFIED => Ok(false),
631 : // TODO: dedupe this pattern / introduce separate error variant?
632 0 : status => Err(match resp.json::<HttpErrorBody>().await {
633 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
634 : Err(_) => {
635 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
636 : }
637 : }),
638 : }
639 0 : }
640 :
641 0 : pub async fn ingest_aux_files(
642 0 : &self,
643 0 : tenant_shard_id: TenantShardId,
644 0 : timeline_id: TimelineId,
645 0 : aux_files: HashMap<String, String>,
646 0 : ) -> Result<bool> {
647 0 : let uri = format!(
648 0 : "{}/v1/tenant/{}/timeline/{}/ingest_aux_files",
649 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id
650 0 : );
651 0 : let resp = self
652 0 : .request_noerror(Method::POST, &uri, IngestAuxFilesRequest { aux_files })
653 0 : .await?;
654 0 : match resp.status() {
655 0 : StatusCode::OK => Ok(true),
656 0 : status => Err(match resp.json::<HttpErrorBody>().await {
657 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
658 : Err(_) => {
659 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
660 : }
661 : }),
662 : }
663 0 : }
664 :
665 0 : pub async fn list_aux_files(
666 0 : &self,
667 0 : tenant_shard_id: TenantShardId,
668 0 : timeline_id: TimelineId,
669 0 : lsn: Lsn,
670 0 : ) -> Result<HashMap<String, Bytes>> {
671 0 : let uri = format!(
672 0 : "{}/v1/tenant/{}/timeline/{}/list_aux_files",
673 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id
674 0 : );
675 0 : let resp = self
676 0 : .request_noerror(Method::POST, &uri, ListAuxFilesRequest { lsn })
677 0 : .await?;
678 0 : match resp.status() {
679 : StatusCode::OK => {
680 0 : let resp: HashMap<String, Bytes> = resp.json().await.map_err(|e| {
681 0 : Error::ApiError(StatusCode::INTERNAL_SERVER_ERROR, format!("{e}"))
682 0 : })?;
683 0 : Ok(resp)
684 : }
685 0 : status => Err(match resp.json::<HttpErrorBody>().await {
686 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
687 : Err(_) => {
688 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
689 : }
690 : }),
691 : }
692 0 : }
693 :
694 0 : pub async fn import_basebackup(
695 0 : &self,
696 0 : tenant_id: TenantId,
697 0 : timeline_id: TimelineId,
698 0 : base_lsn: Lsn,
699 0 : end_lsn: Lsn,
700 0 : pg_version: u32,
701 0 : basebackup_tarball: ReqwestBody,
702 0 : ) -> Result<()> {
703 0 : let uri = format!(
704 0 : "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/import_basebackup?base_lsn={base_lsn}&end_lsn={end_lsn}&pg_version={pg_version}",
705 0 : self.mgmt_api_endpoint,
706 0 : );
707 0 : self.start_request(Method::PUT, uri)
708 0 : .body(basebackup_tarball)
709 0 : .send()
710 0 : .await
711 0 : .map_err(Error::SendRequest)?
712 0 : .error_from_body()
713 0 : .await?
714 0 : .json()
715 0 : .await
716 0 : .map_err(Error::ReceiveBody)
717 0 : }
718 :
719 0 : pub async fn import_wal(
720 0 : &self,
721 0 : tenant_id: TenantId,
722 0 : timeline_id: TimelineId,
723 0 : start_lsn: Lsn,
724 0 : end_lsn: Lsn,
725 0 : wal_tarball: ReqwestBody,
726 0 : ) -> Result<()> {
727 0 : let uri = format!(
728 0 : "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/import_wal?start_lsn={start_lsn}&end_lsn={end_lsn}",
729 0 : self.mgmt_api_endpoint,
730 0 : );
731 0 : self.start_request(Method::PUT, uri)
732 0 : .body(wal_tarball)
733 0 : .send()
734 0 : .await
735 0 : .map_err(Error::SendRequest)?
736 0 : .error_from_body()
737 0 : .await?
738 0 : .json()
739 0 : .await
740 0 : .map_err(Error::ReceiveBody)
741 0 : }
742 :
743 0 : pub async fn timeline_init_lsn_lease(
744 0 : &self,
745 0 : tenant_shard_id: TenantShardId,
746 0 : timeline_id: TimelineId,
747 0 : lsn: Lsn,
748 0 : ) -> Result<LsnLease> {
749 0 : let uri = format!(
750 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/lsn_lease",
751 0 : self.mgmt_api_endpoint,
752 0 : );
753 0 :
754 0 : self.request(Method::POST, &uri, LsnLeaseRequest { lsn })
755 0 : .await?
756 0 : .json()
757 0 : .await
758 0 : .map_err(Error::ReceiveBody)
759 0 : }
760 : }
|