Line data Source code
1 : use std::collections::{BTreeMap, HashMap};
2 : use std::error::Error as _;
3 : use std::time::Duration;
4 :
5 : use bytes::Bytes;
6 : use detach_ancestor::AncestorDetached;
7 : use http_utils::error::HttpErrorBody;
8 : use pageserver_api::models::*;
9 : use pageserver_api::shard::TenantShardId;
10 : use postgres_versioninfo::PgMajorVersion;
11 : pub use reqwest::Body as ReqwestBody;
12 : use reqwest::{IntoUrl, Method, StatusCode, Url};
13 : use utils::id::{TenantId, TimelineId};
14 : use utils::lsn::Lsn;
15 :
16 : use crate::BlockUnblock;
17 :
18 : pub mod util;
19 :
20 : #[derive(Debug, Clone)]
21 : pub struct Client {
22 : mgmt_api_endpoint: String,
23 : authorization_header: Option<String>,
24 : client: reqwest::Client,
25 : }
26 :
27 : #[derive(thiserror::Error, Debug)]
28 : pub enum Error {
29 0 : #[error("send request: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
30 : SendRequest(reqwest::Error),
31 :
32 0 : #[error("receive body: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
33 : ReceiveBody(reqwest::Error),
34 :
35 : #[error("receive error body: {0}")]
36 : ReceiveErrorBody(String),
37 :
38 : #[error("pageserver API: {1}")]
39 : ApiError(StatusCode, String),
40 :
41 : #[error("Cancelled")]
42 : Cancelled,
43 :
44 : #[error("request timed out: {0}")]
45 : Timeout(String),
46 : }
47 :
48 : pub type Result<T> = std::result::Result<T, Error>;
49 :
50 : pub trait ResponseErrorMessageExt: Sized {
51 : fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
52 : }
53 :
54 : impl ResponseErrorMessageExt for reqwest::Response {
55 0 : async fn error_from_body(self) -> Result<Self> {
56 0 : let status = self.status();
57 0 : if !(status.is_client_error() || status.is_server_error()) {
58 0 : return Ok(self);
59 0 : }
60 :
61 0 : let url = self.url().to_owned();
62 0 : Err(match self.json::<HttpErrorBody>().await {
63 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
64 : Err(_) => {
65 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), url))
66 : }
67 : })
68 0 : }
69 : }
70 :
/// Selects whether `Client::timeline_info` adds the
/// `force-await-logical-size=true` query parameter to its request.
pub enum ForceAwaitLogicalSize {
    /// Add the query parameter.
    Yes,
    /// Send the request without the query parameter.
    No,
}
75 :
76 : impl Client {
77 0 : pub fn new(client: reqwest::Client, mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
78 : Self {
79 0 : mgmt_api_endpoint,
80 0 : authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),
81 0 : client,
82 : }
83 0 : }
84 :
85 0 : pub async fn list_tenants(&self) -> Result<Vec<pageserver_api::models::TenantInfo>> {
86 0 : let uri = format!("{}/v1/tenant", self.mgmt_api_endpoint);
87 0 : let resp = self.get(&uri).await?;
88 0 : resp.json().await.map_err(Error::ReceiveBody)
89 0 : }
90 :
91 : /// Send an HTTP request to an arbitrary path with a desired HTTP method and returning a streaming
92 : /// Response. This function is suitable for pass-through/proxy use cases where we don't care
93 : /// what the response content looks like.
94 : ///
95 : /// Use/add one of the properly typed methods below if you know aren't proxying, and
96 : /// know what kind of response you expect.
97 0 : pub async fn op_raw(&self, method: Method, path: String) -> Result<reqwest::Response> {
98 0 : debug_assert!(path.starts_with('/'));
99 0 : let uri = format!("{}{}", self.mgmt_api_endpoint, path);
100 :
101 0 : let mut req = self.client.request(method, uri);
102 0 : if let Some(value) = &self.authorization_header {
103 0 : req = req.header(reqwest::header::AUTHORIZATION, value);
104 0 : }
105 0 : req.send().await.map_err(Error::ReceiveBody)
106 0 : }
107 :
108 0 : pub async fn tenant_details(
109 0 : &self,
110 0 : tenant_shard_id: TenantShardId,
111 0 : ) -> Result<pageserver_api::models::TenantDetails> {
112 0 : let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
113 0 : self.get(uri)
114 0 : .await?
115 0 : .json()
116 0 : .await
117 0 : .map_err(Error::ReceiveBody)
118 0 : }
119 :
120 0 : pub async fn list_timelines(
121 0 : &self,
122 0 : tenant_shard_id: TenantShardId,
123 0 : ) -> Result<Vec<pageserver_api::models::TimelineInfo>> {
124 0 : let uri = format!(
125 0 : "{}/v1/tenant/{tenant_shard_id}/timeline",
126 : self.mgmt_api_endpoint
127 : );
128 0 : self.get(&uri)
129 0 : .await?
130 0 : .json()
131 0 : .await
132 0 : .map_err(Error::ReceiveBody)
133 0 : }
134 :
135 0 : pub async fn timeline_info(
136 0 : &self,
137 0 : tenant_shard_id: TenantShardId,
138 0 : timeline_id: TimelineId,
139 0 : force_await_logical_size: ForceAwaitLogicalSize,
140 0 : ) -> Result<pageserver_api::models::TimelineInfo> {
141 0 : let uri = format!(
142 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
143 : self.mgmt_api_endpoint
144 : );
145 :
146 0 : let uri = match force_await_logical_size {
147 0 : ForceAwaitLogicalSize::Yes => format!("{}?force-await-logical-size={}", uri, true),
148 0 : ForceAwaitLogicalSize::No => uri,
149 : };
150 :
151 0 : self.get(&uri)
152 0 : .await?
153 0 : .json()
154 0 : .await
155 0 : .map_err(Error::ReceiveBody)
156 0 : }
157 :
158 0 : pub async fn keyspace(
159 0 : &self,
160 0 : tenant_shard_id: TenantShardId,
161 0 : timeline_id: TimelineId,
162 0 : ) -> Result<pageserver_api::models::partitioning::Partitioning> {
163 0 : let uri = format!(
164 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/keyspace",
165 : self.mgmt_api_endpoint
166 : );
167 0 : self.get(&uri)
168 0 : .await?
169 0 : .json()
170 0 : .await
171 0 : .map_err(Error::ReceiveBody)
172 0 : }
173 :
174 0 : async fn get<U: IntoUrl>(&self, uri: U) -> Result<reqwest::Response> {
175 0 : self.request(Method::GET, uri, ()).await
176 0 : }
177 :
178 0 : fn start_request<U: reqwest::IntoUrl>(
179 0 : &self,
180 0 : method: Method,
181 0 : uri: U,
182 0 : ) -> reqwest::RequestBuilder {
183 0 : let req = self.client.request(method, uri);
184 0 : if let Some(value) = &self.authorization_header {
185 0 : req.header(reqwest::header::AUTHORIZATION, value)
186 : } else {
187 0 : req
188 : }
189 0 : }
190 :
191 0 : async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
192 0 : &self,
193 0 : method: Method,
194 0 : uri: U,
195 0 : body: B,
196 0 : ) -> Result<reqwest::Response> {
197 0 : self.start_request(method, uri)
198 0 : .json(&body)
199 0 : .send()
200 0 : .await
201 0 : .map_err(Error::ReceiveBody)
202 0 : }
203 :
204 0 : async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
205 0 : &self,
206 0 : method: Method,
207 0 : uri: U,
208 0 : body: B,
209 0 : ) -> Result<reqwest::Response> {
210 0 : let res = self.request_noerror(method, uri, body).await?;
211 0 : let response = res.error_from_body().await?;
212 0 : Ok(response)
213 0 : }
214 :
215 0 : pub async fn status(&self) -> Result<()> {
216 0 : let uri = format!("{}/v1/status", self.mgmt_api_endpoint);
217 0 : self.get(&uri).await?;
218 0 : Ok(())
219 0 : }
220 :
221 : /// The tenant deletion API can return 202 if deletion is incomplete, or
222 : /// 404 if it is complete. Callers are responsible for checking the status
223 : /// code and retrying. Error codes other than 404 will return Err().
224 0 : pub async fn tenant_delete(&self, tenant_shard_id: TenantShardId) -> Result<StatusCode> {
225 0 : let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
226 :
227 0 : match self.request(Method::DELETE, &uri, ()).await {
228 0 : Err(Error::ApiError(status_code, msg)) => {
229 0 : if status_code == StatusCode::NOT_FOUND {
230 0 : Ok(StatusCode::NOT_FOUND)
231 : } else {
232 0 : Err(Error::ApiError(status_code, msg))
233 : }
234 : }
235 0 : Err(e) => Err(e),
236 0 : Ok(response) => Ok(response.status()),
237 : }
238 0 : }
239 :
240 0 : pub async fn tenant_time_travel_remote_storage(
241 0 : &self,
242 0 : tenant_shard_id: TenantShardId,
243 0 : timestamp: &str,
244 0 : done_if_after: &str,
245 0 : ) -> Result<()> {
246 0 : let uri = format!(
247 0 : "{}/v1/tenant/{tenant_shard_id}/time_travel_remote_storage?travel_to={timestamp}&done_if_after={done_if_after}",
248 : self.mgmt_api_endpoint
249 : );
250 0 : self.request(Method::PUT, &uri, ()).await?;
251 0 : Ok(())
252 0 : }
253 :
254 0 : pub async fn tenant_timeline_compact(
255 0 : &self,
256 0 : tenant_shard_id: TenantShardId,
257 0 : timeline_id: TimelineId,
258 0 : force_image_layer_creation: bool,
259 0 : must_force_image_layer_creation: bool,
260 0 : scheduled: bool,
261 0 : wait_until_done: bool,
262 0 : ) -> Result<()> {
263 0 : let mut path = reqwest::Url::parse(&format!(
264 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/compact",
265 0 : self.mgmt_api_endpoint
266 0 : ))
267 0 : .expect("Cannot build URL");
268 :
269 0 : if force_image_layer_creation {
270 0 : path.query_pairs_mut()
271 0 : .append_pair("force_image_layer_creation", "true");
272 0 : }
273 :
274 0 : if must_force_image_layer_creation {
275 0 : path.query_pairs_mut()
276 0 : .append_pair("must_force_image_layer_creation", "true");
277 0 : }
278 :
279 0 : if scheduled {
280 0 : path.query_pairs_mut().append_pair("scheduled", "true");
281 0 : }
282 0 : if wait_until_done {
283 0 : path.query_pairs_mut()
284 0 : .append_pair("wait_until_scheduled_compaction_done", "true");
285 0 : path.query_pairs_mut()
286 0 : .append_pair("wait_until_uploaded", "true");
287 0 : }
288 0 : self.request(Method::PUT, path, ()).await?;
289 0 : Ok(())
290 0 : }
291 :
292 : /* BEGIN_HADRON */
293 0 : pub async fn tenant_timeline_describe(
294 0 : &self,
295 0 : tenant_shard_id: &TenantShardId,
296 0 : timeline_id: &TimelineId,
297 0 : ) -> Result<TimelineInfo> {
298 0 : let mut path = reqwest::Url::parse(&format!(
299 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
300 0 : self.mgmt_api_endpoint
301 0 : ))
302 0 : .expect("Cannot build URL");
303 0 : path.query_pairs_mut()
304 0 : .append_pair("include-image-consistent-lsn", "true");
305 :
306 0 : let response: reqwest::Response = self.request(Method::GET, path, ()).await?;
307 0 : let body = response.json().await.map_err(Error::ReceiveBody)?;
308 0 : Ok(body)
309 0 : }
310 :
311 0 : pub async fn list_tenant_visible_size(&self) -> Result<BTreeMap<TenantShardId, u64>> {
312 0 : let uri = format!("{}/v1/list_tenant_visible_size", self.mgmt_api_endpoint);
313 0 : let resp = self.get(&uri).await?;
314 0 : resp.json().await.map_err(Error::ReceiveBody)
315 0 : }
316 : /* END_HADRON */
317 :
318 0 : pub async fn tenant_scan_remote_storage(
319 0 : &self,
320 0 : tenant_id: TenantId,
321 0 : ) -> Result<TenantScanRemoteStorageResponse> {
322 0 : let uri = format!(
323 0 : "{}/v1/tenant/{tenant_id}/scan_remote_storage",
324 : self.mgmt_api_endpoint
325 : );
326 0 : let response = self.request(Method::GET, &uri, ()).await?;
327 0 : let body = response.json().await.map_err(Error::ReceiveBody)?;
328 0 : Ok(body)
329 0 : }
330 :
331 0 : pub async fn set_tenant_config(&self, req: &TenantConfigRequest) -> Result<()> {
332 0 : let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
333 0 : self.request(Method::PUT, &uri, req).await?;
334 0 : Ok(())
335 0 : }
336 :
337 0 : pub async fn patch_tenant_config(&self, req: &TenantConfigPatchRequest) -> Result<()> {
338 0 : let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
339 0 : self.request(Method::PATCH, &uri, req).await?;
340 0 : Ok(())
341 0 : }
342 :
343 0 : pub async fn tenant_secondary_download(
344 0 : &self,
345 0 : tenant_id: TenantShardId,
346 0 : wait: Option<std::time::Duration>,
347 0 : ) -> Result<(StatusCode, SecondaryProgress)> {
348 0 : let mut path = reqwest::Url::parse(&format!(
349 0 : "{}/v1/tenant/{}/secondary/download",
350 0 : self.mgmt_api_endpoint, tenant_id
351 0 : ))
352 0 : .expect("Cannot build URL");
353 :
354 0 : if let Some(wait) = wait {
355 0 : path.query_pairs_mut()
356 0 : .append_pair("wait_ms", &format!("{}", wait.as_millis()));
357 0 : }
358 :
359 0 : let response = self.request(Method::POST, path, ()).await?;
360 0 : let status = response.status();
361 0 : let progress: SecondaryProgress = response.json().await.map_err(Error::ReceiveBody)?;
362 0 : Ok((status, progress))
363 0 : }
364 :
365 0 : pub async fn tenant_secondary_status(
366 0 : &self,
367 0 : tenant_shard_id: TenantShardId,
368 0 : ) -> Result<SecondaryProgress> {
369 0 : let path = reqwest::Url::parse(&format!(
370 0 : "{}/v1/tenant/{}/secondary/status",
371 0 : self.mgmt_api_endpoint, tenant_shard_id
372 0 : ))
373 0 : .expect("Cannot build URL");
374 :
375 0 : self.request(Method::GET, path, ())
376 0 : .await?
377 0 : .json()
378 0 : .await
379 0 : .map_err(Error::ReceiveBody)
380 0 : }
381 :
382 0 : pub async fn tenant_heatmap_upload(&self, tenant_id: TenantShardId) -> Result<()> {
383 0 : let path = reqwest::Url::parse(&format!(
384 0 : "{}/v1/tenant/{}/heatmap_upload",
385 0 : self.mgmt_api_endpoint, tenant_id
386 0 : ))
387 0 : .expect("Cannot build URL");
388 :
389 0 : self.request(Method::POST, path, ()).await?;
390 0 : Ok(())
391 0 : }
392 :
393 0 : pub async fn location_config(
394 0 : &self,
395 0 : tenant_shard_id: TenantShardId,
396 0 : config: LocationConfig,
397 0 : flush_ms: Option<std::time::Duration>,
398 0 : lazy: bool,
399 0 : ) -> Result<()> {
400 0 : let req_body = TenantLocationConfigRequest { config };
401 :
402 0 : let mut path = reqwest::Url::parse(&format!(
403 0 : "{}/v1/tenant/{}/location_config",
404 0 : self.mgmt_api_endpoint, tenant_shard_id
405 0 : ))
406 : // Should always work: mgmt_api_endpoint is configuration, not user input.
407 0 : .expect("Cannot build URL");
408 :
409 0 : if lazy {
410 0 : path.query_pairs_mut().append_pair("lazy", "true");
411 0 : }
412 :
413 0 : if let Some(flush_ms) = flush_ms {
414 0 : path.query_pairs_mut()
415 0 : .append_pair("flush_ms", &format!("{}", flush_ms.as_millis()));
416 0 : }
417 :
418 0 : self.request(Method::PUT, path, &req_body).await?;
419 0 : Ok(())
420 0 : }
421 :
422 0 : pub async fn list_location_config(&self) -> Result<LocationConfigListResponse> {
423 0 : let path = format!("{}/v1/location_config", self.mgmt_api_endpoint);
424 0 : self.request(Method::GET, &path, ())
425 0 : .await?
426 0 : .json()
427 0 : .await
428 0 : .map_err(Error::ReceiveBody)
429 0 : }
430 :
431 0 : pub async fn get_location_config(
432 0 : &self,
433 0 : tenant_shard_id: TenantShardId,
434 0 : ) -> Result<Option<LocationConfig>> {
435 0 : let path = format!(
436 0 : "{}/v1/location_config/{tenant_shard_id}",
437 : self.mgmt_api_endpoint
438 : );
439 0 : self.request(Method::GET, &path, ())
440 0 : .await?
441 0 : .json()
442 0 : .await
443 0 : .map_err(Error::ReceiveBody)
444 0 : }
445 :
446 0 : pub async fn timeline_create(
447 0 : &self,
448 0 : tenant_shard_id: TenantShardId,
449 0 : req: &TimelineCreateRequest,
450 0 : ) -> Result<TimelineInfo> {
451 0 : let uri = format!(
452 0 : "{}/v1/tenant/{}/timeline",
453 : self.mgmt_api_endpoint, tenant_shard_id
454 : );
455 0 : self.request(Method::POST, &uri, req)
456 0 : .await?
457 0 : .json()
458 0 : .await
459 0 : .map_err(Error::ReceiveBody)
460 0 : }
461 :
462 : /// The timeline deletion API can return 201 if deletion is incomplete, or
463 : /// 403 if it is complete. Callers are responsible for checking the status
464 : /// code and retrying. Error codes other than 403 will return Err().
465 0 : pub async fn timeline_delete(
466 0 : &self,
467 0 : tenant_shard_id: TenantShardId,
468 0 : timeline_id: TimelineId,
469 0 : ) -> Result<StatusCode> {
470 0 : let uri = format!(
471 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
472 : self.mgmt_api_endpoint
473 : );
474 :
475 0 : match self.request(Method::DELETE, &uri, ()).await {
476 0 : Err(Error::ApiError(status_code, msg)) => {
477 0 : if status_code == StatusCode::NOT_FOUND {
478 0 : Ok(StatusCode::NOT_FOUND)
479 : } else {
480 0 : Err(Error::ApiError(status_code, msg))
481 : }
482 : }
483 0 : Err(e) => Err(e),
484 0 : Ok(response) => Ok(response.status()),
485 : }
486 0 : }
487 :
488 0 : pub async fn timeline_detail(
489 0 : &self,
490 0 : tenant_shard_id: TenantShardId,
491 0 : timeline_id: TimelineId,
492 0 : ) -> Result<TimelineInfo> {
493 0 : let uri = format!(
494 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
495 : self.mgmt_api_endpoint
496 : );
497 :
498 0 : self.request(Method::GET, &uri, ())
499 0 : .await?
500 0 : .json()
501 0 : .await
502 0 : .map_err(Error::ReceiveBody)
503 0 : }
504 :
505 0 : pub async fn timeline_archival_config(
506 0 : &self,
507 0 : tenant_shard_id: TenantShardId,
508 0 : timeline_id: TimelineId,
509 0 : req: &TimelineArchivalConfigRequest,
510 0 : ) -> Result<()> {
511 0 : let uri = format!(
512 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/archival_config",
513 : self.mgmt_api_endpoint
514 : );
515 :
516 0 : self.request(Method::PUT, &uri, req)
517 0 : .await?
518 0 : .json()
519 0 : .await
520 0 : .map_err(Error::ReceiveBody)
521 0 : }
522 :
523 0 : pub async fn timeline_detach_ancestor(
524 0 : &self,
525 0 : tenant_shard_id: TenantShardId,
526 0 : timeline_id: TimelineId,
527 0 : behavior: Option<DetachBehavior>,
528 0 : ) -> Result<AncestorDetached> {
529 0 : let uri = format!(
530 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/detach_ancestor",
531 : self.mgmt_api_endpoint
532 : );
533 0 : let mut uri = Url::parse(&uri)
534 0 : .map_err(|e| Error::ApiError(StatusCode::INTERNAL_SERVER_ERROR, format!("{e}")))?;
535 :
536 0 : if let Some(behavior) = behavior {
537 0 : uri.query_pairs_mut()
538 0 : .append_pair("detach_behavior", &behavior.to_string());
539 0 : }
540 :
541 0 : self.request(Method::PUT, uri, ())
542 0 : .await?
543 0 : .json()
544 0 : .await
545 0 : .map_err(Error::ReceiveBody)
546 0 : }
547 :
548 0 : pub async fn timeline_block_unblock_gc(
549 0 : &self,
550 0 : tenant_shard_id: TenantShardId,
551 0 : timeline_id: TimelineId,
552 0 : dir: BlockUnblock,
553 0 : ) -> Result<()> {
554 0 : let uri = format!(
555 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/{dir}_gc",
556 : self.mgmt_api_endpoint,
557 : );
558 :
559 0 : self.request(Method::POST, &uri, ()).await.map(|_| ())
560 0 : }
561 :
562 0 : pub async fn timeline_download_heatmap_layers(
563 0 : &self,
564 0 : tenant_shard_id: TenantShardId,
565 0 : timeline_id: TimelineId,
566 0 : concurrency: Option<usize>,
567 0 : recurse: bool,
568 0 : ) -> Result<()> {
569 0 : let mut path = reqwest::Url::parse(&format!(
570 0 : "{}/v1/tenant/{}/timeline/{}/download_heatmap_layers",
571 0 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id
572 0 : ))
573 0 : .expect("Cannot build URL");
574 :
575 0 : path.query_pairs_mut()
576 0 : .append_pair("recurse", &format!("{recurse}"));
577 :
578 0 : if let Some(concurrency) = concurrency {
579 0 : path.query_pairs_mut()
580 0 : .append_pair("concurrency", &format!("{concurrency}"));
581 0 : }
582 :
583 0 : self.request(Method::POST, path, ()).await.map(|_| ())
584 0 : }
585 :
586 0 : pub async fn tenant_reset(&self, tenant_shard_id: TenantShardId) -> Result<()> {
587 0 : let uri = format!(
588 0 : "{}/v1/tenant/{}/reset",
589 : self.mgmt_api_endpoint, tenant_shard_id
590 : );
591 0 : self.request(Method::POST, &uri, ())
592 0 : .await?
593 0 : .json()
594 0 : .await
595 0 : .map_err(Error::ReceiveBody)
596 0 : }
597 :
598 0 : pub async fn tenant_shard_split(
599 0 : &self,
600 0 : tenant_shard_id: TenantShardId,
601 0 : req: TenantShardSplitRequest,
602 0 : ) -> Result<TenantShardSplitResponse> {
603 0 : let uri = format!(
604 0 : "{}/v1/tenant/{}/shard_split",
605 : self.mgmt_api_endpoint, tenant_shard_id
606 : );
607 0 : self.request(Method::PUT, &uri, req)
608 0 : .await?
609 0 : .json()
610 0 : .await
611 0 : .map_err(Error::ReceiveBody)
612 0 : }
613 :
614 0 : pub async fn timeline_list(
615 0 : &self,
616 0 : tenant_shard_id: &TenantShardId,
617 0 : ) -> Result<Vec<TimelineInfo>> {
618 0 : let uri = format!(
619 0 : "{}/v1/tenant/{}/timeline",
620 : self.mgmt_api_endpoint, tenant_shard_id
621 : );
622 0 : self.get(&uri)
623 0 : .await?
624 0 : .json()
625 0 : .await
626 0 : .map_err(Error::ReceiveBody)
627 0 : }
628 :
629 0 : pub async fn tenant_synthetic_size(
630 0 : &self,
631 0 : tenant_shard_id: TenantShardId,
632 0 : ) -> Result<TenantHistorySize> {
633 0 : let uri = format!(
634 0 : "{}/v1/tenant/{}/synthetic_size",
635 : self.mgmt_api_endpoint, tenant_shard_id
636 : );
637 0 : self.get(&uri)
638 0 : .await?
639 0 : .json()
640 0 : .await
641 0 : .map_err(Error::ReceiveBody)
642 0 : }
643 :
644 0 : pub async fn put_io_engine(
645 0 : &self,
646 0 : engine: &pageserver_api::models::virtual_file::IoEngineKind,
647 0 : ) -> Result<()> {
648 0 : let uri = format!("{}/v1/io_engine", self.mgmt_api_endpoint);
649 0 : self.request(Method::PUT, uri, engine)
650 0 : .await?
651 0 : .json()
652 0 : .await
653 0 : .map_err(Error::ReceiveBody)
654 0 : }
655 :
656 : /// Configs io mode at runtime.
657 0 : pub async fn put_io_mode(
658 0 : &self,
659 0 : mode: &pageserver_api::models::virtual_file::IoMode,
660 0 : ) -> Result<()> {
661 0 : let uri = format!("{}/v1/io_mode", self.mgmt_api_endpoint);
662 0 : self.request(Method::PUT, uri, mode)
663 0 : .await?
664 0 : .json()
665 0 : .await
666 0 : .map_err(Error::ReceiveBody)
667 0 : }
668 :
669 0 : pub async fn get_utilization(&self) -> Result<PageserverUtilization> {
670 0 : let uri = format!("{}/v1/utilization", self.mgmt_api_endpoint);
671 0 : self.get(uri)
672 0 : .await?
673 0 : .json()
674 0 : .await
675 0 : .map_err(Error::ReceiveBody)
676 0 : }
677 :
678 0 : pub async fn top_tenant_shards(
679 0 : &self,
680 0 : request: TopTenantShardsRequest,
681 0 : ) -> Result<TopTenantShardsResponse> {
682 0 : let uri = format!("{}/v1/top_tenants", self.mgmt_api_endpoint);
683 0 : self.request(Method::POST, uri, request)
684 0 : .await?
685 0 : .json()
686 0 : .await
687 0 : .map_err(Error::ReceiveBody)
688 0 : }
689 :
690 0 : pub async fn layer_map_info(
691 0 : &self,
692 0 : tenant_shard_id: TenantShardId,
693 0 : timeline_id: TimelineId,
694 0 : ) -> Result<LayerMapInfo> {
695 0 : let uri = format!(
696 0 : "{}/v1/tenant/{}/timeline/{}/layer",
697 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id,
698 : );
699 0 : self.get(&uri)
700 0 : .await?
701 0 : .json()
702 0 : .await
703 0 : .map_err(Error::ReceiveBody)
704 0 : }
705 :
706 0 : pub async fn layer_evict(
707 0 : &self,
708 0 : tenant_shard_id: TenantShardId,
709 0 : timeline_id: TimelineId,
710 0 : layer_file_name: &str,
711 0 : ) -> Result<bool> {
712 0 : let uri = format!(
713 0 : "{}/v1/tenant/{}/timeline/{}/layer/{}",
714 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id, layer_file_name
715 : );
716 0 : let resp = self.request_noerror(Method::DELETE, &uri, ()).await?;
717 0 : match resp.status() {
718 0 : StatusCode::OK => Ok(true),
719 0 : StatusCode::NOT_MODIFIED => Ok(false),
720 : // TODO: dedupe this pattern / introduce separate error variant?
721 0 : status => Err(match resp.json::<HttpErrorBody>().await {
722 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
723 : Err(_) => {
724 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
725 : }
726 : }),
727 : }
728 0 : }
729 :
730 0 : pub async fn layer_ondemand_download(
731 0 : &self,
732 0 : tenant_shard_id: TenantShardId,
733 0 : timeline_id: TimelineId,
734 0 : layer_file_name: &str,
735 0 : ) -> Result<bool> {
736 0 : let uri = format!(
737 0 : "{}/v1/tenant/{}/timeline/{}/layer/{}",
738 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id, layer_file_name
739 : );
740 0 : let resp = self.request_noerror(Method::GET, &uri, ()).await?;
741 0 : match resp.status() {
742 0 : StatusCode::OK => Ok(true),
743 0 : StatusCode::NOT_MODIFIED => Ok(false),
744 : // TODO: dedupe this pattern / introduce separate error variant?
745 0 : status => Err(match resp.json::<HttpErrorBody>().await {
746 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
747 : Err(_) => {
748 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
749 : }
750 : }),
751 : }
752 0 : }
753 :
754 0 : pub async fn ingest_aux_files(
755 0 : &self,
756 0 : tenant_shard_id: TenantShardId,
757 0 : timeline_id: TimelineId,
758 0 : aux_files: HashMap<String, String>,
759 0 : ) -> Result<bool> {
760 0 : let uri = format!(
761 0 : "{}/v1/tenant/{}/timeline/{}/ingest_aux_files",
762 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id
763 : );
764 0 : let resp = self
765 0 : .request_noerror(Method::POST, &uri, IngestAuxFilesRequest { aux_files })
766 0 : .await?;
767 0 : match resp.status() {
768 0 : StatusCode::OK => Ok(true),
769 0 : status => Err(match resp.json::<HttpErrorBody>().await {
770 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
771 : Err(_) => {
772 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
773 : }
774 : }),
775 : }
776 0 : }
777 :
778 0 : pub async fn list_aux_files(
779 0 : &self,
780 0 : tenant_shard_id: TenantShardId,
781 0 : timeline_id: TimelineId,
782 0 : lsn: Lsn,
783 0 : ) -> Result<HashMap<String, Bytes>> {
784 0 : let uri = format!(
785 0 : "{}/v1/tenant/{}/timeline/{}/list_aux_files",
786 : self.mgmt_api_endpoint, tenant_shard_id, timeline_id
787 : );
788 0 : let resp = self
789 0 : .request_noerror(Method::POST, &uri, ListAuxFilesRequest { lsn })
790 0 : .await?;
791 0 : match resp.status() {
792 : StatusCode::OK => {
793 0 : let resp: HashMap<String, Bytes> = resp.json().await.map_err(|e| {
794 0 : Error::ApiError(StatusCode::INTERNAL_SERVER_ERROR, format!("{e}"))
795 0 : })?;
796 0 : Ok(resp)
797 : }
798 0 : status => Err(match resp.json::<HttpErrorBody>().await {
799 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
800 : Err(_) => {
801 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
802 : }
803 : }),
804 : }
805 0 : }
806 :
807 0 : pub async fn import_basebackup(
808 0 : &self,
809 0 : tenant_id: TenantId,
810 0 : timeline_id: TimelineId,
811 0 : base_lsn: Lsn,
812 0 : end_lsn: Lsn,
813 0 : pg_version: PgMajorVersion,
814 0 : basebackup_tarball: ReqwestBody,
815 0 : ) -> Result<()> {
816 0 : let pg_version = pg_version.major_version_num();
817 :
818 0 : let uri = format!(
819 0 : "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/import_basebackup?base_lsn={base_lsn}&end_lsn={end_lsn}&pg_version={pg_version}",
820 : self.mgmt_api_endpoint,
821 : );
822 0 : self.start_request(Method::PUT, uri)
823 0 : .body(basebackup_tarball)
824 0 : .send()
825 0 : .await
826 0 : .map_err(Error::SendRequest)?
827 0 : .error_from_body()
828 0 : .await?
829 0 : .json()
830 0 : .await
831 0 : .map_err(Error::ReceiveBody)
832 0 : }
833 :
834 0 : pub async fn import_wal(
835 0 : &self,
836 0 : tenant_id: TenantId,
837 0 : timeline_id: TimelineId,
838 0 : start_lsn: Lsn,
839 0 : end_lsn: Lsn,
840 0 : wal_tarball: ReqwestBody,
841 0 : ) -> Result<()> {
842 0 : let uri = format!(
843 0 : "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/import_wal?start_lsn={start_lsn}&end_lsn={end_lsn}",
844 : self.mgmt_api_endpoint,
845 : );
846 0 : self.start_request(Method::PUT, uri)
847 0 : .body(wal_tarball)
848 0 : .send()
849 0 : .await
850 0 : .map_err(Error::SendRequest)?
851 0 : .error_from_body()
852 0 : .await?
853 0 : .json()
854 0 : .await
855 0 : .map_err(Error::ReceiveBody)
856 0 : }
857 :
858 0 : pub async fn timeline_init_lsn_lease(
859 0 : &self,
860 0 : tenant_shard_id: TenantShardId,
861 0 : timeline_id: TimelineId,
862 0 : lsn: Lsn,
863 0 : ) -> Result<LsnLease> {
864 0 : let uri = format!(
865 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/lsn_lease",
866 : self.mgmt_api_endpoint,
867 : );
868 :
869 0 : self.request(Method::POST, &uri, LsnLeaseRequest { lsn })
870 0 : .await?
871 0 : .json()
872 0 : .await
873 0 : .map_err(Error::ReceiveBody)
874 0 : }
875 :
876 0 : pub async fn wait_lsn(
877 0 : &self,
878 0 : tenant_shard_id: TenantShardId,
879 0 : request: TenantWaitLsnRequest,
880 0 : ) -> Result<StatusCode> {
881 0 : let uri = format!(
882 0 : "{}/v1/tenant/{tenant_shard_id}/wait_lsn",
883 : self.mgmt_api_endpoint,
884 : );
885 :
886 0 : self.request_noerror(Method::POST, uri, request)
887 0 : .await
888 0 : .map(|resp| resp.status())
889 0 : }
890 :
891 0 : pub async fn activate_post_import(
892 0 : &self,
893 0 : tenant_shard_id: TenantShardId,
894 0 : timeline_id: TimelineId,
895 0 : activate_timeline_timeout: Duration,
896 0 : ) -> Result<TimelineInfo> {
897 0 : let uri = format!(
898 0 : "{}/v1/tenant/{}/timeline/{}/activate_post_import?timeline_activate_timeout_ms={}",
899 : self.mgmt_api_endpoint,
900 : tenant_shard_id,
901 : timeline_id,
902 0 : activate_timeline_timeout.as_millis()
903 : );
904 :
905 0 : self.request(Method::PUT, uri, ())
906 0 : .await?
907 0 : .json()
908 0 : .await
909 0 : .map_err(Error::ReceiveBody)
910 0 : }
911 :
912 0 : pub async fn update_feature_flag_spec(&self, spec: String) -> Result<()> {
913 0 : let uri = format!("{}/v1/feature_flag_spec", self.mgmt_api_endpoint);
914 0 : self.request(Method::POST, uri, spec)
915 0 : .await?
916 0 : .json()
917 0 : .await
918 0 : .map_err(Error::ReceiveBody)
919 0 : }
920 : }
|