LCOV - code coverage report
Current view: top level - storage_scrubber/src - cloud_admin_api.rs (source / functions) Coverage Total Hit
Test: 07bee600374ccd486c69370d0972d9035964fe68.info Lines: 0.0 % 370 0
Test Date: 2025-02-20 13:11:02 Functions: 0.0 % 109 0

            Line data    Source code
       1              : use std::error::Error as _;
       2              : 
       3              : use chrono::{DateTime, Utc};
       4              : use futures::Future;
       5              : use hex::FromHex;
       6              : 
       7              : use reqwest::{header, Client, StatusCode, Url};
       8              : use serde::Deserialize;
       9              : use tokio::sync::Semaphore;
      10              : 
      11              : use tokio_util::sync::CancellationToken;
      12              : use utils::backoff;
      13              : use utils::id::{TenantId, TimelineId};
      14              : use utils::lsn::Lsn;
      15              : 
      16              : use crate::ConsoleConfig;
      17              : 
/// Error returned by the [`CloudAdminApiClient`] request methods.
///
/// Pairs a free-form description of the operation that was being attempted
/// with the category of failure; both are rendered by the `Display` impl.
#[derive(Debug)]
pub struct Error {
    /// Human-readable description of what was being attempted.
    context: String,
    /// What kind of failure occurred.
    kind: ErrorKind,
}
      23              : 
      24              : impl Error {
      25            0 :     fn new(context: String, kind: ErrorKind) -> Self {
      26            0 :         Self { context, kind }
      27            0 :     }
      28              : }
      29              : 
      30              : impl std::fmt::Display for Error {
      31            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      32            0 :         match &self.kind {
      33            0 :             ErrorKind::RequestSend(e) => write!(
      34            0 :                 f,
      35            0 :                 "Failed to send a request. Context: {}, error: {}{}",
      36            0 :                 self.context,
      37            0 :                 e,
      38            0 :                 e.source().map(|e| format!(": {e}")).unwrap_or_default()
      39            0 :             ),
      40            0 :             ErrorKind::BodyRead(e) => {
      41            0 :                 write!(
      42            0 :                     f,
      43            0 :                     "Failed to read a request body. Context: {}, error: {}{}",
      44            0 :                     self.context,
      45            0 :                     e,
      46            0 :                     e.source().map(|e| format!(": {e}")).unwrap_or_default()
      47            0 :                 )
      48              :             }
      49            0 :             ErrorKind::ResponseStatus(status) => {
      50            0 :                 write!(f, "Bad response status {}: {}", status, self.context)
      51              :             }
      52            0 :             ErrorKind::UnexpectedState => write!(f, "Unexpected state: {}", self.context),
      53              :         }
      54            0 :     }
      55              : }
      56              : 
/// Identifier of a project in the Console API.
///
/// `#[serde(transparent)]` makes it deserialize directly from a bare string.
#[derive(Debug, Clone, serde::Deserialize, Hash, PartialEq, Eq)]
#[serde(transparent)]
pub struct ProjectId(pub String);
      60              : 
/// Identifier of a branch in the Console API.
///
/// `#[serde(transparent)]` makes it deserialize directly from a bare string.
#[derive(Clone, Debug, serde::Deserialize, Hash, PartialEq, Eq)]
#[serde(transparent)]
pub struct BranchId(pub String);
      64              : 
// Marker impl only: `source()` is not overridden, the underlying cause is
// instead included in the `Display` output.
impl std::error::Error for Error {}
      66              : 
/// Category of failure carried by [`Error`].
#[derive(Debug)]
pub enum ErrorKind {
    /// The HTTP request could not be sent.
    RequestSend(reqwest::Error),
    /// The response body could not be read or deserialized.
    BodyRead(reqwest::Error),
    /// A response carried a bad HTTP status.
    // NOTE(review): none of the client methods visible in this file construct
    // this variant — confirm whether status checking is intended.
    ResponseStatus(StatusCode),
    /// The API returned data that violates an expectation of the caller,
    /// e.g. more than one project for a single tenant.
    UnexpectedState,
}
      74              : 
/// HTTP client for the cloud (Console) admin API.
pub struct CloudAdminApiClient {
    /// Bounds the number of API calls running at once; every request method
    /// acquires a permit before talking to the server.
    request_limiter: Semaphore,
    /// Bearer token attached to every request.
    token: String,
    /// Prefix to which endpoint paths such as `/projects` are appended.
    base_url: Url,
    /// Underlying `reqwest` client.
    http_client: Client,
}
      81              : 
/// Envelope in which the admin API wraps its payloads.
#[derive(Debug, serde::Deserialize)]
struct AdminApiResponse<T> {
    /// The actual payload.
    data: T,
    /// Total number of matching items, when the API reports it; used by
    /// `list_projects` to decide when pagination is complete.
    total: Option<usize>,
}
      87              : 
/// Pageserver record as deserialized from the `/pageservers` endpoint.
///
/// Field names mirror the JSON keys of the API response.
#[derive(Debug, serde::Deserialize)]
pub struct PageserverData {
    pub id: u64,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub region_id: String,
    pub version: i64,
    pub instance_id: String,
    pub port: u16,
    pub http_host: String,
    pub http_port: u16,
    pub active: bool,
    pub projects_count: usize,
    pub availability_zone_id: String,
}
     103              : 
/// Safekeeper record as deserialized from the `/safekeepers` endpoint; also
/// embedded in [`ProjectData::safekeepers`].
///
/// Field names mirror the JSON keys of the API response.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct SafekeeperData {
    pub id: u64,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub region_id: String,
    pub version: i64,
    pub instance_id: String,
    pub active: bool,
    pub host: String,
    pub port: u16,
    pub projects_count: usize,
    pub availability_zone_id: String,
}
     118              : 
     119              : /// For ID fields, the Console API does not always return a value or null.  It will
     120              : /// sometimes return an empty string.  Our native Id type does not consider this acceptable
     121              : /// (nor should it), so we use a wrapper for talking to the Console API.
     122            0 : fn from_nullable_id<'de, D>(deserializer: D) -> Result<TenantId, D::Error>
     123            0 : where
     124            0 :     D: serde::de::Deserializer<'de>,
     125            0 : {
     126            0 :     if deserializer.is_human_readable() {
     127            0 :         let id_str = String::deserialize(deserializer)?;
     128            0 :         if id_str.is_empty() {
     129              :             // This is a bogus value, but for the purposes of the scrubber all that
     130              :             // matters is that it doesn't collide with any real IDs.
     131            0 :             Ok(TenantId::from([0u8; 16]))
     132              :         } else {
     133            0 :             TenantId::from_hex(&id_str).map_err(|e| serde::de::Error::custom(format!("{e}")))
     134              :         }
     135              :     } else {
     136            0 :         let id_arr = <[u8; 16]>::deserialize(deserializer)?;
     137            0 :         Ok(TenantId::from(id_arr))
     138              :     }
     139            0 : }
     140              : 
/// Project record as deserialized from the `/projects` endpoint.
///
/// Field names mirror the JSON keys of the API response.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct ProjectData {
    pub id: ProjectId,
    pub name: String,
    pub region_id: String,
    pub platform_id: String,
    pub user_id: Option<String>,
    pub pageserver_id: Option<u64>,
    // Parsed leniently: an empty string in the response is mapped to the
    // all-zero tenant ID by `from_nullable_id`.
    #[serde(deserialize_with = "from_nullable_id")]
    pub tenant: TenantId,
    pub safekeepers: Vec<SafekeeperData>,
    /// Deletion flag; exposed through [`MaybeDeleted`].
    pub deleted: bool,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub pg_version: u32,
    pub max_project_size: i64,
    pub remote_storage_size: u64,
    pub resident_size: u64,
    pub synthetic_storage_size: u64,
    pub compute_time: u64,
    pub data_transfer: u64,
    pub data_storage: u64,
    pub maintenance_set: Option<String>,
}
     165              : 
/// Branch record as deserialized from the `/branches` endpoint.
///
/// Field names mirror the JSON keys of the API response.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct BranchData {
    pub id: BranchId,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub name: String,
    pub project_id: ProjectId,
    pub timeline_id: TimelineId,
    // `#[serde(default)]`: a missing key deserializes to `None`.
    #[serde(default)]
    pub parent_id: Option<BranchId>,
    // `#[serde(default)]`: a missing key deserializes to `None`.
    #[serde(default)]
    pub parent_lsn: Option<Lsn>,
    pub default: bool,
    /// Deletion flag; exposed through [`MaybeDeleted`].
    pub deleted: bool,
    pub logical_size: Option<u64>,
    pub physical_size: Option<u64>,
    pub written_size: Option<u64>,
}
     184              : 
/// Implemented by Console API records that carry a deletion flag.
pub trait MaybeDeleted {
    /// Returns `true` when the record is marked as deleted.
    fn is_deleted(&self) -> bool;
}
     188              : 
impl MaybeDeleted for ProjectData {
    /// Reports the project's `deleted` flag.
    fn is_deleted(&self) -> bool {
        self.deleted
    }
}
     194              : 
impl MaybeDeleted for BranchData {
    /// Reports the branch's `deleted` flag.
    fn is_deleted(&self) -> bool {
        self.deleted
    }
}
     200              : 
     201              : impl CloudAdminApiClient {
     202            0 :     pub fn new(config: ConsoleConfig) -> Self {
     203            0 :         Self {
     204            0 :             token: config.token,
     205            0 :             base_url: config.base_url,
     206            0 :             request_limiter: Semaphore::new(200),
     207            0 :             http_client: Client::new(), // TODO timeout configs at least
     208            0 :         }
     209            0 :     }
     210              : 
     211            0 :     pub async fn find_tenant_project(
     212            0 :         &self,
     213            0 :         tenant_id: TenantId,
     214            0 :     ) -> Result<Option<ProjectData>, Error> {
     215            0 :         let _permit = self
     216            0 :             .request_limiter
     217            0 :             .acquire()
     218            0 :             .await
     219            0 :             .expect("Semaphore is not closed");
     220              : 
     221            0 :         let response = CloudAdminApiClient::with_retries(
     222            0 :             || async {
     223            0 :                 let response = self
     224            0 :                     .http_client
     225            0 :                     .get(self.append_url("/projects"))
     226            0 :                     .query(&[
     227            0 :                         ("tenant_id", tenant_id.to_string()),
     228            0 :                         ("show_deleted", "true".to_string()),
     229            0 :                     ])
     230            0 :                     .header(header::ACCEPT, "application/json")
     231            0 :                     .bearer_auth(&self.token)
     232            0 :                     .send()
     233            0 :                     .await
     234            0 :                     .map_err(|e| {
     235            0 :                         Error::new(
     236            0 :                             "Find project for tenant".to_string(),
     237            0 :                             ErrorKind::RequestSend(e),
     238            0 :                         )
     239            0 :                     })?;
     240              : 
     241            0 :                 let response: AdminApiResponse<Vec<ProjectData>> =
     242            0 :                     response.json().await.map_err(|e| {
     243            0 :                         Error::new(
     244            0 :                             "Find project for tenant".to_string(),
     245            0 :                             ErrorKind::BodyRead(e),
     246            0 :                         )
     247            0 :                     })?;
     248            0 :                 Ok(response)
     249            0 :             },
     250            0 :             "find_tenant_project",
     251            0 :         )
     252            0 :         .await?;
     253              : 
     254            0 :         match response.data.len() {
     255            0 :             0 => Ok(None),
     256            0 :             1 => Ok(Some(
     257            0 :                 response
     258            0 :                     .data
     259            0 :                     .into_iter()
     260            0 :                     .next()
     261            0 :                     .expect("Should have exactly one element"),
     262            0 :             )),
     263            0 :             too_many => Err(Error::new(
     264            0 :                 format!("Find project for tenant returned {too_many} projects instead of 0 or 1"),
     265            0 :                 ErrorKind::UnexpectedState,
     266            0 :             )),
     267              :         }
     268            0 :     }
     269              : 
     270            0 :     pub async fn list_projects(&self) -> Result<Vec<ProjectData>, Error> {
     271            0 :         let _permit = self
     272            0 :             .request_limiter
     273            0 :             .acquire()
     274            0 :             .await
     275            0 :             .expect("Semaphore is not closed");
     276            0 : 
     277            0 :         let mut pagination_offset = 0;
     278              :         const PAGINATION_LIMIT: usize = 512;
     279            0 :         let mut result: Vec<ProjectData> = Vec::with_capacity(PAGINATION_LIMIT);
     280              :         loop {
     281            0 :             let response_bytes = CloudAdminApiClient::with_retries(
     282            0 :                 || async {
     283            0 :                     let response = self
     284            0 :                         .http_client
     285            0 :                         .get(self.append_url("/projects"))
     286            0 :                         .query(&[
     287            0 :                             ("show_deleted", "false".to_string()),
     288            0 :                             ("limit", format!("{PAGINATION_LIMIT}")),
     289            0 :                             ("offset", format!("{pagination_offset}")),
     290            0 :                         ])
     291            0 :                         .header(header::ACCEPT, "application/json")
     292            0 :                         .bearer_auth(&self.token)
     293            0 :                         .send()
     294            0 :                         .await
     295            0 :                         .map_err(|e| {
     296            0 :                             Error::new(
     297            0 :                                 "List active projects".to_string(),
     298            0 :                                 ErrorKind::RequestSend(e),
     299            0 :                             )
     300            0 :                         })?;
     301              : 
     302            0 :                     response.bytes().await.map_err(|e| {
     303            0 :                         Error::new("List active projects".to_string(), ErrorKind::BodyRead(e))
     304            0 :                     })
     305            0 :                 },
     306            0 :                 "list_projects",
     307            0 :             )
     308            0 :             .await?;
     309              : 
     310            0 :             let decode_result =
     311            0 :                 serde_json::from_slice::<AdminApiResponse<Vec<ProjectData>>>(&response_bytes);
     312              : 
     313            0 :             let mut response = match decode_result {
     314            0 :                 Ok(r) => r,
     315            0 :                 Err(decode) => {
     316            0 :                     tracing::error!(
     317            0 :                         "Failed to decode response body: {}\n{}",
     318            0 :                         decode,
     319            0 :                         String::from_utf8(response_bytes.to_vec()).unwrap()
     320              :                     );
     321            0 :                     panic!("we out");
     322              :                 }
     323              :             };
     324              : 
     325            0 :             pagination_offset += response.data.len();
     326            0 : 
     327            0 :             result.append(&mut response.data);
     328            0 : 
     329            0 :             if pagination_offset >= response.total.unwrap_or(0) {
     330            0 :                 break;
     331            0 :             }
     332              :         }
     333              : 
     334            0 :         Ok(result)
     335            0 :     }
     336              : 
     337            0 :     pub async fn find_timeline_branch(
     338            0 :         &self,
     339            0 :         tenant_id: TenantId,
     340            0 :         timeline_id: TimelineId,
     341            0 :     ) -> Result<Option<BranchData>, Error> {
     342            0 :         let _permit = self
     343            0 :             .request_limiter
     344            0 :             .acquire()
     345            0 :             .await
     346            0 :             .expect("Semaphore is not closed");
     347              : 
     348            0 :         let response = CloudAdminApiClient::with_retries(
     349            0 :             || async {
     350            0 :                 let response = self
     351            0 :                     .http_client
     352            0 :                     .get(self.append_url("/branches"))
     353            0 :                     .query(&[
     354            0 :                         ("timeline_id", timeline_id.to_string()),
     355            0 :                         ("show_deleted", "true".to_string()),
     356            0 :                     ])
     357            0 :                     .header(header::ACCEPT, "application/json")
     358            0 :                     .bearer_auth(&self.token)
     359            0 :                     .send()
     360            0 :                     .await
     361            0 :                     .map_err(|e| {
     362            0 :                         Error::new(
     363            0 :                             "Find branch for timeline".to_string(),
     364            0 :                             ErrorKind::RequestSend(e),
     365            0 :                         )
     366            0 :                     })?;
     367              : 
     368            0 :                 let response: AdminApiResponse<Vec<BranchData>> =
     369            0 :                     response.json().await.map_err(|e| {
     370            0 :                         Error::new(
     371            0 :                             "Find branch for timeline".to_string(),
     372            0 :                             ErrorKind::BodyRead(e),
     373            0 :                         )
     374            0 :                     })?;
     375            0 :                 Ok(response)
     376            0 :             },
     377            0 :             "find_timeline_branch",
     378            0 :         )
     379            0 :         .await?;
     380              : 
     381            0 :         let mut branches: Vec<BranchData> = response.data.into_iter().collect();
     382              :         // Normally timeline_id is unique. However, we do have at least one case
     383              :         // of the same timeline_id in two different projects, apparently after
     384              :         // manual recovery. So always recheck project_id (discovered through
     385              :         // tenant_id).
     386            0 :         let project_data = match self.find_tenant_project(tenant_id).await? {
     387            0 :             Some(pd) => pd,
     388            0 :             None => return Ok(None),
     389              :         };
     390            0 :         branches.retain(|b| b.project_id == project_data.id);
     391            0 :         if branches.len() < 2 {
     392            0 :             Ok(branches.first().cloned())
     393              :         } else {
     394            0 :             Err(Error::new(
     395            0 :                 format!(
     396            0 :                     "Find branch for timeline {}/{} returned {} branches instead of 0 or 1",
     397            0 :                     tenant_id,
     398            0 :                     timeline_id,
     399            0 :                     branches.len()
     400            0 :                 ),
     401            0 :                 ErrorKind::UnexpectedState,
     402            0 :             ))
     403              :         }
     404            0 :     }
     405              : 
     406            0 :     pub async fn list_pageservers(&self) -> Result<Vec<PageserverData>, Error> {
     407            0 :         let _permit = self
     408            0 :             .request_limiter
     409            0 :             .acquire()
     410            0 :             .await
     411            0 :             .expect("Semaphore is not closed");
     412              : 
     413            0 :         let response = self
     414            0 :             .http_client
     415            0 :             .get(self.append_url("/pageservers"))
     416            0 :             .header(header::ACCEPT, "application/json")
     417            0 :             .bearer_auth(&self.token)
     418            0 :             .send()
     419            0 :             .await
     420            0 :             .map_err(|e| Error::new("List pageservers".to_string(), ErrorKind::RequestSend(e)))?;
     421              : 
     422            0 :         let response: AdminApiResponse<Vec<PageserverData>> = response
     423            0 :             .json()
     424            0 :             .await
     425            0 :             .map_err(|e| Error::new("List pageservers".to_string(), ErrorKind::BodyRead(e)))?;
     426              : 
     427            0 :         Ok(response.data)
     428            0 :     }
     429              : 
     430            0 :     pub async fn list_safekeepers(&self) -> Result<Vec<SafekeeperData>, Error> {
     431            0 :         let _permit = self
     432            0 :             .request_limiter
     433            0 :             .acquire()
     434            0 :             .await
     435            0 :             .expect("Semaphore is not closed");
     436              : 
     437            0 :         let response = self
     438            0 :             .http_client
     439            0 :             .get(self.append_url("/safekeepers"))
     440            0 :             .header(header::ACCEPT, "application/json")
     441            0 :             .bearer_auth(&self.token)
     442            0 :             .send()
     443            0 :             .await
     444            0 :             .map_err(|e| Error::new("List safekeepers".to_string(), ErrorKind::RequestSend(e)))?;
     445              : 
     446            0 :         let response: AdminApiResponse<Vec<SafekeeperData>> = response
     447            0 :             .json()
     448            0 :             .await
     449            0 :             .map_err(|e| Error::new("List safekeepers".to_string(), ErrorKind::BodyRead(e)))?;
     450              : 
     451            0 :         Ok(response.data)
     452            0 :     }
     453              : 
     454            0 :     pub async fn projects_for_pageserver(
     455            0 :         &self,
     456            0 :         pageserver_id: u64,
     457            0 :         show_deleted: bool,
     458            0 :     ) -> Result<Vec<ProjectData>, Error> {
     459            0 :         let _permit = self
     460            0 :             .request_limiter
     461            0 :             .acquire()
     462            0 :             .await
     463            0 :             .expect("Semaphore is not closed");
     464              : 
     465            0 :         let response = self
     466            0 :             .http_client
     467            0 :             .get(self.append_url("/projects"))
     468            0 :             .query(&[
     469            0 :                 ("pageserver_id", &pageserver_id.to_string()),
     470            0 :                 ("show_deleted", &show_deleted.to_string()),
     471            0 :             ])
     472            0 :             .header(header::ACCEPT, "application/json")
     473            0 :             .bearer_auth(&self.token)
     474            0 :             .send()
     475            0 :             .await
     476            0 :             .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
     477              : 
     478            0 :         let response: AdminApiResponse<Vec<ProjectData>> = response
     479            0 :             .json()
     480            0 :             .await
     481            0 :             .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
     482              : 
     483            0 :         Ok(response.data)
     484            0 :     }
     485              : 
     486            0 :     pub async fn project_for_tenant(
     487            0 :         &self,
     488            0 :         tenant_id: TenantId,
     489            0 :         show_deleted: bool,
     490            0 :     ) -> Result<Option<ProjectData>, Error> {
     491            0 :         let _permit = self
     492            0 :             .request_limiter
     493            0 :             .acquire()
     494            0 :             .await
     495            0 :             .expect("Semaphore is not closed");
     496              : 
     497            0 :         let response = self
     498            0 :             .http_client
     499            0 :             .get(self.append_url("/projects"))
     500            0 :             .query(&[
     501            0 :                 ("search", &tenant_id.to_string()),
     502            0 :                 ("show_deleted", &show_deleted.to_string()),
     503            0 :             ])
     504            0 :             .header(header::ACCEPT, "application/json")
     505            0 :             .bearer_auth(&self.token)
     506            0 :             .send()
     507            0 :             .await
     508            0 :             .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
     509              : 
     510            0 :         let response: AdminApiResponse<Vec<ProjectData>> = response
     511            0 :             .json()
     512            0 :             .await
     513            0 :             .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
     514              : 
     515            0 :         match response.data.as_slice() {
     516            0 :             [] => Ok(None),
     517            0 :             [_single] => Ok(Some(response.data.into_iter().next().unwrap())),
     518            0 :             multiple => Err(Error::new(
     519            0 :                 format!("Got more than one project for tenant {tenant_id} : {multiple:?}"),
     520            0 :                 ErrorKind::UnexpectedState,
     521            0 :             )),
     522              :         }
     523            0 :     }
     524              : 
     525            0 :     pub async fn branches_for_project(
     526            0 :         &self,
     527            0 :         project_id: &ProjectId,
     528            0 :         show_deleted: bool,
     529            0 :     ) -> Result<Vec<BranchData>, Error> {
     530            0 :         let _permit = self
     531            0 :             .request_limiter
     532            0 :             .acquire()
     533            0 :             .await
     534            0 :             .expect("Semaphore is not closed");
     535              : 
     536            0 :         let response = self
     537            0 :             .http_client
     538            0 :             .get(self.append_url("/branches"))
     539            0 :             .query(&[
     540            0 :                 ("project_id", &project_id.0),
     541            0 :                 ("show_deleted", &show_deleted.to_string()),
     542            0 :             ])
     543            0 :             .header(header::ACCEPT, "application/json")
     544            0 :             .bearer_auth(&self.token)
     545            0 :             .send()
     546            0 :             .await
     547            0 :             .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
     548              : 
     549            0 :         let response: AdminApiResponse<Vec<BranchData>> = response
     550            0 :             .json()
     551            0 :             .await
     552            0 :             .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
     553              : 
     554            0 :         Ok(response.data)
     555            0 :     }
     556              : 
     557            0 :     fn append_url(&self, subpath: &str) -> Url {
     558            0 :         // TODO fugly, but `.join` does not work when called
     559            0 :         (self.base_url.to_string() + subpath)
     560            0 :             .parse()
     561            0 :             .unwrap_or_else(|e| panic!("Could not append {subpath} to base url: {e}"))
     562            0 :     }
     563              : 
     564            0 :     async fn with_retries<T, O, F>(op: O, description: &str) -> Result<T, Error>
     565            0 :     where
     566            0 :         O: FnMut() -> F,
     567            0 :         F: Future<Output = Result<T, Error>>,
     568            0 :     {
     569            0 :         let cancel = CancellationToken::new(); // not really used
     570            0 :         backoff::retry(op, |_| false, 1, 20, description, &cancel)
     571            0 :             .await
     572            0 :             .expect("cancellations are disabled")
     573            0 :     }
     574              : }
        

Generated by: LCOV version 2.1-beta