TLA Line data Source code
1 : #![allow(unused)]
2 :
3 : use chrono::{DateTime, Utc};
4 : use reqwest::{header, Client, Url};
5 : use tokio::sync::Semaphore;
6 :
7 : use utils::id::{TenantId, TimelineId};
8 : use utils::lsn::Lsn;
9 :
/// Error type returned by the `CloudAdminApiClient` request methods.
///
/// Combines a free-form context string (which operation was running, or a
/// description of the unexpected state) with the `ErrorKind` that occurred.
10 UBC 0 : #[derive(Debug)]
11 : pub struct Error {
// Human-readable description supplied by the call site; printed by `Display`.
12 : context: String,
// What went wrong; selects the message format used by `Display`.
13 : kind: ErrorKind,
14 : }
15 :
16 : impl Error {
17 0 : fn new(context: String, kind: ErrorKind) -> Self {
18 0 : Self { context, kind }
19 0 : }
20 : }
21 :
22 : impl std::fmt::Display for Error {
23 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24 0 : match &self.kind {
25 0 : ErrorKind::RequestSend(e) => write!(
26 0 : f,
27 0 : "Failed to send a request. Context: {}, error: {}",
28 0 : self.context, e
29 0 : ),
30 0 : ErrorKind::BodyRead(e) => {
31 0 : write!(
32 0 : f,
33 0 : "Failed to read a request body. Context: {}, error: {}",
34 0 : self.context, e
35 0 : )
36 : }
37 0 : ErrorKind::UnexpectedState => write!(f, "Unexpected state: {}", self.context),
38 : }
39 0 : }
40 : }
41 :
/// Newtype around a project identifier string.
///
/// `#[serde(transparent)]` makes it deserialize directly from the underlying
/// string rather than from a wrapping object.
42 0 : #[derive(Debug, Clone, serde::Deserialize, Hash, PartialEq, Eq)]
43 : #[serde(transparent)]
44 : pub struct ProjectId(pub String);
45 :
/// Newtype around a branch identifier string.
///
/// `#[serde(transparent)]` makes it deserialize directly from the underlying
/// string rather than from a wrapping object.
46 0 : #[derive(Clone, Debug, serde::Deserialize, Hash, PartialEq, Eq)]
47 : #[serde(transparent)]
48 : pub struct BranchId(pub String);
49 :
// Marker impl: relies on the trait's default methods, so no `source()` is
// exposed; the wrapped `reqwest` error is only visible through `Display`/`Debug`.
50 : impl std::error::Error for Error {}
51 :
/// Category of failure carried by `Error`.
52 0 : #[derive(Debug)]
53 : pub enum ErrorKind {
// Sending the HTTP request (`send().await`) failed.
54 : RequestSend(reqwest::Error),
// Decoding the response as JSON (`response.json().await`) failed.
55 : BodyRead(reqwest::Error),
// The response decoded fine but held more items than a single-result lookup allows.
56 : UnexpectedState,
57 : }
58 :
/// HTTP client for the admin API endpoints used in this file
/// (`/projects`, `/branches`, `/pageservers`, `/safekeepers`).
59 : pub struct CloudAdminApiClient {
// Every request method acquires a permit first; `new` creates it with 200 permits.
60 : request_limiter: Semaphore,
// Sent as a bearer token on every request.
61 : token: String,
// Prefix that `append_url` concatenates endpoint subpaths onto.
62 : base_url: Url,
// Shared `reqwest` client used for all requests.
63 : http_client: Client,
64 : }
65 :
/// Envelope that every response in this file is deserialized into.
66 0 : #[derive(Debug, serde::Deserialize)]
67 : struct AdminApiResponse<T> {
// The payload; the request methods return this (or a single element of it).
68 : data: T,
// Optional field of the envelope; not read by any code visible in this file.
69 : total: Option<usize>,
70 : }
71 :
/// One pageserver record, as deserialized from the `/pageservers` response
/// by `list_pageservers`. All fields are required by the derived deserializer.
72 0 : #[derive(Debug, serde::Deserialize)]
73 : pub struct PageserverData {
74 : pub id: u64,
75 : pub created_at: DateTime<Utc>,
76 : pub updated_at: DateTime<Utc>,
77 : pub region_id: String,
78 : pub version: i64,
79 : pub instance_id: String,
80 : pub port: u16,
81 : pub http_host: String,
82 : pub http_port: u16,
83 : pub active: bool,
84 : pub projects_count: usize,
85 : pub availability_zone_id: String,
86 : }
87 :
/// One safekeeper record, as deserialized from the `/safekeepers` response
/// by `list_safekeepers`; also appears nested in `ProjectData::safekeepers`.
/// All fields are required by the derived deserializer.
88 0 : #[derive(Debug, Clone, serde::Deserialize)]
89 : pub struct SafekeeperData {
90 : pub id: u64,
91 : pub created_at: DateTime<Utc>,
92 : pub updated_at: DateTime<Utc>,
93 : pub region_id: String,
94 : pub version: i64,
95 : pub instance_id: String,
96 : pub active: bool,
97 : pub host: String,
98 : pub port: u16,
99 : pub projects_count: usize,
100 : pub availability_zone_id: String,
101 : }
102 :
// One project record, as deserialized from the `/projects` responses.
// Only `maintenance_set` is optional; every other field must be present.
103 : #[serde_with::serde_as]
104 0 : #[derive(Debug, Clone, serde::Deserialize)]
105 : pub struct ProjectData {
106 : pub id: ProjectId,
107 : pub name: String,
108 : pub region_id: String,
109 : pub platform_id: String,
110 : pub user_id: String,
111 : pub pageserver_id: u64,
// Parsed from its string form via `DisplayFromStr`.
112 : #[serde_as(as = "serde_with::DisplayFromStr")]
113 : pub tenant: TenantId,
114 : pub safekeepers: Vec<SafekeeperData>,
115 : pub deleted: bool,
116 : pub created_at: DateTime<Utc>,
117 : pub updated_at: DateTime<Utc>,
118 : pub pg_version: u32,
119 : pub max_project_size: u64,
120 : pub remote_storage_size: u64,
121 : pub resident_size: u64,
122 : pub synthetic_storage_size: u64,
123 : pub compute_time: u64,
124 : pub data_transfer: u64,
125 : pub data_storage: u64,
126 : pub maintenance_set: Option<String>,
127 : }
128 :
// One branch record, as deserialized from the `/branches` responses.
129 : #[serde_with::serde_as]
130 0 : #[derive(Debug, serde::Deserialize)]
131 : pub struct BranchData {
132 : pub id: BranchId,
133 : pub created_at: DateTime<Utc>,
134 : pub updated_at: DateTime<Utc>,
135 : pub name: String,
136 : pub project_id: ProjectId,
// Parsed from its string form via `DisplayFromStr`.
137 : #[serde_as(as = "serde_with::DisplayFromStr")]
138 : pub timeline_id: TimelineId,
// `#[serde(default)]`: a missing field deserializes to `None`.
139 : #[serde(default)]
140 : pub parent_id: Option<BranchId>,
// Missing field becomes `None`; a present value is parsed from its string form.
141 : #[serde(default)]
142 : #[serde_as(as = "Option<serde_with::DisplayFromStr>")]
143 : pub parent_lsn: Option<Lsn>,
144 : pub default: bool,
145 : pub deleted: bool,
146 : pub logical_size: Option<u64>,
147 : pub physical_size: Option<u64>,
148 : pub written_size: Option<u64>,
149 : }
150 :
151 : impl CloudAdminApiClient {
152 0 : pub fn new(token: String, base_url: Url) -> Self {
153 0 : Self {
154 0 : token,
155 0 : base_url,
156 0 : request_limiter: Semaphore::new(200),
157 0 : http_client: Client::new(), // TODO timeout configs at least
158 0 : }
159 0 : }
160 :
161 0 : pub async fn find_tenant_project(
162 0 : &self,
163 0 : tenant_id: TenantId,
164 0 : ) -> Result<Option<ProjectData>, Error> {
165 0 : let _permit = self
166 0 : .request_limiter
167 0 : .acquire()
168 0 : .await
169 0 : .expect("Semaphore is not closed");
170 :
171 0 : let response = self
172 0 : .http_client
173 0 : .get(self.append_url("/projects"))
174 0 : .query(&[
175 0 : ("tenant_id", tenant_id.to_string()),
176 0 : ("show_deleted", "true".to_string()),
177 0 : ])
178 0 : .header(header::ACCEPT, "application/json")
179 0 : .bearer_auth(&self.token)
180 0 : .send()
181 0 : .await
182 0 : .map_err(|e| {
183 0 : Error::new(
184 0 : "Find project for tenant".to_string(),
185 0 : ErrorKind::RequestSend(e),
186 0 : )
187 0 : })?;
188 :
189 0 : let response: AdminApiResponse<Vec<ProjectData>> = response.json().await.map_err(|e| {
190 0 : Error::new(
191 0 : "Find project for tenant".to_string(),
192 0 : ErrorKind::BodyRead(e),
193 0 : )
194 0 : })?;
195 0 : match response.data.len() {
196 0 : 0 => Ok(None),
197 0 : 1 => Ok(Some(
198 0 : response
199 0 : .data
200 0 : .into_iter()
201 0 : .next()
202 0 : .expect("Should have exactly one element"),
203 0 : )),
204 0 : too_many => Err(Error::new(
205 0 : format!("Find project for tenant returned {too_many} projects instead of 0 or 1"),
206 0 : ErrorKind::UnexpectedState,
207 0 : )),
208 : }
209 0 : }
210 :
211 0 : pub async fn find_timeline_branch(
212 0 : &self,
213 0 : timeline_id: TimelineId,
214 0 : ) -> Result<Option<BranchData>, Error> {
215 0 : let _permit = self
216 0 : .request_limiter
217 0 : .acquire()
218 0 : .await
219 0 : .expect("Semaphore is not closed");
220 :
221 0 : let response = self
222 0 : .http_client
223 0 : .get(self.append_url("/branches"))
224 0 : .query(&[
225 0 : ("timeline_id", timeline_id.to_string()),
226 0 : ("show_deleted", "true".to_string()),
227 0 : ])
228 0 : .header(header::ACCEPT, "application/json")
229 0 : .bearer_auth(&self.token)
230 0 : .send()
231 0 : .await
232 0 : .map_err(|e| {
233 0 : Error::new(
234 0 : "Find branch for timeline".to_string(),
235 0 : ErrorKind::RequestSend(e),
236 0 : )
237 0 : })?;
238 :
239 0 : let response: AdminApiResponse<Vec<BranchData>> = response.json().await.map_err(|e| {
240 0 : Error::new(
241 0 : "Find branch for timeline".to_string(),
242 0 : ErrorKind::BodyRead(e),
243 0 : )
244 0 : })?;
245 0 : match response.data.len() {
246 0 : 0 => Ok(None),
247 0 : 1 => Ok(Some(
248 0 : response
249 0 : .data
250 0 : .into_iter()
251 0 : .next()
252 0 : .expect("Should have exactly one element"),
253 0 : )),
254 0 : too_many => Err(Error::new(
255 0 : format!("Find branch for timeline returned {too_many} branches instead of 0 or 1"),
256 0 : ErrorKind::UnexpectedState,
257 0 : )),
258 : }
259 0 : }
260 :
261 0 : pub async fn list_pageservers(&self) -> Result<Vec<PageserverData>, Error> {
262 0 : let _permit = self
263 0 : .request_limiter
264 0 : .acquire()
265 0 : .await
266 0 : .expect("Semaphore is not closed");
267 :
268 0 : let response = self
269 0 : .http_client
270 0 : .get(self.append_url("/pageservers"))
271 0 : .header(header::ACCEPT, "application/json")
272 0 : .bearer_auth(&self.token)
273 0 : .send()
274 0 : .await
275 0 : .map_err(|e| Error::new("List pageservers".to_string(), ErrorKind::RequestSend(e)))?;
276 :
277 0 : let response: AdminApiResponse<Vec<PageserverData>> = response
278 0 : .json()
279 0 : .await
280 0 : .map_err(|e| Error::new("List pageservers".to_string(), ErrorKind::BodyRead(e)))?;
281 :
282 0 : Ok(response.data)
283 0 : }
284 :
285 0 : pub async fn list_safekeepers(&self) -> Result<Vec<SafekeeperData>, Error> {
286 0 : let _permit = self
287 0 : .request_limiter
288 0 : .acquire()
289 0 : .await
290 0 : .expect("Semaphore is not closed");
291 :
292 0 : let response = self
293 0 : .http_client
294 0 : .get(self.append_url("/safekeepers"))
295 0 : .header(header::ACCEPT, "application/json")
296 0 : .bearer_auth(&self.token)
297 0 : .send()
298 0 : .await
299 0 : .map_err(|e| Error::new("List safekeepers".to_string(), ErrorKind::RequestSend(e)))?;
300 :
301 0 : let response: AdminApiResponse<Vec<SafekeeperData>> = response
302 0 : .json()
303 0 : .await
304 0 : .map_err(|e| Error::new("List safekeepers".to_string(), ErrorKind::BodyRead(e)))?;
305 :
306 0 : Ok(response.data)
307 0 : }
308 :
309 0 : pub async fn projects_for_pageserver(
310 0 : &self,
311 0 : pageserver_id: u64,
312 0 : show_deleted: bool,
313 0 : ) -> Result<Vec<ProjectData>, Error> {
314 0 : let _permit = self
315 0 : .request_limiter
316 0 : .acquire()
317 0 : .await
318 0 : .expect("Semaphore is not closed");
319 :
320 0 : let response = self
321 0 : .http_client
322 0 : .get(self.append_url("/projects"))
323 0 : .query(&[
324 0 : ("pageserver_id", &pageserver_id.to_string()),
325 0 : ("show_deleted", &show_deleted.to_string()),
326 0 : ])
327 0 : .header(header::ACCEPT, "application/json")
328 0 : .bearer_auth(&self.token)
329 0 : .send()
330 0 : .await
331 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
332 :
333 0 : let response: AdminApiResponse<Vec<ProjectData>> = response
334 0 : .json()
335 0 : .await
336 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
337 :
338 0 : Ok(response.data)
339 0 : }
340 :
341 0 : pub async fn project_for_tenant(
342 0 : &self,
343 0 : tenant_id: TenantId,
344 0 : show_deleted: bool,
345 0 : ) -> Result<Option<ProjectData>, Error> {
346 0 : let _permit = self
347 0 : .request_limiter
348 0 : .acquire()
349 0 : .await
350 0 : .expect("Semaphore is not closed");
351 :
352 0 : let response = self
353 0 : .http_client
354 0 : .get(self.append_url("/projects"))
355 0 : .query(&[
356 0 : ("search", &tenant_id.to_string()),
357 0 : ("show_deleted", &show_deleted.to_string()),
358 0 : ])
359 0 : .header(header::ACCEPT, "application/json")
360 0 : .bearer_auth(&self.token)
361 0 : .send()
362 0 : .await
363 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
364 :
365 0 : let response: AdminApiResponse<Vec<ProjectData>> = response
366 0 : .json()
367 0 : .await
368 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
369 :
370 0 : match response.data.as_slice() {
371 0 : [] => Ok(None),
372 0 : [_single] => Ok(Some(response.data.into_iter().next().unwrap())),
373 0 : multiple => Err(Error::new(
374 0 : format!("Got more than one project for tenant {tenant_id} : {multiple:?}"),
375 0 : ErrorKind::UnexpectedState,
376 0 : )),
377 : }
378 0 : }
379 :
380 0 : pub async fn branches_for_project(
381 0 : &self,
382 0 : project_id: &ProjectId,
383 0 : show_deleted: bool,
384 0 : ) -> Result<Vec<BranchData>, Error> {
385 0 : let _permit = self
386 0 : .request_limiter
387 0 : .acquire()
388 0 : .await
389 0 : .expect("Semaphore is not closed");
390 :
391 0 : let response = self
392 0 : .http_client
393 0 : .get(self.append_url("/branches"))
394 0 : .query(&[
395 0 : ("project_id", &project_id.0),
396 0 : ("show_deleted", &show_deleted.to_string()),
397 0 : ])
398 0 : .header(header::ACCEPT, "application/json")
399 0 : .bearer_auth(&self.token)
400 0 : .send()
401 0 : .await
402 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
403 :
404 0 : let response: AdminApiResponse<Vec<BranchData>> = response
405 0 : .json()
406 0 : .await
407 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
408 :
409 0 : Ok(response.data)
410 0 : }
411 :
412 0 : fn append_url(&self, subpath: &str) -> Url {
413 0 : // TODO fugly, but `.join` does not work when called
414 0 : (self.base_url.to_string() + subpath)
415 0 : .parse()
416 0 : .unwrap_or_else(|e| panic!("Could not append {subpath} to base url: {e}"))
417 0 : }
418 : }
|