Line data Source code
1 : use std::collections::HashMap;
2 : use std::error::Error as _;
3 : use std::time::Duration;
4 :
5 : use bytes::Bytes;
6 : use detach_ancestor::AncestorDetached;
7 : use http_utils::error::HttpErrorBody;
8 : use pageserver_api::models::*;
9 : use pageserver_api::shard::TenantShardId;
10 : pub use reqwest::Body as ReqwestBody;
11 : use reqwest::{IntoUrl, Method, StatusCode, Url};
12 : use utils::id::{TenantId, TimelineId};
13 : use utils::lsn::Lsn;
14 :
15 : use crate::BlockUnblock;
16 :
17 : pub mod util;
18 :
/// HTTP client for a management API rooted at `mgmt_api_endpoint`.
///
/// Every method builds its URL by appending a path to `mgmt_api_endpoint` and, when an
/// authorization header is configured, attaches it to the outgoing request.
#[derive(Debug, Clone)]
pub struct Client {
    /// Base URL that request paths are appended to.
    mgmt_api_endpoint: String,
    /// Pre-formatted `Authorization` header value (`Bearer <jwt>`), present when a JWT was
    /// passed to `Client::new`.
    // NOTE(review): the derived `Debug` impl prints this value verbatim — confirm `Client`
    // is never debug-logged where the token must stay secret.
    authorization_header: Option<String>,
    /// Underlying `reqwest` client used to issue all requests.
    client: reqwest::Client,
}
25 :
/// Errors produced by the management API client in this module.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// Wraps a `reqwest` error raised while sending a request. The message appends the
    /// error's `source()`, if any, after the error itself.
    #[error("send request: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
    SendRequest(reqwest::Error),

    /// Wraps a `reqwest` error raised while reading/decoding a response. The message appends
    /// the error's `source()`, if any, after the error itself.
    // NOTE(review): several methods in this file also map `send()` failures to this variant.
    #[error("receive body: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
    ReceiveBody(reqwest::Error),

    /// The response was treated as an error, but its body could not be decoded as an
    /// `HttpErrorBody`; the payload is a description containing the status code and URL.
    #[error("receive error body: {0}")]
    ReceiveErrorBody(String),

    /// The server answered with an error; carries the HTTP status and the message taken from
    /// the decoded `HttpErrorBody`. Only the message is included in the `Display` output.
    #[error("pageserver API: {1}")]
    ApiError(StatusCode, String),

    /// Not constructed anywhere in this file; presumably used by callers/wrappers to signal
    /// cancellation — confirm at the use sites.
    #[error("Cancelled")]
    Cancelled,

    /// Not constructed anywhere in this file; presumably used by callers/wrappers that apply
    /// their own timeouts — confirm at the use sites.
    #[error("request timed out: {0}")]
    Timeout(String),
}
46 :
/// Shorthand for `std::result::Result` with this module's [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;
48 :
/// Extension trait for converting a response that represents a failure into an [`Error`].
pub trait ResponseErrorMessageExt: Sized {
    /// Consumes `self` and resolves either to `Ok(self)` or to an [`Error`] derived from it.
    /// The returned future is required to be `Send`.
    fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
}
52 :
53 : impl ResponseErrorMessageExt for reqwest::Response {
54 0 : async fn error_from_body(self) -> Result<Self> {
55 0 : let status = self.status();
56 0 : if !(status.is_client_error() || status.is_server_error()) {
57 0 : return Ok(self);
58 0 : }
59 0 :
60 0 : let url = self.url().to_owned();
61 0 : Err(match self.json::<HttpErrorBody>().await {
62 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
63 : Err(_) => {
64 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), url))
65 : }
66 : })
67 0 : }
68 : }
69 :
/// Selects whether [`Client::timeline_info`] appends the `force-await-logical-size=true`
/// query parameter to its request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ForceAwaitLogicalSize {
    /// Append `?force-await-logical-size=true` to the request URL.
    Yes,
    /// Send the request without the extra query parameter.
    No,
}
74 :
75 : impl Client {
76 0 : pub fn new(client: reqwest::Client, mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
77 0 : Self {
78 0 : mgmt_api_endpoint,
79 0 : authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),
80 0 : client,
81 0 : }
82 0 : }
83 :
84 0 : pub async fn list_tenants(&self) -> Result<Vec<pageserver_api::models::TenantInfo>> {
85 0 : let uri = format!("{}/v1/tenant", self.mgmt_api_endpoint);
86 0 : let resp = self.get(&uri).await?;
87 0 : resp.json().await.map_err(Error::ReceiveBody)
88 0 : }
89 :
90 : /// Send an HTTP request to an arbitrary path with a desired HTTP method and returning a streaming
91 : /// Response. This function is suitable for pass-through/proxy use cases where we don't care
92 : /// what the response content looks like.
93 : ///
94 : /// Use/add one of the properly typed methods below if you know aren't proxying, and
95 : /// know what kind of response you expect.
96 0 : pub async fn op_raw(&self, method: Method, path: String) -> Result<reqwest::Response> {
97 0 : debug_assert!(path.starts_with('/'));
98 0 : let uri = format!("{}{}", self.mgmt_api_endpoint, path);
99 0 :
100 0 : let mut req = self.client.request(method, uri);
101 0 : if let Some(value) = &self.authorization_header {
102 0 : req = req.header(reqwest::header::AUTHORIZATION, value);
103 0 : }
104 0 : req.send().await.map_err(Error::ReceiveBody)
105 0 : }
106 :
107 0 : pub async fn tenant_details(
108 0 : &self,
109 0 : tenant_shard_id: TenantShardId,
110 0 : ) -> Result<pageserver_api::models::TenantDetails> {
111 0 : let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
112 0 : self.get(uri)
113 0 : .await?
114 0 : .json()
115 0 : .await
116 0 : .map_err(Error::ReceiveBody)
117 0 : }
118 :
119 0 : pub async fn list_timelines(
120 0 : &self,
121 0 : tenant_shard_id: TenantShardId,
122 0 : ) -> Result<Vec<pageserver_api::models::TimelineInfo>> {
123 0 : let uri = format!(
124 0 : "{}/v1/tenant/{tenant_shard_id}/timeline",
125 0 : self.mgmt_api_endpoint
126 0 : );
127 0 : self.get(&uri)
128 0 : .await?
129 0 : .json()
130 0 : .await
131 0 : .map_err(Error::ReceiveBody)
132 0 : }
133 :
134 0 : pub async fn timeline_info(
135 0 : &self,
136 0 : tenant_shard_id: TenantShardId,
137 0 : timeline_id: TimelineId,
138 0 : force_await_logical_size: ForceAwaitLogicalSize,
139 0 : ) -> Result<pageserver_api::models::TimelineInfo> {
140 0 : let uri = format!(
141 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
142 0 : self.mgmt_api_endpoint
143 0 : );
144 :
145 0 : let uri = match force_await_logical_size {
146 0 : ForceAwaitLogicalSize::Yes => format!("{}?force-await-logical-size={}", uri, true),
147 0 : ForceAwaitLogicalSize::No => uri,
148 : };
149 :
150 0 : self.get(&uri)
151 0 : .await?
152 0 : .json()
153 0 : .await
154 0 : .map_err(Error::ReceiveBody)
155 0 : }
156 :
157 0 : pub async fn keyspace(
158 0 : &self,
159 0 : tenant_shard_id: TenantShardId,
160 0 : timeline_id: TimelineId,
161 0 : ) -> Result<pageserver_api::models::partitioning::Partitioning> {
162 0 : let uri = format!(
163 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/keyspace",
164 0 : self.mgmt_api_endpoint
165 0 : );
166 0 : self.get(&uri)
167 0 : .await?
168 0 : .json()
169 0 : .await
170 0 : .map_err(Error::ReceiveBody)
171 0 : }
172 :
173 0 : async fn get<U: IntoUrl>(&self, uri: U) -> Result<reqwest::Response> {
174 0 : self.request(Method::GET, uri, ()).await
175 0 : }
176 :
177 0 : fn start_request<U: reqwest::IntoUrl>(
178 0 : &self,
179 0 : method: Method,
180 0 : uri: U,
181 0 : ) -> reqwest::RequestBuilder {
182 0 : let req = self.client.request(method, uri);
183 0 : if let Some(value) = &self.authorization_header {
184 0 : req.header(reqwest::header::AUTHORIZATION, value)
185 : } else {
186 0 : req
187 : }
188 0 : }
189 :
190 0 : async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
191 0 : &self,
192 0 : method: Method,
193 0 : uri: U,
194 0 : body: B,
195 0 : ) -> Result<reqwest::Response> {
196 0 : self.start_request(method, uri)
197 0 : .json(&body)
198 0 : .send()
199 0 : .await
200 0 : .map_err(Error::ReceiveBody)
201 0 : }
202 :
203 0 : async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
204 0 : &self,
205 0 : method: Method,
206 0 : uri: U,
207 0 : body: B,
208 0 : ) -> Result<reqwest::Response> {
209 0 : let res = self.request_noerror(method, uri, body).await?;
210 0 : let response = res.error_from_body().await?;
211 0 : Ok(response)
212 0 : }
213 :
214 0 : pub async fn status(&self) -> Result<()> {
215 0 : let uri = format!("{}/v1/status", self.mgmt_api_endpoint);
216 0 : self.get(&uri).await?;
217 0 : Ok(())
218 0 : }
219 :
220 : /// The tenant deletion API can return 202 if deletion is incomplete, or
221 : /// 404 if it is complete. Callers are responsible for checking the status
222 : /// code and retrying. Error codes other than 404 will return Err().
223 0 : pub async fn tenant_delete(&self, tenant_shard_id: TenantShardId) -> Result<StatusCode> {
224 0 : let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
225 0 :
226 0 : match self.request(Method::DELETE, &uri, ()).await {
227 0 : Err(Error::ApiError(status_code, msg)) => {
228 0 : if status_code == StatusCode::NOT_FOUND {
229 0 : Ok(StatusCode::NOT_FOUND)
230 : } else {
231 0 : Err(Error::ApiError(status_code, msg))
232 : }
233 : }
234 0 : Err(e) => Err(e),
235 0 : Ok(response) => Ok(response.status()),
236 : }
237 0 : }
238 :
239 0 : pub async fn tenant_time_travel_remote_storage(
240 0 : &self,
241 0 : tenant_shard_id: TenantShardId,
242 0 : timestamp: &str,
243 0 : done_if_after: &str,
244 0 : ) -> Result<()> {
245 0 : let uri = format!(
246 0 : "{}/v1/tenant/{tenant_shard_id}/time_travel_remote_storage?travel_to={timestamp}&done_if_after={done_if_after}",
247 0 : self.mgmt_api_endpoint
248 0 : );
249 0 : self.request(Method::PUT, &uri, ()).await?;
250 0 : Ok(())
251 0 : }
252 :
253 0 : pub async fn tenant_scan_remote_storage(
254 0 : &self,
255 0 : tenant_id: TenantId,
256 0 : ) -> Result<TenantScanRemoteStorageResponse> {
257 0 : let uri = format!(
258 0 : "{}/v1/tenant/{tenant_id}/scan_remote_storage",
259 0 : self.mgmt_api_endpoint
260 0 : );
261 0 : let response = self.request(Method::GET, &uri, ()).await?;
262 0 : let body = response.json().await.map_err(Error::ReceiveBody)?;
263 0 : Ok(body)
264 0 : }
265 :
266 0 : pub async fn set_tenant_config(&self, req: &TenantConfigRequest) -> Result<()> {
267 0 : let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
268 0 : self.request(Method::PUT, &uri, req).await?;
269 0 : Ok(())
270 0 : }
271 :
272 0 : pub async fn patch_tenant_config(&self, req: &TenantConfigPatchRequest) -> Result<()> {
273 0 : let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
274 0 : self.request(Method::PATCH, &uri, req).await?;
275 0 : Ok(())
276 0 : }
277 :
278 0 : pub async fn tenant_secondary_download(
279 0 : &self,
280 0 : tenant_id: TenantShardId,
281 0 : wait: Option<std::time::Duration>,
282 0 : ) -> Result<(StatusCode, SecondaryProgress)> {
283 0 : let mut path = reqwest::Url::parse(&format!(
284 0 : "{}/v1/tenant/{}/secondary/download",
285 0 : self.mgmt_api_endpoint, tenant_id
286 0 : ))
287 0 : .expect("Cannot build URL");
288 :
289 0 : if let Some(wait) = wait {
290 0 : path.query_pairs_mut()
291 0 : .append_pair("wait_ms", &format!("{}", wait.as_millis()));
292 0 : }
293 :
294 0 : let response = self.request(Method::POST, path, ()).await?;
295 0 : let status = response.status();
296 0 : let progress: SecondaryProgress = response.json().await.map_err(Error::ReceiveBody)?;
297 0 : Ok((status, progress))
298 0 : }
299 :
300 0 : pub async fn tenant_secondary_status(
301 0 : &self,
302 0 : tenant_shard_id: TenantShardId,
303 0 : ) -> Result<SecondaryProgress> {
304 0 : let path = reqwest::Url::parse(&format!(
305 0 : "{}/v1/tenant/{}/secondary/status",
306 0 : self.mgmt_api_endpoint, tenant_shard_id
307 0 : ))
308 0 : .expect("Cannot build URL");
309 0 :
310 0 : self.request(Method::GET, path, ())
311 0 : .await?
312 0 : .json()
313 0 : .await
314 0 : .map_err(Error::ReceiveBody)
315 0 : }
316 :
317 0 : pub async fn tenant_heatmap_upload(&self, tenant_id: TenantShardId) -> Result<()> {
318 0 : let path = reqwest::Url::parse(&format!(
319 0 : "{}/v1/tenant/{}/heatmap_upload",
320 0 : self.mgmt_api_endpoint, tenant_id
321 0 : ))
322 0 : .expect("Cannot build URL");
323 0 :
324 0 : self.request(Method::POST, path, ()).await?;
325 0 : Ok(())
326 0 : }
327 :
328 0 : pub async fn location_config(
329 0 : &self,
330 0 : tenant_shard_id: TenantShardId,
331 0 : config: LocationConfig,
332 0 : flush_ms: Option<std::time::Duration>,
333 0 : lazy: bool,
334 0 : ) -> Result<()> {
335 0 : let req_body = TenantLocationConfigRequest { config };
336 0 :
337 0 : let mut path = reqwest::Url::parse(&format!(
338 0 : "{}/v1/tenant/{}/location_config",
339 0 : self.mgmt_api_endpoint, tenant_shard_id
340 0 : ))
341 0 : // Should always work: mgmt_api_endpoint is configuration, not user input.
342 0 : .expect("Cannot build URL");
343 0 :
344 0 : if lazy {
345 0 : path.query_pairs_mut().append_pair("lazy", "true");
346 0 : }
347 :
348 0 : if let Some(flush_ms) = flush_ms {
349 0 : path.query_pairs_mut()
350 0 : .append_pair("flush_ms", &format!("{}", flush_ms.as_millis()));
351 0 : }
352 :
353 0 : self.request(Method::PUT, path, &req_body).await?;
354 0 : Ok(())
355 0 : }
356 :
357 0 : pub async fn list_location_config(&self) -> Result<LocationConfigListResponse> {
358 0 : let path = format!("{}/v1/location_config", self.mgmt_api_endpoint);
359 0 : self.request(Method::GET, &path, ())
360 0 : .await?
361 0 : .json()
362 0 : .await
363 0 : .map_err(Error::ReceiveBody)
364 0 : }
365 :
366 0 : pub async fn get_location_config(
367 0 : &self,
368 0 : tenant_shard_id: TenantShardId,
369 0 : ) -> Result<Option<LocationConfig>> {
370 0 : let path = format!(
371 0 : "{}/v1/location_config/{tenant_shard_id}",
372 0 : self.mgmt_api_endpoint
373 0 : );
374 0 : self.request(Method::GET, &path, ())
375 0 : .await?
376 0 : .json()
377 0 : .await
378 0 : .map_err(Error::ReceiveBody)
379 0 : }
380 :
381 0 : pub async fn timeline_create(
382 0 : &self,
383 0 : tenant_shard_id: TenantShardId,
384 0 : req: &TimelineCreateRequest,
385 0 : ) -> Result<TimelineInfo> {
386 0 : let uri = format!(
387 0 : "{}/v1/tenant/{}/timeline",
388 0 : self.mgmt_api_endpoint, tenant_shard_id
389 0 : );
390 0 : self.request(Method::POST, &uri, req)
391 0 : .await?
392 0 : .json()
393 0 : .await
394 0 : .map_err(Error::ReceiveBody)
395 0 : }
396 :
397 : /// The timeline deletion API can return 201 if deletion is incomplete, or
398 : /// 403 if it is complete. Callers are responsible for checking the status
399 : /// code and retrying. Error codes other than 403 will return Err().
400 0 : pub async fn timeline_delete(
401 0 : &self,
402 0 : tenant_shard_id: TenantShardId,
403 0 : timeline_id: TimelineId,
404 0 : ) -> Result<StatusCode> {
405 0 : let uri = format!(
406 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
407 0 : self.mgmt_api_endpoint
408 0 : );
409 0 :
410 0 : match self.request(Method::DELETE, &uri, ()).await {
411 0 : Err(Error::ApiError(status_code, msg)) => {
412 0 : if status_code == StatusCode::NOT_FOUND {
413 0 : Ok(StatusCode::NOT_FOUND)
414 : } else {
415 0 : Err(Error::ApiError(status_code, msg))
416 : }
417 : }
418 0 : Err(e) => Err(e),
419 0 : Ok(response) => Ok(response.status()),
420 : }
421 0 : }
422 :
423 0 : pub async fn timeline_detail(
424 0 : &self,
425 0 : tenant_shard_id: TenantShardId,
426 0 : timeline_id: TimelineId,
427 0 : ) -> Result<TimelineInfo> {
428 0 : let uri = format!(
429 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
430 0 : self.mgmt_api_endpoint
431 0 : );
432 0 :
433 0 : self.request(Method::GET, &uri, ())
434 0 : .await?
435 0 : .json()
436 0 : .await
437 0 : .map_err(Error::ReceiveBody)
438 0 : }
439 :
440 0 : pub async fn timeline_archival_config(
441 0 : &self,
442 0 : tenant_shard_id: TenantShardId,
443 0 : timeline_id: TimelineId,
444 0 : req: &TimelineArchivalConfigRequest,
445 0 : ) -> Result<()> {
446 0 : let uri = format!(
447 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/archival_config",
448 0 : self.mgmt_api_endpoint
449 0 : );
450 0 :
451 0 : self.request(Method::PUT, &uri, req)
452 0 : .await?
453 0 : .json()
454 0 : .await
455 0 : .map_err(Error::ReceiveBody)
456 0 : }
457 :
458 0 : pub async fn timeline_detach_ancestor(
459 0 : &self,
460 0 : tenant_shard_id: TenantShardId,
461 0 : timeline_id: TimelineId,
462 0 : behavior: Option<DetachBehavior>,
463 0 : ) -> Result<AncestorDetached> {
464 0 : let uri = format!(
465 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/detach_ancestor",
466 0 : self.mgmt_api_endpoint
467 0 : );
468 0 : let mut uri = Url::parse(&uri)
469 0 : .map_err(|e| Error::ApiError(StatusCode::INTERNAL_SERVER_ERROR, format!("{e}")))?;
470 :
471 0 : if let Some(behavior) = behavior {
472 0 : uri.query_pairs_mut()
473 0 : .append_pair("detach_behavior", &behavior.to_string());
474 0 : }
475 :
476 0 : self.request(Method::PUT, uri, ())
477 0 : .await?
478 0 : .json()
479 0 : .await
480 0 : .map_err(Error::ReceiveBody)
481 0 : }
482 :
483 0 : pub async fn timeline_block_unblock_gc(
484 0 : &self,
485 0 : tenant_shard_id: TenantShardId,
486 0 : timeline_id: TimelineId,
487 0 : dir: BlockUnblock,
488 0 : ) -> Result<()> {
489 0 : let uri = format!(
490 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/{dir}_gc",
491 0 : self.mgmt_api_endpoint,
492 0 : );
493 0 :
494 0 : self.request(Method::POST, &uri, ()).await.map(|_| ())
495 0 : }
496 :
497 0 : pub async fn timeline_download_heatmap_layers(
498 0 : &self,
499 0 : tenant_shard_id: TenantShardId,
500 0 : timeline_id: TimelineId,
501 0 : concurrency: Option<usize>,
502 0 : recurse: bool,
503 0 : ) -> Result<()> {
504 0 : let mut path = reqwest::Url::parse(&format!(
505 0 : "{}/v1/tenant/{}/timeline/{}/download_heatmap_layers",
506 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id
507 0 : ))
508 0 : .expect("Cannot build URL");
509 0 :
510 0 : path.query_pairs_mut()
511 0 : .append_pair("recurse", &format!("{}", recurse));
512 :
513 0 : if let Some(concurrency) = concurrency {
514 0 : path.query_pairs_mut()
515 0 : .append_pair("concurrency", &format!("{}", concurrency));
516 0 : }
517 :
518 0 : self.request(Method::POST, path, ()).await.map(|_| ())
519 0 : }
520 :
521 0 : pub async fn tenant_reset(&self, tenant_shard_id: TenantShardId) -> Result<()> {
522 0 : let uri = format!(
523 0 : "{}/v1/tenant/{}/reset",
524 0 : self.mgmt_api_endpoint, tenant_shard_id
525 0 : );
526 0 : self.request(Method::POST, &uri, ())
527 0 : .await?
528 0 : .json()
529 0 : .await
530 0 : .map_err(Error::ReceiveBody)
531 0 : }
532 :
533 0 : pub async fn tenant_shard_split(
534 0 : &self,
535 0 : tenant_shard_id: TenantShardId,
536 0 : req: TenantShardSplitRequest,
537 0 : ) -> Result<TenantShardSplitResponse> {
538 0 : let uri = format!(
539 0 : "{}/v1/tenant/{}/shard_split",
540 0 : self.mgmt_api_endpoint, tenant_shard_id
541 0 : );
542 0 : self.request(Method::PUT, &uri, req)
543 0 : .await?
544 0 : .json()
545 0 : .await
546 0 : .map_err(Error::ReceiveBody)
547 0 : }
548 :
549 0 : pub async fn timeline_list(
550 0 : &self,
551 0 : tenant_shard_id: &TenantShardId,
552 0 : ) -> Result<Vec<TimelineInfo>> {
553 0 : let uri = format!(
554 0 : "{}/v1/tenant/{}/timeline",
555 0 : self.mgmt_api_endpoint, tenant_shard_id
556 0 : );
557 0 : self.get(&uri)
558 0 : .await?
559 0 : .json()
560 0 : .await
561 0 : .map_err(Error::ReceiveBody)
562 0 : }
563 :
564 0 : pub async fn tenant_synthetic_size(
565 0 : &self,
566 0 : tenant_shard_id: TenantShardId,
567 0 : ) -> Result<TenantHistorySize> {
568 0 : let uri = format!(
569 0 : "{}/v1/tenant/{}/synthetic_size",
570 0 : self.mgmt_api_endpoint, tenant_shard_id
571 0 : );
572 0 : self.get(&uri)
573 0 : .await?
574 0 : .json()
575 0 : .await
576 0 : .map_err(Error::ReceiveBody)
577 0 : }
578 :
579 0 : pub async fn put_io_engine(
580 0 : &self,
581 0 : engine: &pageserver_api::models::virtual_file::IoEngineKind,
582 0 : ) -> Result<()> {
583 0 : let uri = format!("{}/v1/io_engine", self.mgmt_api_endpoint);
584 0 : self.request(Method::PUT, uri, engine)
585 0 : .await?
586 0 : .json()
587 0 : .await
588 0 : .map_err(Error::ReceiveBody)
589 0 : }
590 :
591 : /// Configs io mode at runtime.
592 0 : pub async fn put_io_mode(
593 0 : &self,
594 0 : mode: &pageserver_api::models::virtual_file::IoMode,
595 0 : ) -> Result<()> {
596 0 : let uri = format!("{}/v1/io_mode", self.mgmt_api_endpoint);
597 0 : self.request(Method::PUT, uri, mode)
598 0 : .await?
599 0 : .json()
600 0 : .await
601 0 : .map_err(Error::ReceiveBody)
602 0 : }
603 :
604 0 : pub async fn get_utilization(&self) -> Result<PageserverUtilization> {
605 0 : let uri = format!("{}/v1/utilization", self.mgmt_api_endpoint);
606 0 : self.get(uri)
607 0 : .await?
608 0 : .json()
609 0 : .await
610 0 : .map_err(Error::ReceiveBody)
611 0 : }
612 :
613 0 : pub async fn top_tenant_shards(
614 0 : &self,
615 0 : request: TopTenantShardsRequest,
616 0 : ) -> Result<TopTenantShardsResponse> {
617 0 : let uri = format!("{}/v1/top_tenants", self.mgmt_api_endpoint);
618 0 : self.request(Method::POST, uri, request)
619 0 : .await?
620 0 : .json()
621 0 : .await
622 0 : .map_err(Error::ReceiveBody)
623 0 : }
624 :
625 0 : pub async fn layer_map_info(
626 0 : &self,
627 0 : tenant_shard_id: TenantShardId,
628 0 : timeline_id: TimelineId,
629 0 : ) -> Result<LayerMapInfo> {
630 0 : let uri = format!(
631 0 : "{}/v1/tenant/{}/timeline/{}/layer",
632 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id,
633 0 : );
634 0 : self.get(&uri)
635 0 : .await?
636 0 : .json()
637 0 : .await
638 0 : .map_err(Error::ReceiveBody)
639 0 : }
640 :
641 0 : pub async fn layer_evict(
642 0 : &self,
643 0 : tenant_shard_id: TenantShardId,
644 0 : timeline_id: TimelineId,
645 0 : layer_file_name: &str,
646 0 : ) -> Result<bool> {
647 0 : let uri = format!(
648 0 : "{}/v1/tenant/{}/timeline/{}/layer/{}",
649 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id, layer_file_name
650 0 : );
651 0 : let resp = self.request_noerror(Method::DELETE, &uri, ()).await?;
652 0 : match resp.status() {
653 0 : StatusCode::OK => Ok(true),
654 0 : StatusCode::NOT_MODIFIED => Ok(false),
655 : // TODO: dedupe this pattern / introduce separate error variant?
656 0 : status => Err(match resp.json::<HttpErrorBody>().await {
657 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
658 : Err(_) => {
659 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
660 : }
661 : }),
662 : }
663 0 : }
664 :
665 0 : pub async fn layer_ondemand_download(
666 0 : &self,
667 0 : tenant_shard_id: TenantShardId,
668 0 : timeline_id: TimelineId,
669 0 : layer_file_name: &str,
670 0 : ) -> Result<bool> {
671 0 : let uri = format!(
672 0 : "{}/v1/tenant/{}/timeline/{}/layer/{}",
673 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id, layer_file_name
674 0 : );
675 0 : let resp = self.request_noerror(Method::GET, &uri, ()).await?;
676 0 : match resp.status() {
677 0 : StatusCode::OK => Ok(true),
678 0 : StatusCode::NOT_MODIFIED => Ok(false),
679 : // TODO: dedupe this pattern / introduce separate error variant?
680 0 : status => Err(match resp.json::<HttpErrorBody>().await {
681 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
682 : Err(_) => {
683 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
684 : }
685 : }),
686 : }
687 0 : }
688 :
689 0 : pub async fn ingest_aux_files(
690 0 : &self,
691 0 : tenant_shard_id: TenantShardId,
692 0 : timeline_id: TimelineId,
693 0 : aux_files: HashMap<String, String>,
694 0 : ) -> Result<bool> {
695 0 : let uri = format!(
696 0 : "{}/v1/tenant/{}/timeline/{}/ingest_aux_files",
697 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id
698 0 : );
699 0 : let resp = self
700 0 : .request_noerror(Method::POST, &uri, IngestAuxFilesRequest { aux_files })
701 0 : .await?;
702 0 : match resp.status() {
703 0 : StatusCode::OK => Ok(true),
704 0 : status => Err(match resp.json::<HttpErrorBody>().await {
705 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
706 : Err(_) => {
707 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
708 : }
709 : }),
710 : }
711 0 : }
712 :
713 0 : pub async fn list_aux_files(
714 0 : &self,
715 0 : tenant_shard_id: TenantShardId,
716 0 : timeline_id: TimelineId,
717 0 : lsn: Lsn,
718 0 : ) -> Result<HashMap<String, Bytes>> {
719 0 : let uri = format!(
720 0 : "{}/v1/tenant/{}/timeline/{}/list_aux_files",
721 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id
722 0 : );
723 0 : let resp = self
724 0 : .request_noerror(Method::POST, &uri, ListAuxFilesRequest { lsn })
725 0 : .await?;
726 0 : match resp.status() {
727 : StatusCode::OK => {
728 0 : let resp: HashMap<String, Bytes> = resp.json().await.map_err(|e| {
729 0 : Error::ApiError(StatusCode::INTERNAL_SERVER_ERROR, format!("{e}"))
730 0 : })?;
731 0 : Ok(resp)
732 : }
733 0 : status => Err(match resp.json::<HttpErrorBody>().await {
734 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
735 : Err(_) => {
736 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
737 : }
738 : }),
739 : }
740 0 : }
741 :
742 0 : pub async fn import_basebackup(
743 0 : &self,
744 0 : tenant_id: TenantId,
745 0 : timeline_id: TimelineId,
746 0 : base_lsn: Lsn,
747 0 : end_lsn: Lsn,
748 0 : pg_version: u32,
749 0 : basebackup_tarball: ReqwestBody,
750 0 : ) -> Result<()> {
751 0 : let uri = format!(
752 0 : "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/import_basebackup?base_lsn={base_lsn}&end_lsn={end_lsn}&pg_version={pg_version}",
753 0 : self.mgmt_api_endpoint,
754 0 : );
755 0 : self.start_request(Method::PUT, uri)
756 0 : .body(basebackup_tarball)
757 0 : .send()
758 0 : .await
759 0 : .map_err(Error::SendRequest)?
760 0 : .error_from_body()
761 0 : .await?
762 0 : .json()
763 0 : .await
764 0 : .map_err(Error::ReceiveBody)
765 0 : }
766 :
767 0 : pub async fn import_wal(
768 0 : &self,
769 0 : tenant_id: TenantId,
770 0 : timeline_id: TimelineId,
771 0 : start_lsn: Lsn,
772 0 : end_lsn: Lsn,
773 0 : wal_tarball: ReqwestBody,
774 0 : ) -> Result<()> {
775 0 : let uri = format!(
776 0 : "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/import_wal?start_lsn={start_lsn}&end_lsn={end_lsn}",
777 0 : self.mgmt_api_endpoint,
778 0 : );
779 0 : self.start_request(Method::PUT, uri)
780 0 : .body(wal_tarball)
781 0 : .send()
782 0 : .await
783 0 : .map_err(Error::SendRequest)?
784 0 : .error_from_body()
785 0 : .await?
786 0 : .json()
787 0 : .await
788 0 : .map_err(Error::ReceiveBody)
789 0 : }
790 :
791 0 : pub async fn timeline_init_lsn_lease(
792 0 : &self,
793 0 : tenant_shard_id: TenantShardId,
794 0 : timeline_id: TimelineId,
795 0 : lsn: Lsn,
796 0 : ) -> Result<LsnLease> {
797 0 : let uri = format!(
798 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/lsn_lease",
799 0 : self.mgmt_api_endpoint,
800 0 : );
801 0 :
802 0 : self.request(Method::POST, &uri, LsnLeaseRequest { lsn })
803 0 : .await?
804 0 : .json()
805 0 : .await
806 0 : .map_err(Error::ReceiveBody)
807 0 : }
808 :
809 0 : pub async fn wait_lsn(
810 0 : &self,
811 0 : tenant_shard_id: TenantShardId,
812 0 : request: TenantWaitLsnRequest,
813 0 : ) -> Result<StatusCode> {
814 0 : let uri = format!(
815 0 : "{}/v1/tenant/{tenant_shard_id}/wait_lsn",
816 0 : self.mgmt_api_endpoint,
817 0 : );
818 0 :
819 0 : self.request_noerror(Method::POST, uri, request)
820 0 : .await
821 0 : .map(|resp| resp.status())
822 0 : }
823 :
824 0 : pub async fn activate_post_import(
825 0 : &self,
826 0 : tenant_shard_id: TenantShardId,
827 0 : timeline_id: TimelineId,
828 0 : activate_timeline_timeout: Duration,
829 0 : ) -> Result<TimelineInfo> {
830 0 : let uri = format!(
831 0 : "{}/v1/tenant/{}/timeline/{}/activate_post_import?timeline_activate_timeout_ms={}",
832 0 : self.mgmt_api_endpoint,
833 0 : tenant_shard_id,
834 0 : timeline_id,
835 0 : activate_timeline_timeout.as_millis()
836 0 : );
837 0 :
838 0 : self.request(Method::PUT, uri, ())
839 0 : .await?
840 0 : .json()
841 0 : .await
842 0 : .map_err(Error::ReceiveBody)
843 0 : }
844 : }
|