LCOV - differential code coverage report
Current view: top level - s3_scrubber/src - cloud_admin_api.rs (source / functions) Coverage Total Hit UBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 0.0 % 263 0 263
Current Date: 2023-10-19 02:04:12 Functions: 0.0 % 116 0 116
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : #![allow(unused)]
       2                 : 
       3                 : use chrono::{DateTime, Utc};
       4                 : use reqwest::{header, Client, Url};
       5                 : use tokio::sync::Semaphore;
       6                 : 
       7                 : use utils::id::{TenantId, TimelineId};
       8                 : use utils::lsn::Lsn;
       9                 : 
      10 UBC           0 : #[derive(Debug)]
      11                 : pub struct Error {
      12                 :     context: String,
      13                 :     kind: ErrorKind,
      14                 : }
      15                 : 
      16                 : impl Error {
      17               0 :     fn new(context: String, kind: ErrorKind) -> Self {
      18               0 :         Self { context, kind }
      19               0 :     }
      20                 : }
      21                 : 
      22                 : impl std::fmt::Display for Error {
      23               0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      24               0 :         match &self.kind {
      25               0 :             ErrorKind::RequestSend(e) => write!(
      26               0 :                 f,
      27               0 :                 "Failed to send a request. Context: {}, error: {}",
      28               0 :                 self.context, e
      29               0 :             ),
      30               0 :             ErrorKind::BodyRead(e) => {
      31               0 :                 write!(
      32               0 :                     f,
      33               0 :                     "Failed to read a request body. Context: {}, error: {}",
      34               0 :                     self.context, e
      35               0 :                 )
      36                 :             }
      37               0 :             ErrorKind::UnexpectedState => write!(f, "Unexpected state: {}", self.context),
      38                 :         }
      39               0 :     }
      40                 : }
      41                 : 
      42               0 : #[derive(Debug, Clone, serde::Deserialize, Hash, PartialEq, Eq)]
      43                 : #[serde(transparent)]
      44                 : pub struct ProjectId(pub String);
      45                 : 
      46               0 : #[derive(Clone, Debug, serde::Deserialize, Hash, PartialEq, Eq)]
      47                 : #[serde(transparent)]
      48                 : pub struct BranchId(pub String);
      49                 : 
      50                 : impl std::error::Error for Error {}
      51                 : 
      52               0 : #[derive(Debug)]
      53                 : pub enum ErrorKind {
      54                 :     RequestSend(reqwest::Error),
      55                 :     BodyRead(reqwest::Error),
      56                 :     UnexpectedState,
      57                 : }
      58                 : 
      59                 : pub struct CloudAdminApiClient {
      60                 :     request_limiter: Semaphore,
      61                 :     token: String,
      62                 :     base_url: Url,
      63                 :     http_client: Client,
      64                 : }
      65                 : 
      66               0 : #[derive(Debug, serde::Deserialize)]
      67                 : struct AdminApiResponse<T> {
      68                 :     data: T,
      69                 :     total: Option<usize>,
      70                 : }
      71                 : 
      72               0 : #[derive(Debug, serde::Deserialize)]
      73                 : pub struct PageserverData {
      74                 :     pub id: u64,
      75                 :     pub created_at: DateTime<Utc>,
      76                 :     pub updated_at: DateTime<Utc>,
      77                 :     pub region_id: String,
      78                 :     pub version: i64,
      79                 :     pub instance_id: String,
      80                 :     pub port: u16,
      81                 :     pub http_host: String,
      82                 :     pub http_port: u16,
      83                 :     pub active: bool,
      84                 :     pub projects_count: usize,
      85                 :     pub availability_zone_id: String,
      86                 : }
      87                 : 
      88               0 : #[derive(Debug, Clone, serde::Deserialize)]
      89                 : pub struct SafekeeperData {
      90                 :     pub id: u64,
      91                 :     pub created_at: DateTime<Utc>,
      92                 :     pub updated_at: DateTime<Utc>,
      93                 :     pub region_id: String,
      94                 :     pub version: i64,
      95                 :     pub instance_id: String,
      96                 :     pub active: bool,
      97                 :     pub host: String,
      98                 :     pub port: u16,
      99                 :     pub projects_count: usize,
     100                 :     pub availability_zone_id: String,
     101                 : }
     102                 : 
     103                 : #[serde_with::serde_as]
     104               0 : #[derive(Debug, Clone, serde::Deserialize)]
     105                 : pub struct ProjectData {
     106                 :     pub id: ProjectId,
     107                 :     pub name: String,
     108                 :     pub region_id: String,
     109                 :     pub platform_id: String,
     110                 :     pub user_id: String,
     111                 :     pub pageserver_id: u64,
     112                 :     #[serde_as(as = "serde_with::DisplayFromStr")]
     113                 :     pub tenant: TenantId,
     114                 :     pub safekeepers: Vec<SafekeeperData>,
     115                 :     pub deleted: bool,
     116                 :     pub created_at: DateTime<Utc>,
     117                 :     pub updated_at: DateTime<Utc>,
     118                 :     pub pg_version: u32,
     119                 :     pub max_project_size: u64,
     120                 :     pub remote_storage_size: u64,
     121                 :     pub resident_size: u64,
     122                 :     pub synthetic_storage_size: u64,
     123                 :     pub compute_time: u64,
     124                 :     pub data_transfer: u64,
     125                 :     pub data_storage: u64,
     126                 :     pub maintenance_set: Option<String>,
     127                 : }
     128                 : 
     129                 : #[serde_with::serde_as]
     130               0 : #[derive(Debug, serde::Deserialize)]
     131                 : pub struct BranchData {
     132                 :     pub id: BranchId,
     133                 :     pub created_at: DateTime<Utc>,
     134                 :     pub updated_at: DateTime<Utc>,
     135                 :     pub name: String,
     136                 :     pub project_id: ProjectId,
     137                 :     #[serde_as(as = "serde_with::DisplayFromStr")]
     138                 :     pub timeline_id: TimelineId,
     139                 :     #[serde(default)]
     140                 :     pub parent_id: Option<BranchId>,
     141                 :     #[serde(default)]
     142                 :     #[serde_as(as = "Option<serde_with::DisplayFromStr>")]
     143                 :     pub parent_lsn: Option<Lsn>,
     144                 :     pub default: bool,
     145                 :     pub deleted: bool,
     146                 :     pub logical_size: Option<u64>,
     147                 :     pub physical_size: Option<u64>,
     148                 :     pub written_size: Option<u64>,
     149                 : }
     150                 : 
     151                 : impl CloudAdminApiClient {
     152               0 :     pub fn new(token: String, base_url: Url) -> Self {
     153               0 :         Self {
     154               0 :             token,
     155               0 :             base_url,
     156               0 :             request_limiter: Semaphore::new(200),
     157               0 :             http_client: Client::new(), // TODO timeout configs at least
     158               0 :         }
     159               0 :     }
     160                 : 
     161               0 :     pub async fn find_tenant_project(
     162               0 :         &self,
     163               0 :         tenant_id: TenantId,
     164               0 :     ) -> Result<Option<ProjectData>, Error> {
     165               0 :         let _permit = self
     166               0 :             .request_limiter
     167               0 :             .acquire()
     168               0 :             .await
     169               0 :             .expect("Semaphore is not closed");
     170                 : 
     171               0 :         let response = self
     172               0 :             .http_client
     173               0 :             .get(self.append_url("/projects"))
     174               0 :             .query(&[
     175               0 :                 ("tenant_id", tenant_id.to_string()),
     176               0 :                 ("show_deleted", "true".to_string()),
     177               0 :             ])
     178               0 :             .header(header::ACCEPT, "application/json")
     179               0 :             .bearer_auth(&self.token)
     180               0 :             .send()
     181               0 :             .await
     182               0 :             .map_err(|e| {
     183               0 :                 Error::new(
     184               0 :                     "Find project for tenant".to_string(),
     185               0 :                     ErrorKind::RequestSend(e),
     186               0 :                 )
     187               0 :             })?;
     188                 : 
     189               0 :         let response: AdminApiResponse<Vec<ProjectData>> = response.json().await.map_err(|e| {
     190               0 :             Error::new(
     191               0 :                 "Find project for tenant".to_string(),
     192               0 :                 ErrorKind::BodyRead(e),
     193               0 :             )
     194               0 :         })?;
     195               0 :         match response.data.len() {
     196               0 :             0 => Ok(None),
     197               0 :             1 => Ok(Some(
     198               0 :                 response
     199               0 :                     .data
     200               0 :                     .into_iter()
     201               0 :                     .next()
     202               0 :                     .expect("Should have exactly one element"),
     203               0 :             )),
     204               0 :             too_many => Err(Error::new(
     205               0 :                 format!("Find project for tenant returned {too_many} projects instead of 0 or 1"),
     206               0 :                 ErrorKind::UnexpectedState,
     207               0 :             )),
     208                 :         }
     209               0 :     }
     210                 : 
     211               0 :     pub async fn find_timeline_branch(
     212               0 :         &self,
     213               0 :         timeline_id: TimelineId,
     214               0 :     ) -> Result<Option<BranchData>, Error> {
     215               0 :         let _permit = self
     216               0 :             .request_limiter
     217               0 :             .acquire()
     218               0 :             .await
     219               0 :             .expect("Semaphore is not closed");
     220                 : 
     221               0 :         let response = self
     222               0 :             .http_client
     223               0 :             .get(self.append_url("/branches"))
     224               0 :             .query(&[
     225               0 :                 ("timeline_id", timeline_id.to_string()),
     226               0 :                 ("show_deleted", "true".to_string()),
     227               0 :             ])
     228               0 :             .header(header::ACCEPT, "application/json")
     229               0 :             .bearer_auth(&self.token)
     230               0 :             .send()
     231               0 :             .await
     232               0 :             .map_err(|e| {
     233               0 :                 Error::new(
     234               0 :                     "Find branch for timeline".to_string(),
     235               0 :                     ErrorKind::RequestSend(e),
     236               0 :                 )
     237               0 :             })?;
     238                 : 
     239               0 :         let response: AdminApiResponse<Vec<BranchData>> = response.json().await.map_err(|e| {
     240               0 :             Error::new(
     241               0 :                 "Find branch for timeline".to_string(),
     242               0 :                 ErrorKind::BodyRead(e),
     243               0 :             )
     244               0 :         })?;
     245               0 :         match response.data.len() {
     246               0 :             0 => Ok(None),
     247               0 :             1 => Ok(Some(
     248               0 :                 response
     249               0 :                     .data
     250               0 :                     .into_iter()
     251               0 :                     .next()
     252               0 :                     .expect("Should have exactly one element"),
     253               0 :             )),
     254               0 :             too_many => Err(Error::new(
     255               0 :                 format!("Find branch for timeline returned {too_many} branches instead of 0 or 1"),
     256               0 :                 ErrorKind::UnexpectedState,
     257               0 :             )),
     258                 :         }
     259               0 :     }
     260                 : 
     261               0 :     pub async fn list_pageservers(&self) -> Result<Vec<PageserverData>, Error> {
     262               0 :         let _permit = self
     263               0 :             .request_limiter
     264               0 :             .acquire()
     265               0 :             .await
     266               0 :             .expect("Semaphore is not closed");
     267                 : 
     268               0 :         let response = self
     269               0 :             .http_client
     270               0 :             .get(self.append_url("/pageservers"))
     271               0 :             .header(header::ACCEPT, "application/json")
     272               0 :             .bearer_auth(&self.token)
     273               0 :             .send()
     274               0 :             .await
     275               0 :             .map_err(|e| Error::new("List pageservers".to_string(), ErrorKind::RequestSend(e)))?;
     276                 : 
     277               0 :         let response: AdminApiResponse<Vec<PageserverData>> = response
     278               0 :             .json()
     279               0 :             .await
     280               0 :             .map_err(|e| Error::new("List pageservers".to_string(), ErrorKind::BodyRead(e)))?;
     281                 : 
     282               0 :         Ok(response.data)
     283               0 :     }
     284                 : 
     285               0 :     pub async fn list_safekeepers(&self) -> Result<Vec<SafekeeperData>, Error> {
     286               0 :         let _permit = self
     287               0 :             .request_limiter
     288               0 :             .acquire()
     289               0 :             .await
     290               0 :             .expect("Semaphore is not closed");
     291                 : 
     292               0 :         let response = self
     293               0 :             .http_client
     294               0 :             .get(self.append_url("/safekeepers"))
     295               0 :             .header(header::ACCEPT, "application/json")
     296               0 :             .bearer_auth(&self.token)
     297               0 :             .send()
     298               0 :             .await
     299               0 :             .map_err(|e| Error::new("List safekeepers".to_string(), ErrorKind::RequestSend(e)))?;
     300                 : 
     301               0 :         let response: AdminApiResponse<Vec<SafekeeperData>> = response
     302               0 :             .json()
     303               0 :             .await
     304               0 :             .map_err(|e| Error::new("List safekeepers".to_string(), ErrorKind::BodyRead(e)))?;
     305                 : 
     306               0 :         Ok(response.data)
     307               0 :     }
     308                 : 
     309               0 :     pub async fn projects_for_pageserver(
     310               0 :         &self,
     311               0 :         pageserver_id: u64,
     312               0 :         show_deleted: bool,
     313               0 :     ) -> Result<Vec<ProjectData>, Error> {
     314               0 :         let _permit = self
     315               0 :             .request_limiter
     316               0 :             .acquire()
     317               0 :             .await
     318               0 :             .expect("Semaphore is not closed");
     319                 : 
     320               0 :         let response = self
     321               0 :             .http_client
     322               0 :             .get(self.append_url("/projects"))
     323               0 :             .query(&[
     324               0 :                 ("pageserver_id", &pageserver_id.to_string()),
     325               0 :                 ("show_deleted", &show_deleted.to_string()),
     326               0 :             ])
     327               0 :             .header(header::ACCEPT, "application/json")
     328               0 :             .bearer_auth(&self.token)
     329               0 :             .send()
     330               0 :             .await
     331               0 :             .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
     332                 : 
     333               0 :         let response: AdminApiResponse<Vec<ProjectData>> = response
     334               0 :             .json()
     335               0 :             .await
     336               0 :             .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
     337                 : 
     338               0 :         Ok(response.data)
     339               0 :     }
     340                 : 
     341               0 :     pub async fn project_for_tenant(
     342               0 :         &self,
     343               0 :         tenant_id: TenantId,
     344               0 :         show_deleted: bool,
     345               0 :     ) -> Result<Option<ProjectData>, Error> {
     346               0 :         let _permit = self
     347               0 :             .request_limiter
     348               0 :             .acquire()
     349               0 :             .await
     350               0 :             .expect("Semaphore is not closed");
     351                 : 
     352               0 :         let response = self
     353               0 :             .http_client
     354               0 :             .get(self.append_url("/projects"))
     355               0 :             .query(&[
     356               0 :                 ("search", &tenant_id.to_string()),
     357               0 :                 ("show_deleted", &show_deleted.to_string()),
     358               0 :             ])
     359               0 :             .header(header::ACCEPT, "application/json")
     360               0 :             .bearer_auth(&self.token)
     361               0 :             .send()
     362               0 :             .await
     363               0 :             .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
     364                 : 
     365               0 :         let response: AdminApiResponse<Vec<ProjectData>> = response
     366               0 :             .json()
     367               0 :             .await
     368               0 :             .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
     369                 : 
     370               0 :         match response.data.as_slice() {
     371               0 :             [] => Ok(None),
     372               0 :             [_single] => Ok(Some(response.data.into_iter().next().unwrap())),
     373               0 :             multiple => Err(Error::new(
     374               0 :                 format!("Got more than one project for tenant {tenant_id} : {multiple:?}"),
     375               0 :                 ErrorKind::UnexpectedState,
     376               0 :             )),
     377                 :         }
     378               0 :     }
     379                 : 
     380               0 :     pub async fn branches_for_project(
     381               0 :         &self,
     382               0 :         project_id: &ProjectId,
     383               0 :         show_deleted: bool,
     384               0 :     ) -> Result<Vec<BranchData>, Error> {
     385               0 :         let _permit = self
     386               0 :             .request_limiter
     387               0 :             .acquire()
     388               0 :             .await
     389               0 :             .expect("Semaphore is not closed");
     390                 : 
     391               0 :         let response = self
     392               0 :             .http_client
     393               0 :             .get(self.append_url("/branches"))
     394               0 :             .query(&[
     395               0 :                 ("project_id", &project_id.0),
     396               0 :                 ("show_deleted", &show_deleted.to_string()),
     397               0 :             ])
     398               0 :             .header(header::ACCEPT, "application/json")
     399               0 :             .bearer_auth(&self.token)
     400               0 :             .send()
     401               0 :             .await
     402               0 :             .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
     403                 : 
     404               0 :         let response: AdminApiResponse<Vec<BranchData>> = response
     405               0 :             .json()
     406               0 :             .await
     407               0 :             .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
     408                 : 
     409               0 :         Ok(response.data)
     410               0 :     }
     411                 : 
     412               0 :     fn append_url(&self, subpath: &str) -> Url {
     413               0 :         // TODO fugly, but `.join` does not work when called
     414               0 :         (self.base_url.to_string() + subpath)
     415               0 :             .parse()
     416               0 :             .unwrap_or_else(|e| panic!("Could not append {subpath} to base url: {e}"))
     417               0 :     }
     418                 : }
        

Generated by: LCOV version 2.1-beta