Line data Source code
1 : use std::error::Error as _;
2 :
3 : use chrono::{DateTime, Utc};
4 : use futures::Future;
5 : use hex::FromHex;
6 : use reqwest::{Client, StatusCode, Url, header};
7 : use serde::Deserialize;
8 : use tokio::sync::Semaphore;
9 : use tokio_util::sync::CancellationToken;
10 : use utils::backoff;
11 : use utils::id::{TenantId, TimelineId};
12 : use utils::lsn::Lsn;
13 :
14 : use crate::ConsoleConfig;
15 :
/// Error returned by [`CloudAdminApiClient`] methods: a human-readable context string
/// describing the operation that failed, plus the [`ErrorKind`] of the failure.
#[derive(Debug)]
pub struct Error {
    /// Short description of the operation (or of the unexpected result) for messages.
    context: String,
    /// What went wrong.
    kind: ErrorKind,
}
21 :
22 : impl Error {
23 0 : fn new(context: String, kind: ErrorKind) -> Self {
24 0 : Self { context, kind }
25 0 : }
26 : }
27 :
28 : impl std::fmt::Display for Error {
29 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30 0 : match &self.kind {
31 0 : ErrorKind::RequestSend(e) => write!(
32 0 : f,
33 0 : "Failed to send a request. Context: {}, error: {}{}",
34 0 : self.context,
35 0 : e,
36 0 : e.source().map(|e| format!(": {e}")).unwrap_or_default()
37 0 : ),
38 0 : ErrorKind::BodyRead(e) => {
39 0 : write!(
40 0 : f,
41 0 : "Failed to read a request body. Context: {}, error: {}{}",
42 0 : self.context,
43 0 : e,
44 0 : e.source().map(|e| format!(": {e}")).unwrap_or_default()
45 0 : )
46 : }
47 0 : ErrorKind::ResponseStatus(status) => {
48 0 : write!(f, "Bad response status {}: {}", status, self.context)
49 : }
50 0 : ErrorKind::UnexpectedState => write!(f, "Unexpected state: {}", self.context),
51 : }
52 0 : }
53 : }
54 :
55 0 : #[derive(Debug, Clone, serde::Deserialize, Hash, PartialEq, Eq)]
56 : #[serde(transparent)]
57 : pub struct ProjectId(pub String);
58 :
59 0 : #[derive(Clone, Debug, serde::Deserialize, Hash, PartialEq, Eq)]
60 : #[serde(transparent)]
61 : pub struct BranchId(pub String);
62 :
// `source()` keeps its default (`None`); the underlying reqwest error and its
// immediate cause are rendered by the `Display` impl instead.
impl std::error::Error for Error {}
64 :
/// The category of failure carried by an [`Error`].
#[derive(Debug)]
pub enum ErrorKind {
    /// Sending the HTTP request failed (the `send()` call returned an error).
    RequestSend(reqwest::Error),
    /// Reading or decoding the HTTP response body failed.
    BodyRead(reqwest::Error),
    /// An HTTP response status; `Display` renders it as `Bad response status …`.
    ResponseStatus(StatusCode),
    /// The API answered, but with data the client cannot accept (e.g. more matches
    /// than the 0 or 1 a lookup expects).
    UnexpectedState,
}
72 :
/// Client for the Console admin HTTP API.
pub struct CloudAdminApiClient {
    /// Bounds the number of concurrent API calls; a call holds one permit while it runs.
    request_limiter: Semaphore,
    /// Sent as a bearer token on every request.
    token: String,
    /// Prefix to which endpoint paths such as `/projects` are appended.
    base_url: Url,
    http_client: Client,
}
79 :
/// Envelope that every Console admin API response body is decoded into by this client.
#[derive(Debug, serde::Deserialize)]
struct AdminApiResponse<T> {
    /// The response payload.
    data: T,
    /// Presumably the total item count across all pages when the endpoint reports it;
    /// `list_projects` stops paginating once its offset reaches this value.
    total: Option<usize>,
}
85 :
/// A pageserver record as returned by the Console admin API's `/pageservers`
/// endpoint (see `CloudAdminApiClient::list_pageservers`).
///
/// Fields mirror the API's response schema; they are not validated here.
#[derive(Debug, serde::Deserialize)]
pub struct PageserverData {
    pub id: u64,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub region_id: String,
    pub version: i64,
    pub instance_id: String,
    pub port: u16,
    pub http_host: String,
    pub http_port: u16,
    pub active: bool,
    pub projects_count: usize,
    pub availability_zone_id: String,
}
101 :
/// A safekeeper record as returned by the Console admin API's `/safekeepers`
/// endpoint (see `CloudAdminApiClient::list_safekeepers`); also embedded in
/// [`ProjectData::safekeepers`].
///
/// Fields mirror the API's response schema; they are not validated here.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct SafekeeperData {
    pub id: u64,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub region_id: String,
    pub version: i64,
    pub instance_id: String,
    pub active: bool,
    pub host: String,
    pub port: u16,
    pub projects_count: usize,
    pub availability_zone_id: String,
}
116 :
117 : /// For ID fields, the Console API does not always return a value or null. It will
118 : /// sometimes return an empty string. Our native Id type does not consider this acceptable
119 : /// (nor should it), so we use a wrapper for talking to the Console API.
120 0 : fn from_nullable_id<'de, D>(deserializer: D) -> Result<TenantId, D::Error>
121 0 : where
122 0 : D: serde::de::Deserializer<'de>,
123 0 : {
124 0 : if deserializer.is_human_readable() {
125 0 : let id_str = String::deserialize(deserializer)?;
126 0 : if id_str.is_empty() {
127 : // This is a bogus value, but for the purposes of the scrubber all that
128 : // matters is that it doesn't collide with any real IDs.
129 0 : Ok(TenantId::from([0u8; 16]))
130 : } else {
131 0 : TenantId::from_hex(&id_str).map_err(|e| serde::de::Error::custom(format!("{e}")))
132 : }
133 : } else {
134 0 : let id_arr = <[u8; 16]>::deserialize(deserializer)?;
135 0 : Ok(TenantId::from(id_arr))
136 : }
137 0 : }
138 :
/// A project record as returned by the Console admin API's `/projects` endpoint.
///
/// Fields mirror the API's response schema; apart from `tenant` they are decoded as-is.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct ProjectData {
    pub id: ProjectId,
    pub name: String,
    pub region_id: String,
    pub platform_id: String,
    pub user_id: Option<String>,
    pub pageserver_id: Option<u64>,
    /// Decoded by `from_nullable_id`: an empty string from the API becomes an all-zero
    /// placeholder ID rather than a deserialization error.
    #[serde(deserialize_with = "from_nullable_id")]
    pub tenant: TenantId,
    /// Safekeeper records embedded in the project response.
    pub safekeepers: Vec<SafekeeperData>,
    pub deleted: bool,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub pg_version: u32,
    pub max_project_size: i64,
    pub remote_storage_size: u64,
    pub resident_size: u64,
    pub synthetic_storage_size: u64,
    pub compute_time: u64,
    pub data_transfer: u64,
    pub data_storage: u64,
    pub maintenance_set: Option<String>,
}
163 :
/// A branch record as returned by the Console admin API's `/branches` endpoint.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct BranchData {
    pub id: BranchId,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub name: String,
    /// Project this branch belongs to; `find_timeline_branch` filters on it.
    pub project_id: ProjectId,
    pub timeline_id: TimelineId,
    /// `None` when the field is absent from the response (`#[serde(default)]`).
    #[serde(default)]
    pub parent_id: Option<BranchId>,
    /// `None` when the field is absent from the response (`#[serde(default)]`).
    #[serde(default)]
    pub parent_lsn: Option<Lsn>,
    pub default: bool,
    pub deleted: bool,
    pub logical_size: Option<u64>,
    pub physical_size: Option<u64>,
    pub written_size: Option<u64>,
}
182 :
/// Implemented by Console API records that carry a `deleted` flag.
pub trait MaybeDeleted {
    /// Returns `true` if the record is marked as deleted.
    fn is_deleted(&self) -> bool;
}
186 :
187 : impl MaybeDeleted for ProjectData {
188 0 : fn is_deleted(&self) -> bool {
189 0 : self.deleted
190 0 : }
191 : }
192 :
193 : impl MaybeDeleted for BranchData {
194 0 : fn is_deleted(&self) -> bool {
195 0 : self.deleted
196 0 : }
197 : }
198 :
199 : impl CloudAdminApiClient {
200 0 : pub fn new(config: ConsoleConfig) -> Self {
201 0 : Self {
202 0 : token: config.token,
203 0 : base_url: config.base_url,
204 0 : request_limiter: Semaphore::new(200),
205 0 : http_client: Client::new(), // TODO timeout configs at least
206 0 : }
207 0 : }
208 :
209 0 : pub async fn find_tenant_project(
210 0 : &self,
211 0 : tenant_id: TenantId,
212 0 : ) -> Result<Option<ProjectData>, Error> {
213 0 : let _permit = self
214 0 : .request_limiter
215 0 : .acquire()
216 0 : .await
217 0 : .expect("Semaphore is not closed");
218 :
219 0 : let response = CloudAdminApiClient::with_retries(
220 0 : || async {
221 0 : let response = self
222 0 : .http_client
223 0 : .get(self.append_url("/projects"))
224 0 : .query(&[
225 0 : ("tenant_id", tenant_id.to_string()),
226 0 : ("show_deleted", "true".to_string()),
227 0 : ])
228 0 : .header(header::ACCEPT, "application/json")
229 0 : .bearer_auth(&self.token)
230 0 : .send()
231 0 : .await
232 0 : .map_err(|e| {
233 0 : Error::new(
234 0 : "Find project for tenant".to_string(),
235 0 : ErrorKind::RequestSend(e),
236 0 : )
237 0 : })?;
238 :
239 0 : let response: AdminApiResponse<Vec<ProjectData>> =
240 0 : response.json().await.map_err(|e| {
241 0 : Error::new(
242 0 : "Find project for tenant".to_string(),
243 0 : ErrorKind::BodyRead(e),
244 0 : )
245 0 : })?;
246 0 : Ok(response)
247 0 : },
248 0 : "find_tenant_project",
249 0 : )
250 0 : .await?;
251 :
252 0 : match response.data.len() {
253 0 : 0 => Ok(None),
254 0 : 1 => Ok(Some(
255 0 : response
256 0 : .data
257 0 : .into_iter()
258 0 : .next()
259 0 : .expect("Should have exactly one element"),
260 0 : )),
261 0 : too_many => Err(Error::new(
262 0 : format!("Find project for tenant returned {too_many} projects instead of 0 or 1"),
263 0 : ErrorKind::UnexpectedState,
264 0 : )),
265 : }
266 0 : }
267 :
268 0 : pub async fn list_projects(&self) -> Result<Vec<ProjectData>, Error> {
269 0 : let _permit = self
270 0 : .request_limiter
271 0 : .acquire()
272 0 : .await
273 0 : .expect("Semaphore is not closed");
274 0 :
275 0 : let mut pagination_offset = 0;
276 : const PAGINATION_LIMIT: usize = 512;
277 0 : let mut result: Vec<ProjectData> = Vec::with_capacity(PAGINATION_LIMIT);
278 : loop {
279 0 : let response_bytes = CloudAdminApiClient::with_retries(
280 0 : || async {
281 0 : let response = self
282 0 : .http_client
283 0 : .get(self.append_url("/projects"))
284 0 : .query(&[
285 0 : ("show_deleted", "false".to_string()),
286 0 : ("limit", format!("{PAGINATION_LIMIT}")),
287 0 : ("offset", format!("{pagination_offset}")),
288 0 : ])
289 0 : .header(header::ACCEPT, "application/json")
290 0 : .bearer_auth(&self.token)
291 0 : .send()
292 0 : .await
293 0 : .map_err(|e| {
294 0 : Error::new(
295 0 : "List active projects".to_string(),
296 0 : ErrorKind::RequestSend(e),
297 0 : )
298 0 : })?;
299 :
300 0 : response.bytes().await.map_err(|e| {
301 0 : Error::new("List active projects".to_string(), ErrorKind::BodyRead(e))
302 0 : })
303 0 : },
304 0 : "list_projects",
305 0 : )
306 0 : .await?;
307 :
308 0 : let decode_result =
309 0 : serde_json::from_slice::<AdminApiResponse<Vec<ProjectData>>>(&response_bytes);
310 :
311 0 : let mut response = match decode_result {
312 0 : Ok(r) => r,
313 0 : Err(decode) => {
314 0 : tracing::error!(
315 0 : "Failed to decode response body: {}\n{}",
316 0 : decode,
317 0 : String::from_utf8(response_bytes.to_vec()).unwrap()
318 : );
319 0 : panic!("we out");
320 : }
321 : };
322 :
323 0 : pagination_offset += response.data.len();
324 0 :
325 0 : result.append(&mut response.data);
326 0 :
327 0 : if pagination_offset >= response.total.unwrap_or(0) {
328 0 : break;
329 0 : }
330 : }
331 :
332 0 : Ok(result)
333 0 : }
334 :
335 0 : pub async fn find_timeline_branch(
336 0 : &self,
337 0 : tenant_id: TenantId,
338 0 : timeline_id: TimelineId,
339 0 : ) -> Result<Option<BranchData>, Error> {
340 0 : let _permit = self
341 0 : .request_limiter
342 0 : .acquire()
343 0 : .await
344 0 : .expect("Semaphore is not closed");
345 :
346 0 : let response = CloudAdminApiClient::with_retries(
347 0 : || async {
348 0 : let response = self
349 0 : .http_client
350 0 : .get(self.append_url("/branches"))
351 0 : .query(&[
352 0 : ("timeline_id", timeline_id.to_string()),
353 0 : ("show_deleted", "true".to_string()),
354 0 : ])
355 0 : .header(header::ACCEPT, "application/json")
356 0 : .bearer_auth(&self.token)
357 0 : .send()
358 0 : .await
359 0 : .map_err(|e| {
360 0 : Error::new(
361 0 : "Find branch for timeline".to_string(),
362 0 : ErrorKind::RequestSend(e),
363 0 : )
364 0 : })?;
365 :
366 0 : let response: AdminApiResponse<Vec<BranchData>> =
367 0 : response.json().await.map_err(|e| {
368 0 : Error::new(
369 0 : "Find branch for timeline".to_string(),
370 0 : ErrorKind::BodyRead(e),
371 0 : )
372 0 : })?;
373 0 : Ok(response)
374 0 : },
375 0 : "find_timeline_branch",
376 0 : )
377 0 : .await?;
378 :
379 0 : let mut branches: Vec<BranchData> = response.data.into_iter().collect();
380 : // Normally timeline_id is unique. However, we do have at least one case
381 : // of the same timeline_id in two different projects, apparently after
382 : // manual recovery. So always recheck project_id (discovered through
383 : // tenant_id).
384 0 : let project_data = match self.find_tenant_project(tenant_id).await? {
385 0 : Some(pd) => pd,
386 0 : None => return Ok(None),
387 : };
388 0 : branches.retain(|b| b.project_id == project_data.id);
389 0 : if branches.len() < 2 {
390 0 : Ok(branches.first().cloned())
391 : } else {
392 0 : Err(Error::new(
393 0 : format!(
394 0 : "Find branch for timeline {}/{} returned {} branches instead of 0 or 1",
395 0 : tenant_id,
396 0 : timeline_id,
397 0 : branches.len()
398 0 : ),
399 0 : ErrorKind::UnexpectedState,
400 0 : ))
401 : }
402 0 : }
403 :
404 0 : pub async fn list_pageservers(&self) -> Result<Vec<PageserverData>, Error> {
405 0 : let _permit = self
406 0 : .request_limiter
407 0 : .acquire()
408 0 : .await
409 0 : .expect("Semaphore is not closed");
410 :
411 0 : let response = self
412 0 : .http_client
413 0 : .get(self.append_url("/pageservers"))
414 0 : .header(header::ACCEPT, "application/json")
415 0 : .bearer_auth(&self.token)
416 0 : .send()
417 0 : .await
418 0 : .map_err(|e| Error::new("List pageservers".to_string(), ErrorKind::RequestSend(e)))?;
419 :
420 0 : let response: AdminApiResponse<Vec<PageserverData>> = response
421 0 : .json()
422 0 : .await
423 0 : .map_err(|e| Error::new("List pageservers".to_string(), ErrorKind::BodyRead(e)))?;
424 :
425 0 : Ok(response.data)
426 0 : }
427 :
428 0 : pub async fn list_safekeepers(&self) -> Result<Vec<SafekeeperData>, Error> {
429 0 : let _permit = self
430 0 : .request_limiter
431 0 : .acquire()
432 0 : .await
433 0 : .expect("Semaphore is not closed");
434 :
435 0 : let response = self
436 0 : .http_client
437 0 : .get(self.append_url("/safekeepers"))
438 0 : .header(header::ACCEPT, "application/json")
439 0 : .bearer_auth(&self.token)
440 0 : .send()
441 0 : .await
442 0 : .map_err(|e| Error::new("List safekeepers".to_string(), ErrorKind::RequestSend(e)))?;
443 :
444 0 : let response: AdminApiResponse<Vec<SafekeeperData>> = response
445 0 : .json()
446 0 : .await
447 0 : .map_err(|e| Error::new("List safekeepers".to_string(), ErrorKind::BodyRead(e)))?;
448 :
449 0 : Ok(response.data)
450 0 : }
451 :
452 0 : pub async fn projects_for_pageserver(
453 0 : &self,
454 0 : pageserver_id: u64,
455 0 : show_deleted: bool,
456 0 : ) -> Result<Vec<ProjectData>, Error> {
457 0 : let _permit = self
458 0 : .request_limiter
459 0 : .acquire()
460 0 : .await
461 0 : .expect("Semaphore is not closed");
462 :
463 0 : let response = self
464 0 : .http_client
465 0 : .get(self.append_url("/projects"))
466 0 : .query(&[
467 0 : ("pageserver_id", &pageserver_id.to_string()),
468 0 : ("show_deleted", &show_deleted.to_string()),
469 0 : ])
470 0 : .header(header::ACCEPT, "application/json")
471 0 : .bearer_auth(&self.token)
472 0 : .send()
473 0 : .await
474 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
475 :
476 0 : let response: AdminApiResponse<Vec<ProjectData>> = response
477 0 : .json()
478 0 : .await
479 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
480 :
481 0 : Ok(response.data)
482 0 : }
483 :
484 0 : pub async fn project_for_tenant(
485 0 : &self,
486 0 : tenant_id: TenantId,
487 0 : show_deleted: bool,
488 0 : ) -> Result<Option<ProjectData>, Error> {
489 0 : let _permit = self
490 0 : .request_limiter
491 0 : .acquire()
492 0 : .await
493 0 : .expect("Semaphore is not closed");
494 :
495 0 : let response = self
496 0 : .http_client
497 0 : .get(self.append_url("/projects"))
498 0 : .query(&[
499 0 : ("search", &tenant_id.to_string()),
500 0 : ("show_deleted", &show_deleted.to_string()),
501 0 : ])
502 0 : .header(header::ACCEPT, "application/json")
503 0 : .bearer_auth(&self.token)
504 0 : .send()
505 0 : .await
506 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
507 :
508 0 : let response: AdminApiResponse<Vec<ProjectData>> = response
509 0 : .json()
510 0 : .await
511 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
512 :
513 0 : match response.data.as_slice() {
514 0 : [] => Ok(None),
515 0 : [_single] => Ok(Some(response.data.into_iter().next().unwrap())),
516 0 : multiple => Err(Error::new(
517 0 : format!("Got more than one project for tenant {tenant_id} : {multiple:?}"),
518 0 : ErrorKind::UnexpectedState,
519 0 : )),
520 : }
521 0 : }
522 :
523 0 : pub async fn branches_for_project(
524 0 : &self,
525 0 : project_id: &ProjectId,
526 0 : show_deleted: bool,
527 0 : ) -> Result<Vec<BranchData>, Error> {
528 0 : let _permit = self
529 0 : .request_limiter
530 0 : .acquire()
531 0 : .await
532 0 : .expect("Semaphore is not closed");
533 :
534 0 : let response = self
535 0 : .http_client
536 0 : .get(self.append_url("/branches"))
537 0 : .query(&[
538 0 : ("project_id", &project_id.0),
539 0 : ("show_deleted", &show_deleted.to_string()),
540 0 : ])
541 0 : .header(header::ACCEPT, "application/json")
542 0 : .bearer_auth(&self.token)
543 0 : .send()
544 0 : .await
545 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
546 :
547 0 : let response: AdminApiResponse<Vec<BranchData>> = response
548 0 : .json()
549 0 : .await
550 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
551 :
552 0 : Ok(response.data)
553 0 : }
554 :
555 0 : fn append_url(&self, subpath: &str) -> Url {
556 0 : // TODO fugly, but `.join` does not work when called
557 0 : (self.base_url.to_string() + subpath)
558 0 : .parse()
559 0 : .unwrap_or_else(|e| panic!("Could not append {subpath} to base url: {e}"))
560 0 : }
561 :
562 0 : async fn with_retries<T, O, F>(op: O, description: &str) -> Result<T, Error>
563 0 : where
564 0 : O: FnMut() -> F,
565 0 : F: Future<Output = Result<T, Error>>,
566 0 : {
567 0 : let cancel = CancellationToken::new(); // not really used
568 0 : backoff::retry(op, |_| false, 1, 20, description, &cancel)
569 0 : .await
570 0 : .expect("cancellations are disabled")
571 0 : }
572 : }
|