LCOV - code coverage report
Current view: top level - s3_scrubber/src - cloud_admin_api.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 0.0 % 263 0
Test Date: 2023-09-06 10:18:01 Functions: 0.0 % 116 0

            Line data    Source code
       1              : #![allow(unused)]
       2              : 
       3              : use chrono::{DateTime, Utc};
       4              : use reqwest::{header, Client, Url};
       5              : use tokio::sync::Semaphore;
       6              : 
       7              : use utils::id::{TenantId, TimelineId};
       8              : use utils::lsn::Lsn;
       9              : 
      10            0 : #[derive(Debug)]
      11              : pub struct Error {
      12              :     context: String,
      13              :     kind: ErrorKind,
      14              : }
      15              : 
      16              : impl Error {
      17            0 :     fn new(context: String, kind: ErrorKind) -> Self {
      18            0 :         Self { context, kind }
      19            0 :     }
      20              : }
      21              : 
      22              : impl std::fmt::Display for Error {
      23            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      24            0 :         match &self.kind {
      25            0 :             ErrorKind::RequestSend(e) => write!(
      26            0 :                 f,
      27            0 :                 "Failed to send a request. Context: {}, error: {}",
      28            0 :                 self.context, e
      29            0 :             ),
      30            0 :             ErrorKind::BodyRead(e) => {
      31            0 :                 write!(
      32            0 :                     f,
      33            0 :                     "Failed to read a request body. Context: {}, error: {}",
      34            0 :                     self.context, e
      35            0 :                 )
      36              :             }
      37            0 :             ErrorKind::UnexpectedState => write!(f, "Unexpected state: {}", self.context),
      38              :         }
      39            0 :     }
      40              : }
      41              : 
      42            0 : #[derive(Debug, Clone, serde::Deserialize, Hash, PartialEq, Eq)]
      43              : #[serde(transparent)]
      44              : pub struct ProjectId(pub String);
      45              : 
      46            0 : #[derive(Clone, Debug, serde::Deserialize, Hash, PartialEq, Eq)]
      47              : #[serde(transparent)]
      48              : pub struct BranchId(pub String);
      49              : 
      50              : impl std::error::Error for Error {}
      51              : 
      52            0 : #[derive(Debug)]
      53              : pub enum ErrorKind {
      54              :     RequestSend(reqwest::Error),
      55              :     BodyRead(reqwest::Error),
      56              :     UnexpectedState,
      57              : }
      58              : 
      59              : pub struct CloudAdminApiClient {
      60              :     request_limiter: Semaphore,
      61              :     token: String,
      62              :     base_url: Url,
      63              :     http_client: Client,
      64              : }
      65              : 
      66            0 : #[derive(Debug, serde::Deserialize)]
      67              : struct AdminApiResponse<T> {
      68              :     data: T,
      69              :     total: Option<usize>,
      70              : }
      71              : 
      72            0 : #[derive(Debug, serde::Deserialize)]
      73              : pub struct PageserverData {
      74              :     pub id: u64,
      75              :     pub created_at: DateTime<Utc>,
      76              :     pub updated_at: DateTime<Utc>,
      77              :     pub region_id: String,
      78              :     pub version: i64,
      79              :     pub instance_id: String,
      80              :     pub port: u16,
      81              :     pub http_host: String,
      82              :     pub http_port: u16,
      83              :     pub active: bool,
      84              :     pub projects_count: usize,
      85              :     pub availability_zone_id: String,
      86              : }
      87              : 
      88            0 : #[derive(Debug, Clone, serde::Deserialize)]
      89              : pub struct SafekeeperData {
      90              :     pub id: u64,
      91              :     pub created_at: DateTime<Utc>,
      92              :     pub updated_at: DateTime<Utc>,
      93              :     pub region_id: String,
      94              :     pub version: i64,
      95              :     pub instance_id: String,
      96              :     pub active: bool,
      97              :     pub host: String,
      98              :     pub port: u16,
      99              :     pub projects_count: usize,
     100              :     pub availability_zone_id: String,
     101              : }
     102              : 
     103              : #[serde_with::serde_as]
     104            0 : #[derive(Debug, Clone, serde::Deserialize)]
     105              : pub struct ProjectData {
     106              :     pub id: ProjectId,
     107              :     pub name: String,
     108              :     pub region_id: String,
     109              :     pub platform_id: String,
     110              :     pub user_id: String,
     111              :     pub pageserver_id: u64,
     112              :     #[serde_as(as = "serde_with::DisplayFromStr")]
     113              :     pub tenant: TenantId,
     114              :     pub safekeepers: Vec<SafekeeperData>,
     115              :     pub deleted: bool,
     116              :     pub created_at: DateTime<Utc>,
     117              :     pub updated_at: DateTime<Utc>,
     118              :     pub pg_version: u32,
     119              :     pub max_project_size: u64,
     120              :     pub remote_storage_size: u64,
     121              :     pub resident_size: u64,
     122              :     pub synthetic_storage_size: u64,
     123              :     pub compute_time: u64,
     124              :     pub data_transfer: u64,
     125              :     pub data_storage: u64,
     126              :     pub maintenance_set: Option<String>,
     127              : }
     128              : 
     129              : #[serde_with::serde_as]
     130            0 : #[derive(Debug, serde::Deserialize)]
     131              : pub struct BranchData {
     132              :     pub id: BranchId,
     133              :     pub created_at: DateTime<Utc>,
     134              :     pub updated_at: DateTime<Utc>,
     135              :     pub name: String,
     136              :     pub project_id: ProjectId,
     137              :     #[serde_as(as = "serde_with::DisplayFromStr")]
     138              :     pub timeline_id: TimelineId,
     139              :     #[serde(default)]
     140              :     pub parent_id: Option<BranchId>,
     141              :     #[serde(default)]
     142              :     #[serde_as(as = "Option<serde_with::DisplayFromStr>")]
     143              :     pub parent_lsn: Option<Lsn>,
     144              :     pub default: bool,
     145              :     pub deleted: bool,
     146              :     pub logical_size: Option<u64>,
     147              :     pub physical_size: Option<u64>,
     148              :     pub written_size: Option<u64>,
     149              : }
     150              : 
     151              : impl CloudAdminApiClient {
     152            0 :     pub fn new(token: String, base_url: Url) -> Self {
     153            0 :         Self {
     154            0 :             token,
     155            0 :             base_url,
     156            0 :             request_limiter: Semaphore::new(200),
     157            0 :             http_client: Client::new(), // TODO timeout configs at least
     158            0 :         }
     159            0 :     }
     160              : 
     161            0 :     pub async fn find_tenant_project(
     162            0 :         &self,
     163            0 :         tenant_id: TenantId,
     164            0 :     ) -> Result<Option<ProjectData>, Error> {
     165            0 :         let _permit = self
     166            0 :             .request_limiter
     167            0 :             .acquire()
     168            0 :             .await
     169            0 :             .expect("Semaphore is not closed");
     170              : 
     171            0 :         let response = self
     172            0 :             .http_client
     173            0 :             .get(self.append_url("/projects"))
     174            0 :             .query(&[
     175            0 :                 ("tenant_id", tenant_id.to_string()),
     176            0 :                 ("show_deleted", "true".to_string()),
     177            0 :             ])
     178            0 :             .header(header::ACCEPT, "application/json")
     179            0 :             .bearer_auth(&self.token)
     180            0 :             .send()
     181            0 :             .await
     182            0 :             .map_err(|e| {
     183            0 :                 Error::new(
     184            0 :                     "Find project for tenant".to_string(),
     185            0 :                     ErrorKind::RequestSend(e),
     186            0 :                 )
     187            0 :             })?;
     188              : 
     189            0 :         let response: AdminApiResponse<Vec<ProjectData>> = response.json().await.map_err(|e| {
     190            0 :             Error::new(
     191            0 :                 "Find project for tenant".to_string(),
     192            0 :                 ErrorKind::BodyRead(e),
     193            0 :             )
     194            0 :         })?;
     195            0 :         match response.data.len() {
     196            0 :             0 => Ok(None),
     197            0 :             1 => Ok(Some(
     198            0 :                 response
     199            0 :                     .data
     200            0 :                     .into_iter()
     201            0 :                     .next()
     202            0 :                     .expect("Should have exactly one element"),
     203            0 :             )),
     204            0 :             too_many => Err(Error::new(
     205            0 :                 format!("Find project for tenant returned {too_many} projects instead of 0 or 1"),
     206            0 :                 ErrorKind::UnexpectedState,
     207            0 :             )),
     208              :         }
     209            0 :     }
     210              : 
     211            0 :     pub async fn find_timeline_branch(
     212            0 :         &self,
     213            0 :         timeline_id: TimelineId,
     214            0 :     ) -> Result<Option<BranchData>, Error> {
     215            0 :         let _permit = self
     216            0 :             .request_limiter
     217            0 :             .acquire()
     218            0 :             .await
     219            0 :             .expect("Semaphore is not closed");
     220              : 
     221            0 :         let response = self
     222            0 :             .http_client
     223            0 :             .get(self.append_url("/branches"))
     224            0 :             .query(&[
     225            0 :                 ("timeline_id", timeline_id.to_string()),
     226            0 :                 ("show_deleted", "true".to_string()),
     227            0 :             ])
     228            0 :             .header(header::ACCEPT, "application/json")
     229            0 :             .bearer_auth(&self.token)
     230            0 :             .send()
     231            0 :             .await
     232            0 :             .map_err(|e| {
     233            0 :                 Error::new(
     234            0 :                     "Find branch for timeline".to_string(),
     235            0 :                     ErrorKind::RequestSend(e),
     236            0 :                 )
     237            0 :             })?;
     238              : 
     239            0 :         let response: AdminApiResponse<Vec<BranchData>> = response.json().await.map_err(|e| {
     240            0 :             Error::new(
     241            0 :                 "Find branch for timeline".to_string(),
     242            0 :                 ErrorKind::BodyRead(e),
     243            0 :             )
     244            0 :         })?;
     245            0 :         match response.data.len() {
     246            0 :             0 => Ok(None),
     247            0 :             1 => Ok(Some(
     248            0 :                 response
     249            0 :                     .data
     250            0 :                     .into_iter()
     251            0 :                     .next()
     252            0 :                     .expect("Should have exactly one element"),
     253            0 :             )),
     254            0 :             too_many => Err(Error::new(
     255            0 :                 format!("Find branch for timeline returned {too_many} branches instead of 0 or 1"),
     256            0 :                 ErrorKind::UnexpectedState,
     257            0 :             )),
     258              :         }
     259            0 :     }
     260              : 
     261            0 :     pub async fn list_pageservers(&self) -> Result<Vec<PageserverData>, Error> {
     262            0 :         let _permit = self
     263            0 :             .request_limiter
     264            0 :             .acquire()
     265            0 :             .await
     266            0 :             .expect("Semaphore is not closed");
     267              : 
     268            0 :         let response = self
     269            0 :             .http_client
     270            0 :             .get(self.append_url("/pageservers"))
     271            0 :             .header(header::ACCEPT, "application/json")
     272            0 :             .bearer_auth(&self.token)
     273            0 :             .send()
     274            0 :             .await
     275            0 :             .map_err(|e| Error::new("List pageservers".to_string(), ErrorKind::RequestSend(e)))?;
     276              : 
     277            0 :         let response: AdminApiResponse<Vec<PageserverData>> = response
     278            0 :             .json()
     279            0 :             .await
     280            0 :             .map_err(|e| Error::new("List pageservers".to_string(), ErrorKind::BodyRead(e)))?;
     281              : 
     282            0 :         Ok(response.data)
     283            0 :     }
     284              : 
     285            0 :     pub async fn list_safekeepers(&self) -> Result<Vec<SafekeeperData>, Error> {
     286            0 :         let _permit = self
     287            0 :             .request_limiter
     288            0 :             .acquire()
     289            0 :             .await
     290            0 :             .expect("Semaphore is not closed");
     291              : 
     292            0 :         let response = self
     293            0 :             .http_client
     294            0 :             .get(self.append_url("/safekeepers"))
     295            0 :             .header(header::ACCEPT, "application/json")
     296            0 :             .bearer_auth(&self.token)
     297            0 :             .send()
     298            0 :             .await
     299            0 :             .map_err(|e| Error::new("List safekeepers".to_string(), ErrorKind::RequestSend(e)))?;
     300              : 
     301            0 :         let response: AdminApiResponse<Vec<SafekeeperData>> = response
     302            0 :             .json()
     303            0 :             .await
     304            0 :             .map_err(|e| Error::new("List safekeepers".to_string(), ErrorKind::BodyRead(e)))?;
     305              : 
     306            0 :         Ok(response.data)
     307            0 :     }
     308              : 
     309            0 :     pub async fn projects_for_pageserver(
     310            0 :         &self,
     311            0 :         pageserver_id: u64,
     312            0 :         show_deleted: bool,
     313            0 :     ) -> Result<Vec<ProjectData>, Error> {
     314            0 :         let _permit = self
     315            0 :             .request_limiter
     316            0 :             .acquire()
     317            0 :             .await
     318            0 :             .expect("Semaphore is not closed");
     319              : 
     320            0 :         let response = self
     321            0 :             .http_client
     322            0 :             .get(self.append_url("/projects"))
     323            0 :             .query(&[
     324            0 :                 ("pageserver_id", &pageserver_id.to_string()),
     325            0 :                 ("show_deleted", &show_deleted.to_string()),
     326            0 :             ])
     327            0 :             .header(header::ACCEPT, "application/json")
     328            0 :             .bearer_auth(&self.token)
     329            0 :             .send()
     330            0 :             .await
     331            0 :             .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
     332              : 
     333            0 :         let response: AdminApiResponse<Vec<ProjectData>> = response
     334            0 :             .json()
     335            0 :             .await
     336            0 :             .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
     337              : 
     338            0 :         Ok(response.data)
     339            0 :     }
     340              : 
     341            0 :     pub async fn project_for_tenant(
     342            0 :         &self,
     343            0 :         tenant_id: TenantId,
     344            0 :         show_deleted: bool,
     345            0 :     ) -> Result<Option<ProjectData>, Error> {
     346            0 :         let _permit = self
     347            0 :             .request_limiter
     348            0 :             .acquire()
     349            0 :             .await
     350            0 :             .expect("Semaphore is not closed");
     351              : 
     352            0 :         let response = self
     353            0 :             .http_client
     354            0 :             .get(self.append_url("/projects"))
     355            0 :             .query(&[
     356            0 :                 ("search", &tenant_id.to_string()),
     357            0 :                 ("show_deleted", &show_deleted.to_string()),
     358            0 :             ])
     359            0 :             .header(header::ACCEPT, "application/json")
     360            0 :             .bearer_auth(&self.token)
     361            0 :             .send()
     362            0 :             .await
     363            0 :             .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
     364              : 
     365            0 :         let response: AdminApiResponse<Vec<ProjectData>> = response
     366            0 :             .json()
     367            0 :             .await
     368            0 :             .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
     369              : 
     370            0 :         match response.data.as_slice() {
     371            0 :             [] => Ok(None),
     372            0 :             [_single] => Ok(Some(response.data.into_iter().next().unwrap())),
     373            0 :             multiple => Err(Error::new(
     374            0 :                 format!("Got more than one project for tenant {tenant_id} : {multiple:?}"),
     375            0 :                 ErrorKind::UnexpectedState,
     376            0 :             )),
     377              :         }
     378            0 :     }
     379              : 
     380            0 :     pub async fn branches_for_project(
     381            0 :         &self,
     382            0 :         project_id: &ProjectId,
     383            0 :         show_deleted: bool,
     384            0 :     ) -> Result<Vec<BranchData>, Error> {
     385            0 :         let _permit = self
     386            0 :             .request_limiter
     387            0 :             .acquire()
     388            0 :             .await
     389            0 :             .expect("Semaphore is not closed");
     390              : 
     391            0 :         let response = self
     392            0 :             .http_client
     393            0 :             .get(self.append_url("/branches"))
     394            0 :             .query(&[
     395            0 :                 ("project_id", &project_id.0),
     396            0 :                 ("show_deleted", &show_deleted.to_string()),
     397            0 :             ])
     398            0 :             .header(header::ACCEPT, "application/json")
     399            0 :             .bearer_auth(&self.token)
     400            0 :             .send()
     401            0 :             .await
     402            0 :             .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
     403              : 
     404            0 :         let response: AdminApiResponse<Vec<BranchData>> = response
     405            0 :             .json()
     406            0 :             .await
     407            0 :             .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
     408              : 
     409            0 :         Ok(response.data)
     410            0 :     }
     411              : 
     412            0 :     fn append_url(&self, subpath: &str) -> Url {
     413            0 :         // TODO fugly, but `.join` does not work when called
     414            0 :         (self.base_url.to_string() + subpath)
     415            0 :             .parse()
     416            0 :             .unwrap_or_else(|e| panic!("Could not append {subpath} to base url: {e}"))
     417            0 :     }
     418              : }
        

Generated by: LCOV version 2.1-beta