Line data Source code
1 : use pageserver_api::{models::*, shard::TenantShardId};
2 : use reqwest::{IntoUrl, Method, StatusCode};
3 : use utils::{
4 : http::error::HttpErrorBody,
5 : id::{TenantId, TimelineId},
6 : };
7 :
8 : pub mod util;
9 :
10 0 : #[derive(Debug)]
11 : pub struct Client {
12 : mgmt_api_endpoint: String,
13 : authorization_header: Option<String>,
14 : client: reqwest::Client,
15 : }
16 :
17 0 : #[derive(thiserror::Error, Debug)]
18 : pub enum Error {
19 : #[error("receive body: {0}")]
20 : ReceiveBody(reqwest::Error),
21 :
22 : #[error("receive error body: {0}")]
23 : ReceiveErrorBody(String),
24 :
25 : #[error("pageserver API: {1}")]
26 : ApiError(StatusCode, String),
27 : }
28 :
29 : pub type Result<T> = std::result::Result<T, Error>;
30 :
31 : pub trait ResponseErrorMessageExt: Sized {
32 : fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
33 : }
34 :
35 : impl ResponseErrorMessageExt for reqwest::Response {
36 0 : async fn error_from_body(self) -> Result<Self> {
37 0 : let status = self.status();
38 0 : if !(status.is_client_error() || status.is_server_error()) {
39 0 : return Ok(self);
40 0 : }
41 0 :
42 0 : let url = self.url().to_owned();
43 0 : Err(match self.json::<HttpErrorBody>().await {
44 0 : Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
45 : Err(_) => {
46 0 : Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), url))
47 : }
48 : })
49 0 : }
50 : }
51 :
/// Flag for `Client::timeline_info` selecting whether the
/// `force-await-logical-size=true` query parameter is added to the request.
///
/// Derives the usual value-type traits so callers can log, copy and compare it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ForceAwaitLogicalSize {
    /// Append `?force-await-logical-size=true` to the request URL.
    Yes,
    /// Send the request URL without the extra query parameter.
    No,
}
56 :
57 : impl Client {
58 0 : pub fn new(mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
59 0 : Self::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt)
60 0 : }
61 :
62 0 : pub fn from_client(
63 0 : client: reqwest::Client,
64 0 : mgmt_api_endpoint: String,
65 0 : jwt: Option<&str>,
66 0 : ) -> Self {
67 0 : Self {
68 0 : mgmt_api_endpoint,
69 0 : authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),
70 0 : client,
71 0 : }
72 0 : }
73 :
74 0 : pub async fn list_tenants(&self) -> Result<Vec<pageserver_api::models::TenantInfo>> {
75 0 : let uri = format!("{}/v1/tenant", self.mgmt_api_endpoint);
76 0 : let resp = self.get(&uri).await?;
77 0 : resp.json().await.map_err(Error::ReceiveBody)
78 0 : }
79 :
80 : /// Get an arbitrary path and returning a streaming Response. This function is suitable
81 : /// for pass-through/proxy use cases where we don't care what the response content looks
82 : /// like.
83 : ///
84 : /// Use/add one of the properly typed methods below if you know aren't proxying, and
85 : /// know what kind of response you expect.
86 0 : pub async fn get_raw(&self, path: String) -> Result<reqwest::Response> {
87 0 : debug_assert!(path.starts_with('/'));
88 0 : let uri = format!("{}{}", self.mgmt_api_endpoint, path);
89 0 :
90 0 : let req = self.client.request(Method::GET, uri);
91 0 : let req = if let Some(value) = &self.authorization_header {
92 0 : req.header(reqwest::header::AUTHORIZATION, value)
93 : } else {
94 0 : req
95 : };
96 0 : req.send().await.map_err(Error::ReceiveBody)
97 0 : }
98 :
99 0 : pub async fn tenant_details(
100 0 : &self,
101 0 : tenant_shard_id: TenantShardId,
102 0 : ) -> Result<pageserver_api::models::TenantDetails> {
103 0 : let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
104 0 : self.get(uri)
105 0 : .await?
106 0 : .json()
107 0 : .await
108 0 : .map_err(Error::ReceiveBody)
109 0 : }
110 :
111 0 : pub async fn list_timelines(
112 0 : &self,
113 0 : tenant_shard_id: TenantShardId,
114 0 : ) -> Result<Vec<pageserver_api::models::TimelineInfo>> {
115 0 : let uri = format!(
116 0 : "{}/v1/tenant/{tenant_shard_id}/timeline",
117 0 : self.mgmt_api_endpoint
118 0 : );
119 0 : self.get(&uri)
120 0 : .await?
121 0 : .json()
122 0 : .await
123 0 : .map_err(Error::ReceiveBody)
124 0 : }
125 :
126 0 : pub async fn timeline_info(
127 0 : &self,
128 0 : tenant_id: TenantId,
129 0 : timeline_id: TimelineId,
130 0 : force_await_logical_size: ForceAwaitLogicalSize,
131 0 : ) -> Result<pageserver_api::models::TimelineInfo> {
132 0 : let uri = format!(
133 0 : "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}",
134 0 : self.mgmt_api_endpoint
135 0 : );
136 :
137 0 : let uri = match force_await_logical_size {
138 0 : ForceAwaitLogicalSize::Yes => format!("{}?force-await-logical-size={}", uri, true),
139 0 : ForceAwaitLogicalSize::No => uri,
140 : };
141 :
142 0 : self.get(&uri)
143 0 : .await?
144 0 : .json()
145 0 : .await
146 0 : .map_err(Error::ReceiveBody)
147 0 : }
148 :
149 0 : pub async fn keyspace(
150 0 : &self,
151 0 : tenant_id: TenantId,
152 0 : timeline_id: TimelineId,
153 0 : ) -> Result<pageserver_api::models::partitioning::Partitioning> {
154 0 : let uri = format!(
155 0 : "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/keyspace",
156 0 : self.mgmt_api_endpoint
157 0 : );
158 0 : self.get(&uri)
159 0 : .await?
160 0 : .json()
161 0 : .await
162 0 : .map_err(Error::ReceiveBody)
163 0 : }
164 :
165 0 : async fn get<U: IntoUrl>(&self, uri: U) -> Result<reqwest::Response> {
166 0 : self.request(Method::GET, uri, ()).await
167 0 : }
168 :
169 0 : async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
170 0 : &self,
171 0 : method: Method,
172 0 : uri: U,
173 0 : body: B,
174 0 : ) -> Result<reqwest::Response> {
175 0 : let req = self.client.request(method, uri);
176 0 : let req = if let Some(value) = &self.authorization_header {
177 0 : req.header(reqwest::header::AUTHORIZATION, value)
178 : } else {
179 0 : req
180 : };
181 0 : let res = req.json(&body).send().await.map_err(Error::ReceiveBody)?;
182 0 : let response = res.error_from_body().await?;
183 0 : Ok(response)
184 0 : }
185 :
186 0 : pub async fn status(&self) -> Result<()> {
187 0 : let uri = format!("{}/v1/status", self.mgmt_api_endpoint);
188 0 : self.get(&uri).await?;
189 0 : Ok(())
190 0 : }
191 :
192 0 : pub async fn tenant_create(&self, req: &TenantCreateRequest) -> Result<TenantId> {
193 0 : let uri = format!("{}/v1/tenant", self.mgmt_api_endpoint);
194 0 : self.request(Method::POST, &uri, req)
195 0 : .await?
196 0 : .json()
197 0 : .await
198 0 : .map_err(Error::ReceiveBody)
199 0 : }
200 :
201 : /// The tenant deletion API can return 202 if deletion is incomplete, or
202 : /// 404 if it is complete. Callers are responsible for checking the status
203 : /// code and retrying. Error codes other than 404 will return Err().
204 0 : pub async fn tenant_delete(&self, tenant_shard_id: TenantShardId) -> Result<StatusCode> {
205 0 : let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
206 0 :
207 0 : match self.request(Method::DELETE, &uri, ()).await {
208 0 : Err(Error::ApiError(status_code, msg)) => {
209 0 : if status_code == StatusCode::NOT_FOUND {
210 0 : Ok(StatusCode::NOT_FOUND)
211 : } else {
212 0 : Err(Error::ApiError(status_code, msg))
213 : }
214 : }
215 0 : Err(e) => Err(e),
216 0 : Ok(response) => Ok(response.status()),
217 : }
218 0 : }
219 :
220 0 : pub async fn tenant_time_travel_remote_storage(
221 0 : &self,
222 0 : tenant_shard_id: TenantShardId,
223 0 : timestamp: &str,
224 0 : done_if_after: &str,
225 0 : ) -> Result<()> {
226 0 : let uri = format!(
227 0 : "{}/v1/tenant/{tenant_shard_id}/time_travel_remote_storage?travel_to={timestamp}&done_if_after={done_if_after}",
228 0 : self.mgmt_api_endpoint
229 0 : );
230 0 : self.request(Method::PUT, &uri, ()).await?;
231 0 : Ok(())
232 0 : }
233 :
234 0 : pub async fn tenant_config(&self, req: &TenantConfigRequest) -> Result<()> {
235 0 : let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
236 0 : self.request(Method::PUT, &uri, req).await?;
237 0 : Ok(())
238 0 : }
239 :
240 0 : pub async fn tenant_secondary_download(&self, tenant_id: TenantShardId) -> Result<()> {
241 0 : let uri = format!(
242 0 : "{}/v1/tenant/{}/secondary/download",
243 0 : self.mgmt_api_endpoint, tenant_id
244 0 : );
245 0 : self.request(Method::POST, &uri, ()).await?;
246 0 : Ok(())
247 0 : }
248 :
249 0 : pub async fn location_config(
250 0 : &self,
251 0 : tenant_shard_id: TenantShardId,
252 0 : config: LocationConfig,
253 0 : flush_ms: Option<std::time::Duration>,
254 0 : ) -> Result<()> {
255 0 : let req_body = TenantLocationConfigRequest {
256 0 : tenant_id: tenant_shard_id,
257 0 : config,
258 0 : };
259 0 : let path = format!(
260 0 : "{}/v1/tenant/{}/location_config",
261 0 : self.mgmt_api_endpoint, tenant_shard_id
262 0 : );
263 0 : let path = if let Some(flush_ms) = flush_ms {
264 0 : format!("{}?flush_ms={}", path, flush_ms.as_millis())
265 : } else {
266 0 : path
267 : };
268 0 : self.request(Method::PUT, &path, &req_body).await?;
269 0 : Ok(())
270 0 : }
271 :
272 0 : pub async fn list_location_config(&self) -> Result<LocationConfigListResponse> {
273 0 : let path = format!("{}/v1/location_config", self.mgmt_api_endpoint);
274 0 : self.request(Method::GET, &path, ())
275 0 : .await?
276 0 : .json()
277 0 : .await
278 0 : .map_err(Error::ReceiveBody)
279 0 : }
280 :
281 0 : pub async fn timeline_create(
282 0 : &self,
283 0 : tenant_shard_id: TenantShardId,
284 0 : req: &TimelineCreateRequest,
285 0 : ) -> Result<TimelineInfo> {
286 0 : let uri = format!(
287 0 : "{}/v1/tenant/{}/timeline",
288 0 : self.mgmt_api_endpoint, tenant_shard_id
289 0 : );
290 0 : self.request(Method::POST, &uri, req)
291 0 : .await?
292 0 : .json()
293 0 : .await
294 0 : .map_err(Error::ReceiveBody)
295 0 : }
296 :
297 : /// The timeline deletion API can return 201 if deletion is incomplete, or
298 : /// 403 if it is complete. Callers are responsible for checking the status
299 : /// code and retrying. Error codes other than 403 will return Err().
300 0 : pub async fn timeline_delete(
301 0 : &self,
302 0 : tenant_shard_id: TenantShardId,
303 0 : timeline_id: TimelineId,
304 0 : ) -> Result<StatusCode> {
305 0 : let uri = format!(
306 0 : "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
307 0 : self.mgmt_api_endpoint
308 0 : );
309 0 :
310 0 : match self.request(Method::DELETE, &uri, ()).await {
311 0 : Err(Error::ApiError(status_code, msg)) => {
312 0 : if status_code == StatusCode::NOT_FOUND {
313 0 : Ok(StatusCode::NOT_FOUND)
314 : } else {
315 0 : Err(Error::ApiError(status_code, msg))
316 : }
317 : }
318 0 : Err(e) => Err(e),
319 0 : Ok(response) => Ok(response.status()),
320 : }
321 0 : }
322 :
323 0 : pub async fn tenant_reset(&self, tenant_shard_id: TenantShardId) -> Result<()> {
324 0 : let uri = format!(
325 0 : "{}/v1/tenant/{}/reset",
326 0 : self.mgmt_api_endpoint, tenant_shard_id
327 0 : );
328 0 : self.request(Method::POST, &uri, ())
329 0 : .await?
330 0 : .json()
331 0 : .await
332 0 : .map_err(Error::ReceiveBody)
333 0 : }
334 :
335 0 : pub async fn tenant_shard_split(
336 0 : &self,
337 0 : tenant_shard_id: TenantShardId,
338 0 : req: TenantShardSplitRequest,
339 0 : ) -> Result<TenantShardSplitResponse> {
340 0 : let uri = format!(
341 0 : "{}/v1/tenant/{}/shard_split",
342 0 : self.mgmt_api_endpoint, tenant_shard_id
343 0 : );
344 0 : self.request(Method::PUT, &uri, req)
345 0 : .await?
346 0 : .json()
347 0 : .await
348 0 : .map_err(Error::ReceiveBody)
349 0 : }
350 :
351 0 : pub async fn timeline_list(
352 0 : &self,
353 0 : tenant_shard_id: &TenantShardId,
354 0 : ) -> Result<Vec<TimelineInfo>> {
355 0 : let uri = format!(
356 0 : "{}/v1/tenant/{}/timeline",
357 0 : self.mgmt_api_endpoint, tenant_shard_id
358 0 : );
359 0 : self.get(&uri)
360 0 : .await?
361 0 : .json()
362 0 : .await
363 0 : .map_err(Error::ReceiveBody)
364 0 : }
365 :
366 0 : pub async fn tenant_synthetic_size(
367 0 : &self,
368 0 : tenant_shard_id: TenantShardId,
369 0 : ) -> Result<TenantHistorySize> {
370 0 : let uri = format!(
371 0 : "{}/v1/tenant/{}/synthetic_size",
372 0 : self.mgmt_api_endpoint, tenant_shard_id
373 0 : );
374 0 : self.get(&uri)
375 0 : .await?
376 0 : .json()
377 0 : .await
378 0 : .map_err(Error::ReceiveBody)
379 0 : }
380 :
381 0 : pub async fn put_io_engine(
382 0 : &self,
383 0 : engine: &pageserver_api::models::virtual_file::IoEngineKind,
384 0 : ) -> Result<()> {
385 0 : let uri = format!("{}/v1/io_engine", self.mgmt_api_endpoint);
386 0 : self.request(Method::PUT, uri, engine)
387 0 : .await?
388 0 : .json()
389 0 : .await
390 0 : .map_err(Error::ReceiveBody)
391 0 : }
392 : }
|