Line data Source code
1 : use std::time::Duration;
2 :
3 : use chrono::{DateTime, Utc};
4 : use hex::FromHex;
5 : use reqwest::{header, Client, StatusCode, Url};
6 : use serde::Deserialize;
7 : use tokio::sync::Semaphore;
8 :
9 : use utils::id::{TenantId, TimelineId};
10 : use utils::lsn::Lsn;
11 :
12 : use crate::ConsoleConfig;
13 :
14 0 : #[derive(Debug)]
15 : pub struct Error {
16 : context: String,
17 : kind: ErrorKind,
18 : }
19 :
20 : impl Error {
21 0 : fn new(context: String, kind: ErrorKind) -> Self {
22 0 : Self { context, kind }
23 0 : }
24 : }
25 :
26 : impl std::fmt::Display for Error {
27 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 0 : match &self.kind {
29 0 : ErrorKind::RequestSend(e) => write!(
30 0 : f,
31 0 : "Failed to send a request. Context: {}, error: {}",
32 0 : self.context, e
33 0 : ),
34 0 : ErrorKind::BodyRead(e) => {
35 0 : write!(
36 0 : f,
37 0 : "Failed to read a request body. Context: {}, error: {}",
38 0 : self.context, e
39 0 : )
40 : }
41 0 : ErrorKind::ResponseStatus(status) => {
42 0 : write!(f, "Bad response status {}: {}", status, self.context)
43 : }
44 0 : ErrorKind::UnexpectedState => write!(f, "Unexpected state: {}", self.context),
45 : }
46 0 : }
47 : }
48 :
49 0 : #[derive(Debug, Clone, serde::Deserialize, Hash, PartialEq, Eq)]
50 : #[serde(transparent)]
51 : pub struct ProjectId(pub String);
52 :
53 0 : #[derive(Clone, Debug, serde::Deserialize, Hash, PartialEq, Eq)]
54 : #[serde(transparent)]
55 : pub struct BranchId(pub String);
56 :
57 : impl std::error::Error for Error {}
58 :
59 0 : #[derive(Debug)]
60 : pub enum ErrorKind {
61 : RequestSend(reqwest::Error),
62 : BodyRead(reqwest::Error),
63 : ResponseStatus(StatusCode),
64 : UnexpectedState,
65 : }
66 :
67 : pub struct CloudAdminApiClient {
68 : request_limiter: Semaphore,
69 : token: String,
70 : base_url: Url,
71 : http_client: Client,
72 : }
73 :
74 0 : #[derive(Debug, serde::Deserialize)]
75 : struct AdminApiResponse<T> {
76 : data: T,
77 : total: Option<usize>,
78 : }
79 :
80 0 : #[derive(Debug, serde::Deserialize)]
81 : pub struct PageserverData {
82 : pub id: u64,
83 : pub created_at: DateTime<Utc>,
84 : pub updated_at: DateTime<Utc>,
85 : pub region_id: String,
86 : pub version: i64,
87 : pub instance_id: String,
88 : pub port: u16,
89 : pub http_host: String,
90 : pub http_port: u16,
91 : pub active: bool,
92 : pub projects_count: usize,
93 : pub availability_zone_id: String,
94 : }
95 :
96 0 : #[derive(Debug, Clone, serde::Deserialize)]
97 : pub struct SafekeeperData {
98 : pub id: u64,
99 : pub created_at: DateTime<Utc>,
100 : pub updated_at: DateTime<Utc>,
101 : pub region_id: String,
102 : pub version: i64,
103 : pub instance_id: String,
104 : pub active: bool,
105 : pub host: String,
106 : pub port: u16,
107 : pub projects_count: usize,
108 : pub availability_zone_id: String,
109 : }
110 :
111 : /// For ID fields, the Console API does not always return a value or null. It will
112 : /// sometimes return an empty string. Our native Id type does not consider this acceptable
113 : /// (nor should it), so we use a wrapper for talking to the Console API.
114 0 : fn from_nullable_id<'de, D>(deserializer: D) -> Result<TenantId, D::Error>
115 0 : where
116 0 : D: serde::de::Deserializer<'de>,
117 0 : {
118 0 : if deserializer.is_human_readable() {
119 0 : let id_str = String::deserialize(deserializer)?;
120 0 : if id_str.is_empty() {
121 : // This is a bogus value, but for the purposes of the scrubber all that
122 : // matters is that it doesn't collide with any real IDs.
123 0 : Ok(TenantId::from([0u8; 16]))
124 : } else {
125 0 : TenantId::from_hex(&id_str).map_err(|e| serde::de::Error::custom(format!("{e}")))
126 : }
127 : } else {
128 0 : let id_arr = <[u8; 16]>::deserialize(deserializer)?;
129 0 : Ok(TenantId::from(id_arr))
130 : }
131 0 : }
132 :
133 0 : #[derive(Debug, Clone, serde::Deserialize)]
134 : pub struct ProjectData {
135 : pub id: ProjectId,
136 : pub name: String,
137 : pub region_id: String,
138 : pub platform_id: String,
139 : pub user_id: String,
140 : pub pageserver_id: u64,
141 : #[serde(deserialize_with = "from_nullable_id")]
142 : pub tenant: TenantId,
143 : pub safekeepers: Vec<SafekeeperData>,
144 : pub deleted: bool,
145 : pub created_at: DateTime<Utc>,
146 : pub updated_at: DateTime<Utc>,
147 : pub pg_version: u32,
148 : pub max_project_size: u64,
149 : pub remote_storage_size: u64,
150 : pub resident_size: u64,
151 : pub synthetic_storage_size: u64,
152 : pub compute_time: u64,
153 : pub data_transfer: u64,
154 : pub data_storage: u64,
155 : pub maintenance_set: Option<String>,
156 : }
157 :
158 0 : #[derive(Debug, serde::Deserialize)]
159 : pub struct BranchData {
160 : pub id: BranchId,
161 : pub created_at: DateTime<Utc>,
162 : pub updated_at: DateTime<Utc>,
163 : pub name: String,
164 : pub project_id: ProjectId,
165 : pub timeline_id: TimelineId,
166 : #[serde(default)]
167 : pub parent_id: Option<BranchId>,
168 : #[serde(default)]
169 : pub parent_lsn: Option<Lsn>,
170 : pub default: bool,
171 : pub deleted: bool,
172 : pub logical_size: Option<u64>,
173 : pub physical_size: Option<u64>,
174 : pub written_size: Option<u64>,
175 : }
176 :
/// Implemented by API records that carry a deletion flag.
pub trait MaybeDeleted {
    /// Returns `true` when the record is marked as deleted.
    fn is_deleted(&self) -> bool;
}
180 :
181 : impl MaybeDeleted for ProjectData {
182 0 : fn is_deleted(&self) -> bool {
183 0 : self.deleted
184 0 : }
185 : }
186 :
187 : impl MaybeDeleted for BranchData {
188 0 : fn is_deleted(&self) -> bool {
189 0 : self.deleted
190 0 : }
191 : }
192 :
193 : impl CloudAdminApiClient {
194 0 : pub fn new(config: ConsoleConfig) -> Self {
195 0 : Self {
196 0 : token: config.token,
197 0 : base_url: config.base_url,
198 0 : request_limiter: Semaphore::new(200),
199 0 : http_client: Client::new(), // TODO timeout configs at least
200 0 : }
201 0 : }
202 :
203 0 : pub async fn find_tenant_project(
204 0 : &self,
205 0 : tenant_id: TenantId,
206 0 : ) -> Result<Option<ProjectData>, Error> {
207 0 : let _permit = self
208 0 : .request_limiter
209 0 : .acquire()
210 0 : .await
211 0 : .expect("Semaphore is not closed");
212 :
213 0 : let response = self
214 0 : .http_client
215 0 : .get(self.append_url("/projects"))
216 0 : .query(&[
217 0 : ("tenant_id", tenant_id.to_string()),
218 0 : ("show_deleted", "true".to_string()),
219 0 : ])
220 0 : .header(header::ACCEPT, "application/json")
221 0 : .bearer_auth(&self.token)
222 0 : .send()
223 0 : .await
224 0 : .map_err(|e| {
225 0 : Error::new(
226 0 : "Find project for tenant".to_string(),
227 0 : ErrorKind::RequestSend(e),
228 0 : )
229 0 : })?;
230 :
231 0 : let response: AdminApiResponse<Vec<ProjectData>> = response.json().await.map_err(|e| {
232 0 : Error::new(
233 0 : "Find project for tenant".to_string(),
234 0 : ErrorKind::BodyRead(e),
235 0 : )
236 0 : })?;
237 0 : match response.data.len() {
238 0 : 0 => Ok(None),
239 0 : 1 => Ok(Some(
240 0 : response
241 0 : .data
242 0 : .into_iter()
243 0 : .next()
244 0 : .expect("Should have exactly one element"),
245 0 : )),
246 0 : too_many => Err(Error::new(
247 0 : format!("Find project for tenant returned {too_many} projects instead of 0 or 1"),
248 0 : ErrorKind::UnexpectedState,
249 0 : )),
250 : }
251 0 : }
252 :
253 0 : pub async fn list_projects(&self, region_id: String) -> Result<Vec<ProjectData>, Error> {
254 0 : let _permit = self
255 0 : .request_limiter
256 0 : .acquire()
257 0 : .await
258 0 : .expect("Semaphore is not closed");
259 0 :
260 0 : let mut pagination_offset = 0;
261 0 : const PAGINATION_LIMIT: usize = 512;
262 0 : let mut result: Vec<ProjectData> = Vec::with_capacity(PAGINATION_LIMIT);
263 : loop {
264 0 : let response = self
265 0 : .http_client
266 0 : .get(self.append_url("/projects"))
267 0 : .query(&[
268 0 : ("show_deleted", "false".to_string()),
269 0 : ("limit", format!("{PAGINATION_LIMIT}")),
270 0 : ("offset", format!("{pagination_offset}")),
271 0 : ])
272 0 : .header(header::ACCEPT, "application/json")
273 0 : .bearer_auth(&self.token)
274 0 : .send()
275 0 : .await
276 0 : .map_err(|e| {
277 0 : Error::new(
278 0 : "List active projects".to_string(),
279 0 : ErrorKind::RequestSend(e),
280 0 : )
281 0 : })?;
282 :
283 0 : match response.status() {
284 0 : StatusCode::OK => {}
285 : StatusCode::SERVICE_UNAVAILABLE | StatusCode::TOO_MANY_REQUESTS => {
286 0 : tokio::time::sleep(Duration::from_millis(500)).await;
287 0 : continue;
288 : }
289 0 : _status => {
290 0 : return Err(Error::new(
291 0 : "List active projects".to_string(),
292 0 : ErrorKind::ResponseStatus(response.status()),
293 0 : ))
294 : }
295 : }
296 :
297 0 : let response_bytes = response.bytes().await.map_err(|e| {
298 0 : Error::new("List active projects".to_string(), ErrorKind::BodyRead(e))
299 0 : })?;
300 :
301 0 : let decode_result =
302 0 : serde_json::from_slice::<AdminApiResponse<Vec<ProjectData>>>(&response_bytes);
303 :
304 0 : let mut response = match decode_result {
305 0 : Ok(r) => r,
306 0 : Err(decode) => {
307 0 : tracing::error!(
308 0 : "Failed to decode response body: {}\n{}",
309 0 : decode,
310 0 : String::from_utf8(response_bytes.to_vec()).unwrap()
311 0 : );
312 0 : panic!("we out");
313 : }
314 : };
315 :
316 0 : pagination_offset += response.data.len();
317 0 :
318 0 : result.extend(response.data.drain(..).filter(|t| t.region_id == region_id));
319 0 :
320 0 : if pagination_offset >= response.total.unwrap_or(0) {
321 0 : break;
322 0 : }
323 : }
324 :
325 0 : Ok(result)
326 0 : }
327 :
328 0 : pub async fn find_timeline_branch(
329 0 : &self,
330 0 : timeline_id: TimelineId,
331 0 : ) -> Result<Option<BranchData>, Error> {
332 0 : let _permit = self
333 0 : .request_limiter
334 0 : .acquire()
335 0 : .await
336 0 : .expect("Semaphore is not closed");
337 :
338 0 : let response = self
339 0 : .http_client
340 0 : .get(self.append_url("/branches"))
341 0 : .query(&[
342 0 : ("timeline_id", timeline_id.to_string()),
343 0 : ("show_deleted", "true".to_string()),
344 0 : ])
345 0 : .header(header::ACCEPT, "application/json")
346 0 : .bearer_auth(&self.token)
347 0 : .send()
348 0 : .await
349 0 : .map_err(|e| {
350 0 : Error::new(
351 0 : "Find branch for timeline".to_string(),
352 0 : ErrorKind::RequestSend(e),
353 0 : )
354 0 : })?;
355 :
356 0 : let response: AdminApiResponse<Vec<BranchData>> = response.json().await.map_err(|e| {
357 0 : Error::new(
358 0 : "Find branch for timeline".to_string(),
359 0 : ErrorKind::BodyRead(e),
360 0 : )
361 0 : })?;
362 0 : match response.data.len() {
363 0 : 0 => Ok(None),
364 0 : 1 => Ok(Some(
365 0 : response
366 0 : .data
367 0 : .into_iter()
368 0 : .next()
369 0 : .expect("Should have exactly one element"),
370 0 : )),
371 0 : too_many => Err(Error::new(
372 0 : format!("Find branch for timeline returned {too_many} branches instead of 0 or 1"),
373 0 : ErrorKind::UnexpectedState,
374 0 : )),
375 : }
376 0 : }
377 :
378 0 : pub async fn list_pageservers(&self) -> Result<Vec<PageserverData>, Error> {
379 0 : let _permit = self
380 0 : .request_limiter
381 0 : .acquire()
382 0 : .await
383 0 : .expect("Semaphore is not closed");
384 :
385 0 : let response = self
386 0 : .http_client
387 0 : .get(self.append_url("/pageservers"))
388 0 : .header(header::ACCEPT, "application/json")
389 0 : .bearer_auth(&self.token)
390 0 : .send()
391 0 : .await
392 0 : .map_err(|e| Error::new("List pageservers".to_string(), ErrorKind::RequestSend(e)))?;
393 :
394 0 : let response: AdminApiResponse<Vec<PageserverData>> = response
395 0 : .json()
396 0 : .await
397 0 : .map_err(|e| Error::new("List pageservers".to_string(), ErrorKind::BodyRead(e)))?;
398 :
399 0 : Ok(response.data)
400 0 : }
401 :
402 0 : pub async fn list_safekeepers(&self) -> Result<Vec<SafekeeperData>, Error> {
403 0 : let _permit = self
404 0 : .request_limiter
405 0 : .acquire()
406 0 : .await
407 0 : .expect("Semaphore is not closed");
408 :
409 0 : let response = self
410 0 : .http_client
411 0 : .get(self.append_url("/safekeepers"))
412 0 : .header(header::ACCEPT, "application/json")
413 0 : .bearer_auth(&self.token)
414 0 : .send()
415 0 : .await
416 0 : .map_err(|e| Error::new("List safekeepers".to_string(), ErrorKind::RequestSend(e)))?;
417 :
418 0 : let response: AdminApiResponse<Vec<SafekeeperData>> = response
419 0 : .json()
420 0 : .await
421 0 : .map_err(|e| Error::new("List safekeepers".to_string(), ErrorKind::BodyRead(e)))?;
422 :
423 0 : Ok(response.data)
424 0 : }
425 :
426 0 : pub async fn projects_for_pageserver(
427 0 : &self,
428 0 : pageserver_id: u64,
429 0 : show_deleted: bool,
430 0 : ) -> Result<Vec<ProjectData>, Error> {
431 0 : let _permit = self
432 0 : .request_limiter
433 0 : .acquire()
434 0 : .await
435 0 : .expect("Semaphore is not closed");
436 :
437 0 : let response = self
438 0 : .http_client
439 0 : .get(self.append_url("/projects"))
440 0 : .query(&[
441 0 : ("pageserver_id", &pageserver_id.to_string()),
442 0 : ("show_deleted", &show_deleted.to_string()),
443 0 : ])
444 0 : .header(header::ACCEPT, "application/json")
445 0 : .bearer_auth(&self.token)
446 0 : .send()
447 0 : .await
448 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
449 :
450 0 : let response: AdminApiResponse<Vec<ProjectData>> = response
451 0 : .json()
452 0 : .await
453 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
454 :
455 0 : Ok(response.data)
456 0 : }
457 :
458 0 : pub async fn project_for_tenant(
459 0 : &self,
460 0 : tenant_id: TenantId,
461 0 : show_deleted: bool,
462 0 : ) -> Result<Option<ProjectData>, Error> {
463 0 : let _permit = self
464 0 : .request_limiter
465 0 : .acquire()
466 0 : .await
467 0 : .expect("Semaphore is not closed");
468 :
469 0 : let response = self
470 0 : .http_client
471 0 : .get(self.append_url("/projects"))
472 0 : .query(&[
473 0 : ("search", &tenant_id.to_string()),
474 0 : ("show_deleted", &show_deleted.to_string()),
475 0 : ])
476 0 : .header(header::ACCEPT, "application/json")
477 0 : .bearer_auth(&self.token)
478 0 : .send()
479 0 : .await
480 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
481 :
482 0 : let response: AdminApiResponse<Vec<ProjectData>> = response
483 0 : .json()
484 0 : .await
485 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
486 :
487 0 : match response.data.as_slice() {
488 0 : [] => Ok(None),
489 0 : [_single] => Ok(Some(response.data.into_iter().next().unwrap())),
490 0 : multiple => Err(Error::new(
491 0 : format!("Got more than one project for tenant {tenant_id} : {multiple:?}"),
492 0 : ErrorKind::UnexpectedState,
493 0 : )),
494 : }
495 0 : }
496 :
497 0 : pub async fn branches_for_project(
498 0 : &self,
499 0 : project_id: &ProjectId,
500 0 : show_deleted: bool,
501 0 : ) -> Result<Vec<BranchData>, Error> {
502 0 : let _permit = self
503 0 : .request_limiter
504 0 : .acquire()
505 0 : .await
506 0 : .expect("Semaphore is not closed");
507 :
508 0 : let response = self
509 0 : .http_client
510 0 : .get(self.append_url("/branches"))
511 0 : .query(&[
512 0 : ("project_id", &project_id.0),
513 0 : ("show_deleted", &show_deleted.to_string()),
514 0 : ])
515 0 : .header(header::ACCEPT, "application/json")
516 0 : .bearer_auth(&self.token)
517 0 : .send()
518 0 : .await
519 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
520 :
521 0 : let response: AdminApiResponse<Vec<BranchData>> = response
522 0 : .json()
523 0 : .await
524 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
525 :
526 0 : Ok(response.data)
527 0 : }
528 :
529 0 : fn append_url(&self, subpath: &str) -> Url {
530 0 : // TODO fugly, but `.join` does not work when called
531 0 : (self.base_url.to_string() + subpath)
532 0 : .parse()
533 0 : .unwrap_or_else(|e| panic!("Could not append {subpath} to base url: {e}"))
534 0 : }
535 : }
|