Line data Source code
1 : use std::error::Error as _;
2 :
3 : use chrono::{DateTime, Utc};
4 : use futures::Future;
5 : use hex::FromHex;
6 :
7 : use reqwest::{header, Client, StatusCode, Url};
8 : use serde::Deserialize;
9 : use tokio::sync::Semaphore;
10 :
11 : use tokio_util::sync::CancellationToken;
12 : use utils::backoff;
13 : use utils::id::{TenantId, TimelineId};
14 : use utils::lsn::Lsn;
15 :
16 : use crate::ConsoleConfig;
17 :
18 : #[derive(Debug)]
19 : pub struct Error {
20 : context: String,
21 : kind: ErrorKind,
22 : }
23 :
24 : impl Error {
25 0 : fn new(context: String, kind: ErrorKind) -> Self {
26 0 : Self { context, kind }
27 0 : }
28 : }
29 :
30 : impl std::fmt::Display for Error {
31 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 0 : match &self.kind {
33 0 : ErrorKind::RequestSend(e) => write!(
34 0 : f,
35 0 : "Failed to send a request. Context: {}, error: {}{}",
36 0 : self.context,
37 0 : e,
38 0 : e.source().map(|e| format!(": {e}")).unwrap_or_default()
39 0 : ),
40 0 : ErrorKind::BodyRead(e) => {
41 0 : write!(
42 0 : f,
43 0 : "Failed to read a request body. Context: {}, error: {}{}",
44 0 : self.context,
45 0 : e,
46 0 : e.source().map(|e| format!(": {e}")).unwrap_or_default()
47 0 : )
48 : }
49 0 : ErrorKind::ResponseStatus(status) => {
50 0 : write!(f, "Bad response status {}: {}", status, self.context)
51 : }
52 0 : ErrorKind::UnexpectedState => write!(f, "Unexpected state: {}", self.context),
53 : }
54 0 : }
55 : }
56 :
57 0 : #[derive(Debug, Clone, serde::Deserialize, Hash, PartialEq, Eq)]
58 : #[serde(transparent)]
59 : pub struct ProjectId(pub String);
60 :
61 0 : #[derive(Clone, Debug, serde::Deserialize, Hash, PartialEq, Eq)]
62 : #[serde(transparent)]
63 : pub struct BranchId(pub String);
64 :
65 : impl std::error::Error for Error {}
66 :
67 : #[derive(Debug)]
68 : pub enum ErrorKind {
69 : RequestSend(reqwest::Error),
70 : BodyRead(reqwest::Error),
71 : ResponseStatus(StatusCode),
72 : UnexpectedState,
73 : }
74 :
75 : pub struct CloudAdminApiClient {
76 : request_limiter: Semaphore,
77 : token: String,
78 : base_url: Url,
79 : http_client: Client,
80 : }
81 :
82 0 : #[derive(Debug, serde::Deserialize)]
83 : struct AdminApiResponse<T> {
84 : data: T,
85 : total: Option<usize>,
86 : }
87 :
88 0 : #[derive(Debug, serde::Deserialize)]
89 : pub struct PageserverData {
90 : pub id: u64,
91 : pub created_at: DateTime<Utc>,
92 : pub updated_at: DateTime<Utc>,
93 : pub region_id: String,
94 : pub version: i64,
95 : pub instance_id: String,
96 : pub port: u16,
97 : pub http_host: String,
98 : pub http_port: u16,
99 : pub active: bool,
100 : pub projects_count: usize,
101 : pub availability_zone_id: String,
102 : }
103 :
104 0 : #[derive(Debug, Clone, serde::Deserialize)]
105 : pub struct SafekeeperData {
106 : pub id: u64,
107 : pub created_at: DateTime<Utc>,
108 : pub updated_at: DateTime<Utc>,
109 : pub region_id: String,
110 : pub version: i64,
111 : pub instance_id: String,
112 : pub active: bool,
113 : pub host: String,
114 : pub port: u16,
115 : pub projects_count: usize,
116 : pub availability_zone_id: String,
117 : }
118 :
119 : /// For ID fields, the Console API does not always return a value or null. It will
120 : /// sometimes return an empty string. Our native Id type does not consider this acceptable
121 : /// (nor should it), so we use a wrapper for talking to the Console API.
122 0 : fn from_nullable_id<'de, D>(deserializer: D) -> Result<TenantId, D::Error>
123 0 : where
124 0 : D: serde::de::Deserializer<'de>,
125 0 : {
126 0 : if deserializer.is_human_readable() {
127 0 : let id_str = String::deserialize(deserializer)?;
128 0 : if id_str.is_empty() {
129 : // This is a bogus value, but for the purposes of the scrubber all that
130 : // matters is that it doesn't collide with any real IDs.
131 0 : Ok(TenantId::from([0u8; 16]))
132 : } else {
133 0 : TenantId::from_hex(&id_str).map_err(|e| serde::de::Error::custom(format!("{e}")))
134 : }
135 : } else {
136 0 : let id_arr = <[u8; 16]>::deserialize(deserializer)?;
137 0 : Ok(TenantId::from(id_arr))
138 : }
139 0 : }
140 :
141 0 : #[derive(Debug, Clone, serde::Deserialize)]
142 : pub struct ProjectData {
143 : pub id: ProjectId,
144 : pub name: String,
145 : pub region_id: String,
146 : pub platform_id: String,
147 : pub user_id: Option<String>,
148 : pub pageserver_id: Option<u64>,
149 : #[serde(deserialize_with = "from_nullable_id")]
150 : pub tenant: TenantId,
151 : pub safekeepers: Vec<SafekeeperData>,
152 : pub deleted: bool,
153 : pub created_at: DateTime<Utc>,
154 : pub updated_at: DateTime<Utc>,
155 : pub pg_version: u32,
156 : pub max_project_size: i64,
157 : pub remote_storage_size: u64,
158 : pub resident_size: u64,
159 : pub synthetic_storage_size: u64,
160 : pub compute_time: u64,
161 : pub data_transfer: u64,
162 : pub data_storage: u64,
163 : pub maintenance_set: Option<String>,
164 : }
165 :
166 0 : #[derive(Debug, Clone, serde::Deserialize)]
167 : pub struct BranchData {
168 : pub id: BranchId,
169 : pub created_at: DateTime<Utc>,
170 : pub updated_at: DateTime<Utc>,
171 : pub name: String,
172 : pub project_id: ProjectId,
173 : pub timeline_id: TimelineId,
174 : #[serde(default)]
175 : pub parent_id: Option<BranchId>,
176 : #[serde(default)]
177 : pub parent_lsn: Option<Lsn>,
178 : pub default: bool,
179 : pub deleted: bool,
180 : pub logical_size: Option<u64>,
181 : pub physical_size: Option<u64>,
182 : pub written_size: Option<u64>,
183 : }
184 :
/// Implemented by records that carry a deletion flag.
pub trait MaybeDeleted {
    /// Returns `true` when the record is marked as deleted.
    fn is_deleted(&self) -> bool;
}
188 :
189 : impl MaybeDeleted for ProjectData {
190 0 : fn is_deleted(&self) -> bool {
191 0 : self.deleted
192 0 : }
193 : }
194 :
195 : impl MaybeDeleted for BranchData {
196 0 : fn is_deleted(&self) -> bool {
197 0 : self.deleted
198 0 : }
199 : }
200 :
201 : impl CloudAdminApiClient {
202 0 : pub fn new(config: ConsoleConfig) -> Self {
203 0 : Self {
204 0 : token: config.token,
205 0 : base_url: config.base_url,
206 0 : request_limiter: Semaphore::new(200),
207 0 : http_client: Client::new(), // TODO timeout configs at least
208 0 : }
209 0 : }
210 :
211 0 : pub async fn find_tenant_project(
212 0 : &self,
213 0 : tenant_id: TenantId,
214 0 : ) -> Result<Option<ProjectData>, Error> {
215 0 : let _permit = self
216 0 : .request_limiter
217 0 : .acquire()
218 0 : .await
219 0 : .expect("Semaphore is not closed");
220 :
221 0 : let response = CloudAdminApiClient::with_retries(
222 0 : || async {
223 0 : let response = self
224 0 : .http_client
225 0 : .get(self.append_url("/projects"))
226 0 : .query(&[
227 0 : ("tenant_id", tenant_id.to_string()),
228 0 : ("show_deleted", "true".to_string()),
229 0 : ])
230 0 : .header(header::ACCEPT, "application/json")
231 0 : .bearer_auth(&self.token)
232 0 : .send()
233 0 : .await
234 0 : .map_err(|e| {
235 0 : Error::new(
236 0 : "Find project for tenant".to_string(),
237 0 : ErrorKind::RequestSend(e),
238 0 : )
239 0 : })?;
240 :
241 0 : let response: AdminApiResponse<Vec<ProjectData>> =
242 0 : response.json().await.map_err(|e| {
243 0 : Error::new(
244 0 : "Find project for tenant".to_string(),
245 0 : ErrorKind::BodyRead(e),
246 0 : )
247 0 : })?;
248 0 : Ok(response)
249 0 : },
250 0 : "find_tenant_project",
251 0 : )
252 0 : .await?;
253 :
254 0 : match response.data.len() {
255 0 : 0 => Ok(None),
256 0 : 1 => Ok(Some(
257 0 : response
258 0 : .data
259 0 : .into_iter()
260 0 : .next()
261 0 : .expect("Should have exactly one element"),
262 0 : )),
263 0 : too_many => Err(Error::new(
264 0 : format!("Find project for tenant returned {too_many} projects instead of 0 or 1"),
265 0 : ErrorKind::UnexpectedState,
266 0 : )),
267 : }
268 0 : }
269 :
270 0 : pub async fn list_projects(&self) -> Result<Vec<ProjectData>, Error> {
271 0 : let _permit = self
272 0 : .request_limiter
273 0 : .acquire()
274 0 : .await
275 0 : .expect("Semaphore is not closed");
276 0 :
277 0 : let mut pagination_offset = 0;
278 : const PAGINATION_LIMIT: usize = 512;
279 0 : let mut result: Vec<ProjectData> = Vec::with_capacity(PAGINATION_LIMIT);
280 : loop {
281 0 : let response_bytes = CloudAdminApiClient::with_retries(
282 0 : || async {
283 0 : let response = self
284 0 : .http_client
285 0 : .get(self.append_url("/projects"))
286 0 : .query(&[
287 0 : ("show_deleted", "false".to_string()),
288 0 : ("limit", format!("{PAGINATION_LIMIT}")),
289 0 : ("offset", format!("{pagination_offset}")),
290 0 : ])
291 0 : .header(header::ACCEPT, "application/json")
292 0 : .bearer_auth(&self.token)
293 0 : .send()
294 0 : .await
295 0 : .map_err(|e| {
296 0 : Error::new(
297 0 : "List active projects".to_string(),
298 0 : ErrorKind::RequestSend(e),
299 0 : )
300 0 : })?;
301 :
302 0 : response.bytes().await.map_err(|e| {
303 0 : Error::new("List active projects".to_string(), ErrorKind::BodyRead(e))
304 0 : })
305 0 : },
306 0 : "list_projects",
307 0 : )
308 0 : .await?;
309 :
310 0 : let decode_result =
311 0 : serde_json::from_slice::<AdminApiResponse<Vec<ProjectData>>>(&response_bytes);
312 :
313 0 : let mut response = match decode_result {
314 0 : Ok(r) => r,
315 0 : Err(decode) => {
316 0 : tracing::error!(
317 0 : "Failed to decode response body: {}\n{}",
318 0 : decode,
319 0 : String::from_utf8(response_bytes.to_vec()).unwrap()
320 : );
321 0 : panic!("we out");
322 : }
323 : };
324 :
325 0 : pagination_offset += response.data.len();
326 0 :
327 0 : result.append(&mut response.data);
328 0 :
329 0 : if pagination_offset >= response.total.unwrap_or(0) {
330 0 : break;
331 0 : }
332 : }
333 :
334 0 : Ok(result)
335 0 : }
336 :
337 0 : pub async fn find_timeline_branch(
338 0 : &self,
339 0 : tenant_id: TenantId,
340 0 : timeline_id: TimelineId,
341 0 : ) -> Result<Option<BranchData>, Error> {
342 0 : let _permit = self
343 0 : .request_limiter
344 0 : .acquire()
345 0 : .await
346 0 : .expect("Semaphore is not closed");
347 :
348 0 : let response = CloudAdminApiClient::with_retries(
349 0 : || async {
350 0 : let response = self
351 0 : .http_client
352 0 : .get(self.append_url("/branches"))
353 0 : .query(&[
354 0 : ("timeline_id", timeline_id.to_string()),
355 0 : ("show_deleted", "true".to_string()),
356 0 : ])
357 0 : .header(header::ACCEPT, "application/json")
358 0 : .bearer_auth(&self.token)
359 0 : .send()
360 0 : .await
361 0 : .map_err(|e| {
362 0 : Error::new(
363 0 : "Find branch for timeline".to_string(),
364 0 : ErrorKind::RequestSend(e),
365 0 : )
366 0 : })?;
367 :
368 0 : let response: AdminApiResponse<Vec<BranchData>> =
369 0 : response.json().await.map_err(|e| {
370 0 : Error::new(
371 0 : "Find branch for timeline".to_string(),
372 0 : ErrorKind::BodyRead(e),
373 0 : )
374 0 : })?;
375 0 : Ok(response)
376 0 : },
377 0 : "find_timeline_branch",
378 0 : )
379 0 : .await?;
380 :
381 0 : let mut branches: Vec<BranchData> = response.data.into_iter().collect();
382 : // Normally timeline_id is unique. However, we do have at least one case
383 : // of the same timeline_id in two different projects, apparently after
384 : // manual recovery. So always recheck project_id (discovered through
385 : // tenant_id).
386 0 : let project_data = match self.find_tenant_project(tenant_id).await? {
387 0 : Some(pd) => pd,
388 0 : None => return Ok(None),
389 : };
390 0 : branches.retain(|b| b.project_id == project_data.id);
391 0 : if branches.len() < 2 {
392 0 : Ok(branches.first().cloned())
393 : } else {
394 0 : Err(Error::new(
395 0 : format!(
396 0 : "Find branch for timeline {}/{} returned {} branches instead of 0 or 1",
397 0 : tenant_id,
398 0 : timeline_id,
399 0 : branches.len()
400 0 : ),
401 0 : ErrorKind::UnexpectedState,
402 0 : ))
403 : }
404 0 : }
405 :
406 0 : pub async fn list_pageservers(&self) -> Result<Vec<PageserverData>, Error> {
407 0 : let _permit = self
408 0 : .request_limiter
409 0 : .acquire()
410 0 : .await
411 0 : .expect("Semaphore is not closed");
412 :
413 0 : let response = self
414 0 : .http_client
415 0 : .get(self.append_url("/pageservers"))
416 0 : .header(header::ACCEPT, "application/json")
417 0 : .bearer_auth(&self.token)
418 0 : .send()
419 0 : .await
420 0 : .map_err(|e| Error::new("List pageservers".to_string(), ErrorKind::RequestSend(e)))?;
421 :
422 0 : let response: AdminApiResponse<Vec<PageserverData>> = response
423 0 : .json()
424 0 : .await
425 0 : .map_err(|e| Error::new("List pageservers".to_string(), ErrorKind::BodyRead(e)))?;
426 :
427 0 : Ok(response.data)
428 0 : }
429 :
430 0 : pub async fn list_safekeepers(&self) -> Result<Vec<SafekeeperData>, Error> {
431 0 : let _permit = self
432 0 : .request_limiter
433 0 : .acquire()
434 0 : .await
435 0 : .expect("Semaphore is not closed");
436 :
437 0 : let response = self
438 0 : .http_client
439 0 : .get(self.append_url("/safekeepers"))
440 0 : .header(header::ACCEPT, "application/json")
441 0 : .bearer_auth(&self.token)
442 0 : .send()
443 0 : .await
444 0 : .map_err(|e| Error::new("List safekeepers".to_string(), ErrorKind::RequestSend(e)))?;
445 :
446 0 : let response: AdminApiResponse<Vec<SafekeeperData>> = response
447 0 : .json()
448 0 : .await
449 0 : .map_err(|e| Error::new("List safekeepers".to_string(), ErrorKind::BodyRead(e)))?;
450 :
451 0 : Ok(response.data)
452 0 : }
453 :
454 0 : pub async fn projects_for_pageserver(
455 0 : &self,
456 0 : pageserver_id: u64,
457 0 : show_deleted: bool,
458 0 : ) -> Result<Vec<ProjectData>, Error> {
459 0 : let _permit = self
460 0 : .request_limiter
461 0 : .acquire()
462 0 : .await
463 0 : .expect("Semaphore is not closed");
464 :
465 0 : let response = self
466 0 : .http_client
467 0 : .get(self.append_url("/projects"))
468 0 : .query(&[
469 0 : ("pageserver_id", &pageserver_id.to_string()),
470 0 : ("show_deleted", &show_deleted.to_string()),
471 0 : ])
472 0 : .header(header::ACCEPT, "application/json")
473 0 : .bearer_auth(&self.token)
474 0 : .send()
475 0 : .await
476 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
477 :
478 0 : let response: AdminApiResponse<Vec<ProjectData>> = response
479 0 : .json()
480 0 : .await
481 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
482 :
483 0 : Ok(response.data)
484 0 : }
485 :
486 0 : pub async fn project_for_tenant(
487 0 : &self,
488 0 : tenant_id: TenantId,
489 0 : show_deleted: bool,
490 0 : ) -> Result<Option<ProjectData>, Error> {
491 0 : let _permit = self
492 0 : .request_limiter
493 0 : .acquire()
494 0 : .await
495 0 : .expect("Semaphore is not closed");
496 :
497 0 : let response = self
498 0 : .http_client
499 0 : .get(self.append_url("/projects"))
500 0 : .query(&[
501 0 : ("search", &tenant_id.to_string()),
502 0 : ("show_deleted", &show_deleted.to_string()),
503 0 : ])
504 0 : .header(header::ACCEPT, "application/json")
505 0 : .bearer_auth(&self.token)
506 0 : .send()
507 0 : .await
508 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
509 :
510 0 : let response: AdminApiResponse<Vec<ProjectData>> = response
511 0 : .json()
512 0 : .await
513 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
514 :
515 0 : match response.data.as_slice() {
516 0 : [] => Ok(None),
517 0 : [_single] => Ok(Some(response.data.into_iter().next().unwrap())),
518 0 : multiple => Err(Error::new(
519 0 : format!("Got more than one project for tenant {tenant_id} : {multiple:?}"),
520 0 : ErrorKind::UnexpectedState,
521 0 : )),
522 : }
523 0 : }
524 :
525 0 : pub async fn branches_for_project(
526 0 : &self,
527 0 : project_id: &ProjectId,
528 0 : show_deleted: bool,
529 0 : ) -> Result<Vec<BranchData>, Error> {
530 0 : let _permit = self
531 0 : .request_limiter
532 0 : .acquire()
533 0 : .await
534 0 : .expect("Semaphore is not closed");
535 :
536 0 : let response = self
537 0 : .http_client
538 0 : .get(self.append_url("/branches"))
539 0 : .query(&[
540 0 : ("project_id", &project_id.0),
541 0 : ("show_deleted", &show_deleted.to_string()),
542 0 : ])
543 0 : .header(header::ACCEPT, "application/json")
544 0 : .bearer_auth(&self.token)
545 0 : .send()
546 0 : .await
547 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
548 :
549 0 : let response: AdminApiResponse<Vec<BranchData>> = response
550 0 : .json()
551 0 : .await
552 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
553 :
554 0 : Ok(response.data)
555 0 : }
556 :
557 0 : fn append_url(&self, subpath: &str) -> Url {
558 0 : // TODO fugly, but `.join` does not work when called
559 0 : (self.base_url.to_string() + subpath)
560 0 : .parse()
561 0 : .unwrap_or_else(|e| panic!("Could not append {subpath} to base url: {e}"))
562 0 : }
563 :
564 0 : async fn with_retries<T, O, F>(op: O, description: &str) -> Result<T, Error>
565 0 : where
566 0 : O: FnMut() -> F,
567 0 : F: Future<Output = Result<T, Error>>,
568 0 : {
569 0 : let cancel = CancellationToken::new(); // not really used
570 0 : backoff::retry(op, |_| false, 1, 20, description, &cancel)
571 0 : .await
572 0 : .expect("cancellations are disabled")
573 0 : }
574 : }
|