Line data Source code
1 : use chrono::{DateTime, Utc};
2 : use futures::Future;
3 : use hex::FromHex;
4 :
5 : use reqwest::{header, Client, StatusCode, Url};
6 : use serde::Deserialize;
7 : use tokio::sync::Semaphore;
8 :
9 : use tokio_util::sync::CancellationToken;
10 : use utils::backoff;
11 : use utils::id::{TenantId, TimelineId};
12 : use utils::lsn::Lsn;
13 :
14 : use crate::ConsoleConfig;
15 :
16 : #[derive(Debug)]
17 : pub struct Error {
18 : context: String,
19 : kind: ErrorKind,
20 : }
21 :
22 : impl Error {
23 0 : fn new(context: String, kind: ErrorKind) -> Self {
24 0 : Self { context, kind }
25 0 : }
26 : }
27 :
28 : impl std::fmt::Display for Error {
29 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30 0 : match &self.kind {
31 0 : ErrorKind::RequestSend(e) => write!(
32 0 : f,
33 0 : "Failed to send a request. Context: {}, error: {}",
34 0 : self.context, e
35 0 : ),
36 0 : ErrorKind::BodyRead(e) => {
37 0 : write!(
38 0 : f,
39 0 : "Failed to read a request body. Context: {}, error: {}",
40 0 : self.context, e
41 0 : )
42 : }
43 0 : ErrorKind::ResponseStatus(status) => {
44 0 : write!(f, "Bad response status {}: {}", status, self.context)
45 : }
46 0 : ErrorKind::UnexpectedState => write!(f, "Unexpected state: {}", self.context),
47 : }
48 0 : }
49 : }
50 :
51 0 : #[derive(Debug, Clone, serde::Deserialize, Hash, PartialEq, Eq)]
52 : #[serde(transparent)]
53 : pub struct ProjectId(pub String);
54 :
55 0 : #[derive(Clone, Debug, serde::Deserialize, Hash, PartialEq, Eq)]
56 : #[serde(transparent)]
57 : pub struct BranchId(pub String);
58 :
59 : impl std::error::Error for Error {}
60 :
61 : #[derive(Debug)]
62 : pub enum ErrorKind {
63 : RequestSend(reqwest::Error),
64 : BodyRead(reqwest::Error),
65 : ResponseStatus(StatusCode),
66 : UnexpectedState,
67 : }
68 :
69 : pub struct CloudAdminApiClient {
70 : request_limiter: Semaphore,
71 : token: String,
72 : base_url: Url,
73 : http_client: Client,
74 : }
75 :
76 0 : #[derive(Debug, serde::Deserialize)]
77 : struct AdminApiResponse<T> {
78 : data: T,
79 : total: Option<usize>,
80 : }
81 :
82 0 : #[derive(Debug, serde::Deserialize)]
83 : pub struct PageserverData {
84 : pub id: u64,
85 : pub created_at: DateTime<Utc>,
86 : pub updated_at: DateTime<Utc>,
87 : pub region_id: String,
88 : pub version: i64,
89 : pub instance_id: String,
90 : pub port: u16,
91 : pub http_host: String,
92 : pub http_port: u16,
93 : pub active: bool,
94 : pub projects_count: usize,
95 : pub availability_zone_id: String,
96 : }
97 :
98 0 : #[derive(Debug, Clone, serde::Deserialize)]
99 : pub struct SafekeeperData {
100 : pub id: u64,
101 : pub created_at: DateTime<Utc>,
102 : pub updated_at: DateTime<Utc>,
103 : pub region_id: String,
104 : pub version: i64,
105 : pub instance_id: String,
106 : pub active: bool,
107 : pub host: String,
108 : pub port: u16,
109 : pub projects_count: usize,
110 : pub availability_zone_id: String,
111 : }
112 :
113 : /// For ID fields, the Console API does not always return a value or null. It will
114 : /// sometimes return an empty string. Our native Id type does not consider this acceptable
115 : /// (nor should it), so we use a wrapper for talking to the Console API.
116 0 : fn from_nullable_id<'de, D>(deserializer: D) -> Result<TenantId, D::Error>
117 0 : where
118 0 : D: serde::de::Deserializer<'de>,
119 0 : {
120 0 : if deserializer.is_human_readable() {
121 0 : let id_str = String::deserialize(deserializer)?;
122 0 : if id_str.is_empty() {
123 : // This is a bogus value, but for the purposes of the scrubber all that
124 : // matters is that it doesn't collide with any real IDs.
125 0 : Ok(TenantId::from([0u8; 16]))
126 : } else {
127 0 : TenantId::from_hex(&id_str).map_err(|e| serde::de::Error::custom(format!("{e}")))
128 : }
129 : } else {
130 0 : let id_arr = <[u8; 16]>::deserialize(deserializer)?;
131 0 : Ok(TenantId::from(id_arr))
132 : }
133 0 : }
134 :
135 0 : #[derive(Debug, Clone, serde::Deserialize)]
136 : pub struct ProjectData {
137 : pub id: ProjectId,
138 : pub name: String,
139 : pub region_id: String,
140 : pub platform_id: String,
141 : pub user_id: Option<String>,
142 : pub pageserver_id: Option<u64>,
143 : #[serde(deserialize_with = "from_nullable_id")]
144 : pub tenant: TenantId,
145 : pub safekeepers: Vec<SafekeeperData>,
146 : pub deleted: bool,
147 : pub created_at: DateTime<Utc>,
148 : pub updated_at: DateTime<Utc>,
149 : pub pg_version: u32,
150 : pub max_project_size: i64,
151 : pub remote_storage_size: u64,
152 : pub resident_size: u64,
153 : pub synthetic_storage_size: u64,
154 : pub compute_time: u64,
155 : pub data_transfer: u64,
156 : pub data_storage: u64,
157 : pub maintenance_set: Option<String>,
158 : }
159 :
160 0 : #[derive(Debug, Clone, serde::Deserialize)]
161 : pub struct BranchData {
162 : pub id: BranchId,
163 : pub created_at: DateTime<Utc>,
164 : pub updated_at: DateTime<Utc>,
165 : pub name: String,
166 : pub project_id: ProjectId,
167 : pub timeline_id: TimelineId,
168 : #[serde(default)]
169 : pub parent_id: Option<BranchId>,
170 : #[serde(default)]
171 : pub parent_lsn: Option<Lsn>,
172 : pub default: bool,
173 : pub deleted: bool,
174 : pub logical_size: Option<u64>,
175 : pub physical_size: Option<u64>,
176 : pub written_size: Option<u64>,
177 : }
178 :
/// Implemented by Console records that carry a deletion flag.
pub trait MaybeDeleted {
    /// Returns `true` when the record is marked as deleted.
    fn is_deleted(&self) -> bool;
}
182 :
183 : impl MaybeDeleted for ProjectData {
184 0 : fn is_deleted(&self) -> bool {
185 0 : self.deleted
186 0 : }
187 : }
188 :
189 : impl MaybeDeleted for BranchData {
190 0 : fn is_deleted(&self) -> bool {
191 0 : self.deleted
192 0 : }
193 : }
194 :
195 : impl CloudAdminApiClient {
196 0 : pub fn new(config: ConsoleConfig) -> Self {
197 0 : Self {
198 0 : token: config.token,
199 0 : base_url: config.base_url,
200 0 : request_limiter: Semaphore::new(200),
201 0 : http_client: Client::new(), // TODO timeout configs at least
202 0 : }
203 0 : }
204 :
205 0 : pub async fn find_tenant_project(
206 0 : &self,
207 0 : tenant_id: TenantId,
208 0 : ) -> Result<Option<ProjectData>, Error> {
209 0 : let _permit = self
210 0 : .request_limiter
211 0 : .acquire()
212 0 : .await
213 0 : .expect("Semaphore is not closed");
214 :
215 0 : let response = CloudAdminApiClient::with_retries(
216 0 : || async {
217 0 : let response = self
218 0 : .http_client
219 0 : .get(self.append_url("/projects"))
220 0 : .query(&[
221 0 : ("tenant_id", tenant_id.to_string()),
222 0 : ("show_deleted", "true".to_string()),
223 0 : ])
224 0 : .header(header::ACCEPT, "application/json")
225 0 : .bearer_auth(&self.token)
226 0 : .send()
227 0 : .await
228 0 : .map_err(|e| {
229 0 : Error::new(
230 0 : "Find project for tenant".to_string(),
231 0 : ErrorKind::RequestSend(e),
232 0 : )
233 0 : })?;
234 :
235 0 : let response: AdminApiResponse<Vec<ProjectData>> =
236 0 : response.json().await.map_err(|e| {
237 0 : Error::new(
238 0 : "Find project for tenant".to_string(),
239 0 : ErrorKind::BodyRead(e),
240 0 : )
241 0 : })?;
242 0 : Ok(response)
243 0 : },
244 0 : "find_tenant_project",
245 0 : )
246 0 : .await?;
247 :
248 0 : match response.data.len() {
249 0 : 0 => Ok(None),
250 0 : 1 => Ok(Some(
251 0 : response
252 0 : .data
253 0 : .into_iter()
254 0 : .next()
255 0 : .expect("Should have exactly one element"),
256 0 : )),
257 0 : too_many => Err(Error::new(
258 0 : format!("Find project for tenant returned {too_many} projects instead of 0 or 1"),
259 0 : ErrorKind::UnexpectedState,
260 0 : )),
261 : }
262 0 : }
263 :
264 0 : pub async fn list_projects(&self) -> Result<Vec<ProjectData>, Error> {
265 0 : let _permit = self
266 0 : .request_limiter
267 0 : .acquire()
268 0 : .await
269 0 : .expect("Semaphore is not closed");
270 0 :
271 0 : let mut pagination_offset = 0;
272 : const PAGINATION_LIMIT: usize = 512;
273 0 : let mut result: Vec<ProjectData> = Vec::with_capacity(PAGINATION_LIMIT);
274 : loop {
275 0 : let response_bytes = CloudAdminApiClient::with_retries(
276 0 : || async {
277 0 : let response = self
278 0 : .http_client
279 0 : .get(self.append_url("/projects"))
280 0 : .query(&[
281 0 : ("show_deleted", "false".to_string()),
282 0 : ("limit", format!("{PAGINATION_LIMIT}")),
283 0 : ("offset", format!("{pagination_offset}")),
284 0 : ])
285 0 : .header(header::ACCEPT, "application/json")
286 0 : .bearer_auth(&self.token)
287 0 : .send()
288 0 : .await
289 0 : .map_err(|e| {
290 0 : Error::new(
291 0 : "List active projects".to_string(),
292 0 : ErrorKind::RequestSend(e),
293 0 : )
294 0 : })?;
295 :
296 0 : response.bytes().await.map_err(|e| {
297 0 : Error::new("List active projects".to_string(), ErrorKind::BodyRead(e))
298 0 : })
299 0 : },
300 0 : "list_projects",
301 0 : )
302 0 : .await?;
303 :
304 0 : let decode_result =
305 0 : serde_json::from_slice::<AdminApiResponse<Vec<ProjectData>>>(&response_bytes);
306 :
307 0 : let mut response = match decode_result {
308 0 : Ok(r) => r,
309 0 : Err(decode) => {
310 0 : tracing::error!(
311 0 : "Failed to decode response body: {}\n{}",
312 0 : decode,
313 0 : String::from_utf8(response_bytes.to_vec()).unwrap()
314 : );
315 0 : panic!("we out");
316 : }
317 : };
318 :
319 0 : pagination_offset += response.data.len();
320 0 :
321 0 : result.append(&mut response.data);
322 0 :
323 0 : if pagination_offset >= response.total.unwrap_or(0) {
324 0 : break;
325 0 : }
326 : }
327 :
328 0 : Ok(result)
329 0 : }
330 :
331 0 : pub async fn find_timeline_branch(
332 0 : &self,
333 0 : tenant_id: TenantId,
334 0 : timeline_id: TimelineId,
335 0 : ) -> Result<Option<BranchData>, Error> {
336 0 : let _permit = self
337 0 : .request_limiter
338 0 : .acquire()
339 0 : .await
340 0 : .expect("Semaphore is not closed");
341 :
342 0 : let response = CloudAdminApiClient::with_retries(
343 0 : || async {
344 0 : let response = self
345 0 : .http_client
346 0 : .get(self.append_url("/branches"))
347 0 : .query(&[
348 0 : ("timeline_id", timeline_id.to_string()),
349 0 : ("show_deleted", "true".to_string()),
350 0 : ])
351 0 : .header(header::ACCEPT, "application/json")
352 0 : .bearer_auth(&self.token)
353 0 : .send()
354 0 : .await
355 0 : .map_err(|e| {
356 0 : Error::new(
357 0 : "Find branch for timeline".to_string(),
358 0 : ErrorKind::RequestSend(e),
359 0 : )
360 0 : })?;
361 :
362 0 : let response: AdminApiResponse<Vec<BranchData>> =
363 0 : response.json().await.map_err(|e| {
364 0 : Error::new(
365 0 : "Find branch for timeline".to_string(),
366 0 : ErrorKind::BodyRead(e),
367 0 : )
368 0 : })?;
369 0 : Ok(response)
370 0 : },
371 0 : "find_timeline_branch",
372 0 : )
373 0 : .await?;
374 :
375 0 : let mut branches: Vec<BranchData> = response.data.into_iter().collect();
376 : // Normally timeline_id is unique. However, we do have at least one case
377 : // of the same timeline_id in two different projects, apparently after
378 : // manual recovery. So always recheck project_id (discovered through
379 : // tenant_id).
380 0 : let project_data = match self.find_tenant_project(tenant_id).await? {
381 0 : Some(pd) => pd,
382 0 : None => return Ok(None),
383 : };
384 0 : branches.retain(|b| b.project_id == project_data.id);
385 0 : if branches.len() < 2 {
386 0 : Ok(branches.first().cloned())
387 : } else {
388 0 : Err(Error::new(
389 0 : format!(
390 0 : "Find branch for timeline {}/{} returned {} branches instead of 0 or 1",
391 0 : tenant_id,
392 0 : timeline_id,
393 0 : branches.len()
394 0 : ),
395 0 : ErrorKind::UnexpectedState,
396 0 : ))
397 : }
398 0 : }
399 :
400 0 : pub async fn list_pageservers(&self) -> Result<Vec<PageserverData>, Error> {
401 0 : let _permit = self
402 0 : .request_limiter
403 0 : .acquire()
404 0 : .await
405 0 : .expect("Semaphore is not closed");
406 :
407 0 : let response = self
408 0 : .http_client
409 0 : .get(self.append_url("/pageservers"))
410 0 : .header(header::ACCEPT, "application/json")
411 0 : .bearer_auth(&self.token)
412 0 : .send()
413 0 : .await
414 0 : .map_err(|e| Error::new("List pageservers".to_string(), ErrorKind::RequestSend(e)))?;
415 :
416 0 : let response: AdminApiResponse<Vec<PageserverData>> = response
417 0 : .json()
418 0 : .await
419 0 : .map_err(|e| Error::new("List pageservers".to_string(), ErrorKind::BodyRead(e)))?;
420 :
421 0 : Ok(response.data)
422 0 : }
423 :
424 0 : pub async fn list_safekeepers(&self) -> Result<Vec<SafekeeperData>, Error> {
425 0 : let _permit = self
426 0 : .request_limiter
427 0 : .acquire()
428 0 : .await
429 0 : .expect("Semaphore is not closed");
430 :
431 0 : let response = self
432 0 : .http_client
433 0 : .get(self.append_url("/safekeepers"))
434 0 : .header(header::ACCEPT, "application/json")
435 0 : .bearer_auth(&self.token)
436 0 : .send()
437 0 : .await
438 0 : .map_err(|e| Error::new("List safekeepers".to_string(), ErrorKind::RequestSend(e)))?;
439 :
440 0 : let response: AdminApiResponse<Vec<SafekeeperData>> = response
441 0 : .json()
442 0 : .await
443 0 : .map_err(|e| Error::new("List safekeepers".to_string(), ErrorKind::BodyRead(e)))?;
444 :
445 0 : Ok(response.data)
446 0 : }
447 :
448 0 : pub async fn projects_for_pageserver(
449 0 : &self,
450 0 : pageserver_id: u64,
451 0 : show_deleted: bool,
452 0 : ) -> Result<Vec<ProjectData>, Error> {
453 0 : let _permit = self
454 0 : .request_limiter
455 0 : .acquire()
456 0 : .await
457 0 : .expect("Semaphore is not closed");
458 :
459 0 : let response = self
460 0 : .http_client
461 0 : .get(self.append_url("/projects"))
462 0 : .query(&[
463 0 : ("pageserver_id", &pageserver_id.to_string()),
464 0 : ("show_deleted", &show_deleted.to_string()),
465 0 : ])
466 0 : .header(header::ACCEPT, "application/json")
467 0 : .bearer_auth(&self.token)
468 0 : .send()
469 0 : .await
470 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
471 :
472 0 : let response: AdminApiResponse<Vec<ProjectData>> = response
473 0 : .json()
474 0 : .await
475 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
476 :
477 0 : Ok(response.data)
478 0 : }
479 :
480 0 : pub async fn project_for_tenant(
481 0 : &self,
482 0 : tenant_id: TenantId,
483 0 : show_deleted: bool,
484 0 : ) -> Result<Option<ProjectData>, Error> {
485 0 : let _permit = self
486 0 : .request_limiter
487 0 : .acquire()
488 0 : .await
489 0 : .expect("Semaphore is not closed");
490 :
491 0 : let response = self
492 0 : .http_client
493 0 : .get(self.append_url("/projects"))
494 0 : .query(&[
495 0 : ("search", &tenant_id.to_string()),
496 0 : ("show_deleted", &show_deleted.to_string()),
497 0 : ])
498 0 : .header(header::ACCEPT, "application/json")
499 0 : .bearer_auth(&self.token)
500 0 : .send()
501 0 : .await
502 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
503 :
504 0 : let response: AdminApiResponse<Vec<ProjectData>> = response
505 0 : .json()
506 0 : .await
507 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
508 :
509 0 : match response.data.as_slice() {
510 0 : [] => Ok(None),
511 0 : [_single] => Ok(Some(response.data.into_iter().next().unwrap())),
512 0 : multiple => Err(Error::new(
513 0 : format!("Got more than one project for tenant {tenant_id} : {multiple:?}"),
514 0 : ErrorKind::UnexpectedState,
515 0 : )),
516 : }
517 0 : }
518 :
519 0 : pub async fn branches_for_project(
520 0 : &self,
521 0 : project_id: &ProjectId,
522 0 : show_deleted: bool,
523 0 : ) -> Result<Vec<BranchData>, Error> {
524 0 : let _permit = self
525 0 : .request_limiter
526 0 : .acquire()
527 0 : .await
528 0 : .expect("Semaphore is not closed");
529 :
530 0 : let response = self
531 0 : .http_client
532 0 : .get(self.append_url("/branches"))
533 0 : .query(&[
534 0 : ("project_id", &project_id.0),
535 0 : ("show_deleted", &show_deleted.to_string()),
536 0 : ])
537 0 : .header(header::ACCEPT, "application/json")
538 0 : .bearer_auth(&self.token)
539 0 : .send()
540 0 : .await
541 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
542 :
543 0 : let response: AdminApiResponse<Vec<BranchData>> = response
544 0 : .json()
545 0 : .await
546 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
547 :
548 0 : Ok(response.data)
549 0 : }
550 :
551 0 : fn append_url(&self, subpath: &str) -> Url {
552 0 : // TODO fugly, but `.join` does not work when called
553 0 : (self.base_url.to_string() + subpath)
554 0 : .parse()
555 0 : .unwrap_or_else(|e| panic!("Could not append {subpath} to base url: {e}"))
556 0 : }
557 :
558 0 : async fn with_retries<T, O, F>(op: O, description: &str) -> Result<T, Error>
559 0 : where
560 0 : O: FnMut() -> F,
561 0 : F: Future<Output = Result<T, Error>>,
562 0 : {
563 0 : let cancel = CancellationToken::new(); // not really used
564 0 : backoff::retry(op, |_| false, 1, 20, description, &cancel)
565 0 : .await
566 0 : .expect("cancellations are disabled")
567 0 : }
568 : }
|