TLA Line data Source code
1 : use pageserver_api::{models::*, shard::TenantShardId};
2 : use reqwest::{IntoUrl, Method};
3 : use utils::{
4 : http::error::HttpErrorBody,
5 : id::{TenantId, TimelineId},
6 : };
7 :
8 : pub mod util;
9 :
10 UBC 0 : #[derive(Debug)]
11 : pub struct Client {
12 : mgmt_api_endpoint: String,
13 : authorization_header: Option<String>,
14 : client: reqwest::Client,
15 : }
16 :
17 CBC 7 : #[derive(thiserror::Error, Debug)]
18 : pub enum Error {
19 : #[error("receive body: {0}")]
20 : ReceiveBody(reqwest::Error),
21 :
22 : #[error("receive error body: {0}")]
23 : ReceiveErrorBody(String),
24 :
25 : #[error("pageserver API: {0}")]
26 : ApiError(String),
27 : }
28 :
29 : pub type Result<T> = std::result::Result<T, Error>;
30 :
31 : #[async_trait::async_trait]
32 : pub trait ResponseErrorMessageExt: Sized {
33 : async fn error_from_body(self) -> Result<Self>;
34 : }
35 :
36 : #[async_trait::async_trait]
37 : impl ResponseErrorMessageExt for reqwest::Response {
38 1752 : async fn error_from_body(mut self) -> Result<Self> {
39 1752 : let status = self.status();
40 1752 : if !(status.is_client_error() || status.is_server_error()) {
41 1745 : return Ok(self);
42 7 : }
43 7 :
44 7 : let url = self.url().to_owned();
45 7 : Err(match self.json::<HttpErrorBody>().await {
46 7 : Ok(HttpErrorBody { msg }) => Error::ApiError(msg),
47 : Err(_) => {
48 UBC 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), url))
49 : }
50 : })
51 CBC 3504 : }
52 : }
53 :
54 : impl Client {
55 6612 : pub fn new(mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
56 6612 : Self {
57 6612 : mgmt_api_endpoint,
58 6612 : authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),
59 6612 : client: reqwest::Client::new(),
60 6612 : }
61 6612 : }
62 :
63 6 : pub async fn list_tenants(&self) -> Result<Vec<pageserver_api::models::TenantInfo>> {
64 6 : let uri = format!("{}/v1/tenant", self.mgmt_api_endpoint);
65 24 : let resp = self.get(&uri).await?;
66 6 : resp.json().await.map_err(Error::ReceiveBody)
67 6 : }
68 :
69 UBC 0 : pub async fn tenant_details(
70 0 : &self,
71 0 : tenant_id: TenantId,
72 0 : ) -> Result<pageserver_api::models::TenantDetails> {
73 0 : let uri = format!("{}/v1/tenant/{tenant_id}", self.mgmt_api_endpoint);
74 0 : self.get(uri)
75 0 : .await?
76 0 : .json()
77 0 : .await
78 0 : .map_err(Error::ReceiveBody)
79 0 : }
80 :
81 CBC 19 : pub async fn list_timelines(
82 19 : &self,
83 19 : tenant_id: TenantId,
84 19 : ) -> Result<Vec<pageserver_api::models::TimelineInfo>> {
85 19 : let uri = format!("{}/v1/tenant/{tenant_id}/timeline", self.mgmt_api_endpoint);
86 19 : self.get(&uri)
87 76 : .await?
88 19 : .json()
89 UBC 0 : .await
90 CBC 19 : .map_err(Error::ReceiveBody)
91 19 : }
92 :
93 UBC 0 : pub async fn timeline_info(
94 0 : &self,
95 0 : tenant_id: TenantId,
96 0 : timeline_id: TimelineId,
97 0 : ) -> Result<pageserver_api::models::TimelineInfo> {
98 0 : let uri = format!(
99 0 : "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}",
100 0 : self.mgmt_api_endpoint
101 0 : );
102 0 : self.get(&uri)
103 0 : .await?
104 0 : .json()
105 0 : .await
106 0 : .map_err(Error::ReceiveBody)
107 0 : }
108 :
109 0 : pub async fn keyspace(
110 0 : &self,
111 0 : tenant_id: TenantId,
112 0 : timeline_id: TimelineId,
113 0 : ) -> Result<pageserver_api::models::partitioning::Partitioning> {
114 0 : let uri = format!(
115 0 : "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/keyspace",
116 0 : self.mgmt_api_endpoint
117 0 : );
118 0 : self.get(&uri)
119 0 : .await?
120 0 : .json()
121 0 : .await
122 0 : .map_err(Error::ReceiveBody)
123 0 : }
124 :
125 CBC 1146 : async fn get<U: IntoUrl>(&self, uri: U) -> Result<reqwest::Response> {
126 3456 : self.request(Method::GET, uri, ()).await
127 1146 : }
128 :
129 2316 : async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
130 2316 : &self,
131 2316 : method: Method,
132 2316 : uri: U,
133 2316 : body: B,
134 2316 : ) -> Result<reqwest::Response> {
135 2316 : let req = self.client.request(method, uri);
136 2316 : let req = if let Some(value) = &self.authorization_header {
137 56 : req.header(reqwest::header::AUTHORIZATION, value)
138 : } else {
139 2260 : req
140 : };
141 6915 : let res = req.json(&body).send().await.map_err(Error::ReceiveBody)?;
142 1752 : let response = res.error_from_body().await?;
143 1745 : Ok(response)
144 2316 : }
145 :
146 1121 : pub async fn status(&self) -> Result<()> {
147 1121 : let uri = format!("{}/v1/status", self.mgmt_api_endpoint);
148 3356 : self.get(&uri).await?;
149 557 : Ok(())
150 1121 : }
151 :
152 408 : pub async fn tenant_create(&self, req: &TenantCreateRequest) -> Result<TenantId> {
153 408 : let uri = format!("{}/v1/tenant", self.mgmt_api_endpoint);
154 408 : self.request(Method::POST, &uri, req)
155 1632 : .await?
156 407 : .json()
157 UBC 0 : .await
158 CBC 407 : .map_err(Error::ReceiveBody)
159 408 : }
160 :
161 14 : pub async fn tenant_config(&self, req: &TenantConfigRequest) -> Result<()> {
162 14 : let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
163 56 : self.request(Method::PUT, &uri, req).await?;
164 14 : Ok(())
165 14 : }
166 :
167 UBC 0 : pub async fn tenant_secondary_download(&self, tenant_id: TenantShardId) -> Result<()> {
168 0 : let uri = format!(
169 0 : "{}/v1/tenant/{}/secondary/download",
170 0 : self.mgmt_api_endpoint, tenant_id
171 0 : );
172 0 : self.request(Method::POST, &uri, ())
173 0 : .await?
174 0 : .error_for_status()
175 0 : .map(|_| ())
176 0 : .map_err(|e| Error::ApiError(format!("{}", e)))
177 0 : }
178 :
179 0 : pub async fn location_config(
180 0 : &self,
181 0 : tenant_id: TenantId,
182 0 : config: LocationConfig,
183 0 : flush_ms: Option<std::time::Duration>,
184 0 : ) -> Result<()> {
185 0 : let req_body = TenantLocationConfigRequest { tenant_id, config };
186 0 : let path = format!(
187 0 : "{}/v1/tenant/{}/location_config",
188 0 : self.mgmt_api_endpoint, tenant_id
189 0 : );
190 0 : let path = if let Some(flush_ms) = flush_ms {
191 0 : format!("{}?flush_ms={}", path, flush_ms.as_millis())
192 : } else {
193 0 : path
194 : };
195 0 : self.request(Method::PUT, &path, &req_body).await?;
196 0 : Ok(())
197 0 : }
198 :
199 CBC 748 : pub async fn timeline_create(
200 748 : &self,
201 748 : tenant_id: TenantId,
202 748 : req: &TimelineCreateRequest,
203 748 : ) -> Result<TimelineInfo> {
204 748 : let uri = format!(
205 748 : "{}/v1/tenant/{}/timeline",
206 748 : self.mgmt_api_endpoint, tenant_id
207 748 : );
208 748 : self.request(Method::POST, &uri, req)
209 1771 : .await?
210 742 : .json()
211 UBC 0 : .await
212 CBC 742 : .map_err(Error::ReceiveBody)
213 748 : }
214 : }
|