Line data Source code
1 : #![allow(unused)]
2 :
3 : use std::str::FromStr;
4 : use std::time::Duration;
5 :
6 : use chrono::{DateTime, Utc};
7 : use hex::FromHex;
8 : use pageserver::tenant::Tenant;
9 : use reqwest::{header, Client, StatusCode, Url};
10 : use serde::Deserialize;
11 : use tokio::sync::Semaphore;
12 :
13 : use utils::id::{TenantId, TimelineId};
14 : use utils::lsn::Lsn;
15 :
16 : use crate::ConsoleConfig;
17 :
18 0 : #[derive(Debug)]
19 : pub struct Error {
20 : context: String,
21 : kind: ErrorKind,
22 : }
23 :
24 : impl Error {
25 0 : fn new(context: String, kind: ErrorKind) -> Self {
26 0 : Self { context, kind }
27 0 : }
28 : }
29 :
30 : impl std::fmt::Display for Error {
31 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 0 : match &self.kind {
33 0 : ErrorKind::RequestSend(e) => write!(
34 0 : f,
35 0 : "Failed to send a request. Context: {}, error: {}",
36 0 : self.context, e
37 0 : ),
38 0 : ErrorKind::BodyRead(e) => {
39 0 : write!(
40 0 : f,
41 0 : "Failed to read a request body. Context: {}, error: {}",
42 0 : self.context, e
43 0 : )
44 : }
45 0 : ErrorKind::ResponseStatus(status) => {
46 0 : write!(f, "Bad response status {}: {}", status, self.context)
47 : }
48 0 : ErrorKind::UnexpectedState => write!(f, "Unexpected state: {}", self.context),
49 : }
50 0 : }
51 : }
52 :
53 0 : #[derive(Debug, Clone, serde::Deserialize, Hash, PartialEq, Eq)]
54 : #[serde(transparent)]
55 : pub struct ProjectId(pub String);
56 :
57 0 : #[derive(Clone, Debug, serde::Deserialize, Hash, PartialEq, Eq)]
58 : #[serde(transparent)]
59 : pub struct BranchId(pub String);
60 :
61 : impl std::error::Error for Error {}
62 :
63 0 : #[derive(Debug)]
64 : pub enum ErrorKind {
65 : RequestSend(reqwest::Error),
66 : BodyRead(reqwest::Error),
67 : ResponseStatus(StatusCode),
68 : UnexpectedState,
69 : }
70 :
71 : pub struct CloudAdminApiClient {
72 : request_limiter: Semaphore,
73 : token: String,
74 : base_url: Url,
75 : http_client: Client,
76 : }
77 :
78 0 : #[derive(Debug, serde::Deserialize)]
79 : struct AdminApiResponse<T> {
80 : data: T,
81 : total: Option<usize>,
82 : }
83 :
84 0 : #[derive(Debug, serde::Deserialize)]
85 : pub struct PageserverData {
86 : pub id: u64,
87 : pub created_at: DateTime<Utc>,
88 : pub updated_at: DateTime<Utc>,
89 : pub region_id: String,
90 : pub version: i64,
91 : pub instance_id: String,
92 : pub port: u16,
93 : pub http_host: String,
94 : pub http_port: u16,
95 : pub active: bool,
96 : pub projects_count: usize,
97 : pub availability_zone_id: String,
98 : }
99 :
100 0 : #[derive(Debug, Clone, serde::Deserialize)]
101 : pub struct SafekeeperData {
102 : pub id: u64,
103 : pub created_at: DateTime<Utc>,
104 : pub updated_at: DateTime<Utc>,
105 : pub region_id: String,
106 : pub version: i64,
107 : pub instance_id: String,
108 : pub active: bool,
109 : pub host: String,
110 : pub port: u16,
111 : pub projects_count: usize,
112 : pub availability_zone_id: String,
113 : }
114 :
115 : /// For ID fields, the Console API does not always return a value or null. It will
116 : /// sometimes return an empty string. Our native Id type does not consider this acceptable
117 : /// (nor should it), so we use a wrapper for talking to the Console API.
118 0 : fn from_nullable_id<'de, D>(deserializer: D) -> Result<TenantId, D::Error>
119 0 : where
120 0 : D: serde::de::Deserializer<'de>,
121 0 : {
122 0 : if deserializer.is_human_readable() {
123 0 : let id_str = String::deserialize(deserializer)?;
124 0 : if id_str.is_empty() {
125 : // This is a bogus value, but for the purposes of the scrubber all that
126 : // matters is that it doesn't collide with any real IDs.
127 0 : Ok(TenantId::from([0u8; 16]))
128 : } else {
129 0 : TenantId::from_hex(&id_str).map_err(|e| serde::de::Error::custom(format!("{e}")))
130 : }
131 : } else {
132 0 : let id_arr = <[u8; 16]>::deserialize(deserializer)?;
133 0 : Ok(TenantId::from(id_arr))
134 : }
135 0 : }
136 :
137 0 : #[derive(Debug, Clone, serde::Deserialize)]
138 : pub struct ProjectData {
139 : pub id: ProjectId,
140 : pub name: String,
141 : pub region_id: String,
142 : pub platform_id: String,
143 : pub user_id: String,
144 : pub pageserver_id: u64,
145 : #[serde(deserialize_with = "from_nullable_id")]
146 : pub tenant: TenantId,
147 : pub safekeepers: Vec<SafekeeperData>,
148 : pub deleted: bool,
149 : pub created_at: DateTime<Utc>,
150 : pub updated_at: DateTime<Utc>,
151 : pub pg_version: u32,
152 : pub max_project_size: u64,
153 : pub remote_storage_size: u64,
154 : pub resident_size: u64,
155 : pub synthetic_storage_size: u64,
156 : pub compute_time: u64,
157 : pub data_transfer: u64,
158 : pub data_storage: u64,
159 : pub maintenance_set: Option<String>,
160 : }
161 :
162 0 : #[derive(Debug, serde::Deserialize)]
163 : pub struct BranchData {
164 : pub id: BranchId,
165 : pub created_at: DateTime<Utc>,
166 : pub updated_at: DateTime<Utc>,
167 : pub name: String,
168 : pub project_id: ProjectId,
169 : pub timeline_id: TimelineId,
170 : #[serde(default)]
171 : pub parent_id: Option<BranchId>,
172 : #[serde(default)]
173 : pub parent_lsn: Option<Lsn>,
174 : pub default: bool,
175 : pub deleted: bool,
176 : pub logical_size: Option<u64>,
177 : pub physical_size: Option<u64>,
178 : pub written_size: Option<u64>,
179 : }
180 :
181 : pub trait MaybeDeleted {
182 : fn is_deleted(&self) -> bool;
183 : }
184 :
185 : impl MaybeDeleted for ProjectData {
186 0 : fn is_deleted(&self) -> bool {
187 0 : self.deleted
188 0 : }
189 : }
190 :
191 : impl MaybeDeleted for BranchData {
192 0 : fn is_deleted(&self) -> bool {
193 0 : self.deleted
194 0 : }
195 : }
196 :
197 : impl CloudAdminApiClient {
198 0 : pub fn new(config: ConsoleConfig) -> Self {
199 0 : Self {
200 0 : token: config.token,
201 0 : base_url: config.base_url,
202 0 : request_limiter: Semaphore::new(200),
203 0 : http_client: Client::new(), // TODO timeout configs at least
204 0 : }
205 0 : }
206 :
207 0 : pub async fn find_tenant_project(
208 0 : &self,
209 0 : tenant_id: TenantId,
210 0 : ) -> Result<Option<ProjectData>, Error> {
211 0 : let _permit = self
212 0 : .request_limiter
213 0 : .acquire()
214 0 : .await
215 0 : .expect("Semaphore is not closed");
216 :
217 0 : let response = self
218 0 : .http_client
219 0 : .get(self.append_url("/projects"))
220 0 : .query(&[
221 0 : ("tenant_id", tenant_id.to_string()),
222 0 : ("show_deleted", "true".to_string()),
223 0 : ])
224 0 : .header(header::ACCEPT, "application/json")
225 0 : .bearer_auth(&self.token)
226 0 : .send()
227 0 : .await
228 0 : .map_err(|e| {
229 0 : Error::new(
230 0 : "Find project for tenant".to_string(),
231 0 : ErrorKind::RequestSend(e),
232 0 : )
233 0 : })?;
234 :
235 0 : let response: AdminApiResponse<Vec<ProjectData>> = response.json().await.map_err(|e| {
236 0 : Error::new(
237 0 : "Find project for tenant".to_string(),
238 0 : ErrorKind::BodyRead(e),
239 0 : )
240 0 : })?;
241 0 : match response.data.len() {
242 0 : 0 => Ok(None),
243 0 : 1 => Ok(Some(
244 0 : response
245 0 : .data
246 0 : .into_iter()
247 0 : .next()
248 0 : .expect("Should have exactly one element"),
249 0 : )),
250 0 : too_many => Err(Error::new(
251 0 : format!("Find project for tenant returned {too_many} projects instead of 0 or 1"),
252 0 : ErrorKind::UnexpectedState,
253 0 : )),
254 : }
255 0 : }
256 :
257 0 : pub async fn list_projects(&self, region_id: String) -> Result<Vec<ProjectData>, Error> {
258 0 : let _permit = self
259 0 : .request_limiter
260 0 : .acquire()
261 0 : .await
262 0 : .expect("Semaphore is not closed");
263 0 :
264 0 : let mut pagination_offset = 0;
265 0 : const PAGINATION_LIMIT: usize = 512;
266 0 : let mut result: Vec<ProjectData> = Vec::with_capacity(PAGINATION_LIMIT);
267 : loop {
268 0 : let response = self
269 0 : .http_client
270 0 : .get(self.append_url("/projects"))
271 0 : .query(&[
272 0 : ("show_deleted", "false".to_string()),
273 0 : ("limit", format!("{PAGINATION_LIMIT}")),
274 0 : ("offset", format!("{pagination_offset}")),
275 0 : ])
276 0 : .header(header::ACCEPT, "application/json")
277 0 : .bearer_auth(&self.token)
278 0 : .send()
279 0 : .await
280 0 : .map_err(|e| {
281 0 : Error::new(
282 0 : "List active projects".to_string(),
283 0 : ErrorKind::RequestSend(e),
284 0 : )
285 0 : })?;
286 :
287 0 : match response.status() {
288 0 : StatusCode::OK => {}
289 : StatusCode::SERVICE_UNAVAILABLE | StatusCode::TOO_MANY_REQUESTS => {
290 0 : tokio::time::sleep(Duration::from_millis(500)).await;
291 0 : continue;
292 : }
293 0 : status => {
294 0 : return Err(Error::new(
295 0 : "List active projects".to_string(),
296 0 : ErrorKind::ResponseStatus(response.status()),
297 0 : ))
298 : }
299 : }
300 :
301 0 : let response_bytes = response.bytes().await.map_err(|e| {
302 0 : Error::new("List active projects".to_string(), ErrorKind::BodyRead(e))
303 0 : })?;
304 :
305 0 : let decode_result =
306 0 : serde_json::from_slice::<AdminApiResponse<Vec<ProjectData>>>(&response_bytes);
307 :
308 0 : let mut response = match decode_result {
309 0 : Ok(r) => r,
310 0 : Err(decode) => {
311 0 : tracing::error!(
312 0 : "Failed to decode response body: {}\n{}",
313 0 : decode,
314 0 : String::from_utf8(response_bytes.to_vec()).unwrap()
315 0 : );
316 0 : panic!("we out");
317 : }
318 : };
319 :
320 0 : pagination_offset += response.data.len();
321 0 :
322 0 : result.extend(response.data.drain(..).filter(|t| t.region_id == region_id));
323 0 :
324 0 : if pagination_offset >= response.total.unwrap_or(0) {
325 0 : break;
326 0 : }
327 : }
328 :
329 0 : Ok(result)
330 0 : }
331 :
332 0 : pub async fn find_timeline_branch(
333 0 : &self,
334 0 : timeline_id: TimelineId,
335 0 : ) -> Result<Option<BranchData>, Error> {
336 0 : let _permit = self
337 0 : .request_limiter
338 0 : .acquire()
339 0 : .await
340 0 : .expect("Semaphore is not closed");
341 :
342 0 : let response = self
343 0 : .http_client
344 0 : .get(self.append_url("/branches"))
345 0 : .query(&[
346 0 : ("timeline_id", timeline_id.to_string()),
347 0 : ("show_deleted", "true".to_string()),
348 0 : ])
349 0 : .header(header::ACCEPT, "application/json")
350 0 : .bearer_auth(&self.token)
351 0 : .send()
352 0 : .await
353 0 : .map_err(|e| {
354 0 : Error::new(
355 0 : "Find branch for timeline".to_string(),
356 0 : ErrorKind::RequestSend(e),
357 0 : )
358 0 : })?;
359 :
360 0 : let response: AdminApiResponse<Vec<BranchData>> = response.json().await.map_err(|e| {
361 0 : Error::new(
362 0 : "Find branch for timeline".to_string(),
363 0 : ErrorKind::BodyRead(e),
364 0 : )
365 0 : })?;
366 0 : match response.data.len() {
367 0 : 0 => Ok(None),
368 0 : 1 => Ok(Some(
369 0 : response
370 0 : .data
371 0 : .into_iter()
372 0 : .next()
373 0 : .expect("Should have exactly one element"),
374 0 : )),
375 0 : too_many => Err(Error::new(
376 0 : format!("Find branch for timeline returned {too_many} branches instead of 0 or 1"),
377 0 : ErrorKind::UnexpectedState,
378 0 : )),
379 : }
380 0 : }
381 :
382 0 : pub async fn list_pageservers(&self) -> Result<Vec<PageserverData>, Error> {
383 0 : let _permit = self
384 0 : .request_limiter
385 0 : .acquire()
386 0 : .await
387 0 : .expect("Semaphore is not closed");
388 :
389 0 : let response = self
390 0 : .http_client
391 0 : .get(self.append_url("/pageservers"))
392 0 : .header(header::ACCEPT, "application/json")
393 0 : .bearer_auth(&self.token)
394 0 : .send()
395 0 : .await
396 0 : .map_err(|e| Error::new("List pageservers".to_string(), ErrorKind::RequestSend(e)))?;
397 :
398 0 : let response: AdminApiResponse<Vec<PageserverData>> = response
399 0 : .json()
400 0 : .await
401 0 : .map_err(|e| Error::new("List pageservers".to_string(), ErrorKind::BodyRead(e)))?;
402 :
403 0 : Ok(response.data)
404 0 : }
405 :
406 0 : pub async fn list_safekeepers(&self) -> Result<Vec<SafekeeperData>, Error> {
407 0 : let _permit = self
408 0 : .request_limiter
409 0 : .acquire()
410 0 : .await
411 0 : .expect("Semaphore is not closed");
412 :
413 0 : let response = self
414 0 : .http_client
415 0 : .get(self.append_url("/safekeepers"))
416 0 : .header(header::ACCEPT, "application/json")
417 0 : .bearer_auth(&self.token)
418 0 : .send()
419 0 : .await
420 0 : .map_err(|e| Error::new("List safekeepers".to_string(), ErrorKind::RequestSend(e)))?;
421 :
422 0 : let response: AdminApiResponse<Vec<SafekeeperData>> = response
423 0 : .json()
424 0 : .await
425 0 : .map_err(|e| Error::new("List safekeepers".to_string(), ErrorKind::BodyRead(e)))?;
426 :
427 0 : Ok(response.data)
428 0 : }
429 :
430 0 : pub async fn projects_for_pageserver(
431 0 : &self,
432 0 : pageserver_id: u64,
433 0 : show_deleted: bool,
434 0 : ) -> Result<Vec<ProjectData>, Error> {
435 0 : let _permit = self
436 0 : .request_limiter
437 0 : .acquire()
438 0 : .await
439 0 : .expect("Semaphore is not closed");
440 :
441 0 : let response = self
442 0 : .http_client
443 0 : .get(self.append_url("/projects"))
444 0 : .query(&[
445 0 : ("pageserver_id", &pageserver_id.to_string()),
446 0 : ("show_deleted", &show_deleted.to_string()),
447 0 : ])
448 0 : .header(header::ACCEPT, "application/json")
449 0 : .bearer_auth(&self.token)
450 0 : .send()
451 0 : .await
452 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
453 :
454 0 : let response: AdminApiResponse<Vec<ProjectData>> = response
455 0 : .json()
456 0 : .await
457 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
458 :
459 0 : Ok(response.data)
460 0 : }
461 :
462 0 : pub async fn project_for_tenant(
463 0 : &self,
464 0 : tenant_id: TenantId,
465 0 : show_deleted: bool,
466 0 : ) -> Result<Option<ProjectData>, Error> {
467 0 : let _permit = self
468 0 : .request_limiter
469 0 : .acquire()
470 0 : .await
471 0 : .expect("Semaphore is not closed");
472 :
473 0 : let response = self
474 0 : .http_client
475 0 : .get(self.append_url("/projects"))
476 0 : .query(&[
477 0 : ("search", &tenant_id.to_string()),
478 0 : ("show_deleted", &show_deleted.to_string()),
479 0 : ])
480 0 : .header(header::ACCEPT, "application/json")
481 0 : .bearer_auth(&self.token)
482 0 : .send()
483 0 : .await
484 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
485 :
486 0 : let response: AdminApiResponse<Vec<ProjectData>> = response
487 0 : .json()
488 0 : .await
489 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
490 :
491 0 : match response.data.as_slice() {
492 0 : [] => Ok(None),
493 0 : [_single] => Ok(Some(response.data.into_iter().next().unwrap())),
494 0 : multiple => Err(Error::new(
495 0 : format!("Got more than one project for tenant {tenant_id} : {multiple:?}"),
496 0 : ErrorKind::UnexpectedState,
497 0 : )),
498 : }
499 0 : }
500 :
501 0 : pub async fn branches_for_project(
502 0 : &self,
503 0 : project_id: &ProjectId,
504 0 : show_deleted: bool,
505 0 : ) -> Result<Vec<BranchData>, Error> {
506 0 : let _permit = self
507 0 : .request_limiter
508 0 : .acquire()
509 0 : .await
510 0 : .expect("Semaphore is not closed");
511 :
512 0 : let response = self
513 0 : .http_client
514 0 : .get(self.append_url("/branches"))
515 0 : .query(&[
516 0 : ("project_id", &project_id.0),
517 0 : ("show_deleted", &show_deleted.to_string()),
518 0 : ])
519 0 : .header(header::ACCEPT, "application/json")
520 0 : .bearer_auth(&self.token)
521 0 : .send()
522 0 : .await
523 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
524 :
525 0 : let response: AdminApiResponse<Vec<BranchData>> = response
526 0 : .json()
527 0 : .await
528 0 : .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
529 :
530 0 : Ok(response.data)
531 0 : }
532 :
533 0 : fn append_url(&self, subpath: &str) -> Url {
534 0 : // TODO fugly, but `.join` does not work when called
535 0 : (self.base_url.to_string() + subpath)
536 0 : .parse()
537 0 : .unwrap_or_else(|e| panic!("Could not append {subpath} to base url: {e}"))
538 0 : }
539 : }
|