LCOV - differential code coverage report
Current view: top level - s3_scrubber/src - cloud_admin_api.rs (source / functions) Coverage Total Hit UBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 0.0 % 343 0 343
Current Date: 2024-01-09 02:06:09 Functions: 0.0 % 144 0 144
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : #![allow(unused)]
       2                 : 
       3                 : use std::str::FromStr;
       4                 : use std::time::Duration;
       5                 : 
       6                 : use chrono::{DateTime, Utc};
       7                 : use hex::FromHex;
       8                 : use pageserver::tenant::Tenant;
       9                 : use reqwest::{header, Client, StatusCode, Url};
      10                 : use serde::Deserialize;
      11                 : use tokio::sync::Semaphore;
      12                 : 
      13                 : use utils::id::{TenantId, TimelineId};
      14                 : use utils::lsn::Lsn;
      15                 : 
      16                 : use crate::ConsoleConfig;
      17                 : 
      18 UBC           0 : #[derive(Debug)]
      19                 : pub struct Error {
      20                 :     context: String,
      21                 :     kind: ErrorKind,
      22                 : }
      23                 : 
      24                 : impl Error {
      25               0 :     fn new(context: String, kind: ErrorKind) -> Self {
      26               0 :         Self { context, kind }
      27               0 :     }
      28                 : }
      29                 : 
      30                 : impl std::fmt::Display for Error {
      31               0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      32               0 :         match &self.kind {
      33               0 :             ErrorKind::RequestSend(e) => write!(
      34               0 :                 f,
      35               0 :                 "Failed to send a request. Context: {}, error: {}",
      36               0 :                 self.context, e
      37               0 :             ),
      38               0 :             ErrorKind::BodyRead(e) => {
      39               0 :                 write!(
      40               0 :                     f,
      41               0 :                     "Failed to read a request body. Context: {}, error: {}",
      42               0 :                     self.context, e
      43               0 :                 )
      44                 :             }
      45               0 :             ErrorKind::ResponseStatus(status) => {
      46               0 :                 write!(f, "Bad response status {}: {}", status, self.context)
      47                 :             }
      48               0 :             ErrorKind::UnexpectedState => write!(f, "Unexpected state: {}", self.context),
      49                 :         }
      50               0 :     }
      51                 : }
      52                 : 
      53               0 : #[derive(Debug, Clone, serde::Deserialize, Hash, PartialEq, Eq)]
      54                 : #[serde(transparent)]
      55                 : pub struct ProjectId(pub String);
      56                 : 
      57               0 : #[derive(Clone, Debug, serde::Deserialize, Hash, PartialEq, Eq)]
      58                 : #[serde(transparent)]
      59                 : pub struct BranchId(pub String);
      60                 : 
      61                 : impl std::error::Error for Error {}
      62                 : 
      63               0 : #[derive(Debug)]
      64                 : pub enum ErrorKind {
      65                 :     RequestSend(reqwest::Error),
      66                 :     BodyRead(reqwest::Error),
      67                 :     ResponseStatus(StatusCode),
      68                 :     UnexpectedState,
      69                 : }
      70                 : 
      71                 : pub struct CloudAdminApiClient {
      72                 :     request_limiter: Semaphore,
      73                 :     token: String,
      74                 :     base_url: Url,
      75                 :     http_client: Client,
      76                 : }
      77                 : 
      78               0 : #[derive(Debug, serde::Deserialize)]
      79                 : struct AdminApiResponse<T> {
      80                 :     data: T,
      81                 :     total: Option<usize>,
      82                 : }
      83                 : 
      84               0 : #[derive(Debug, serde::Deserialize)]
      85                 : pub struct PageserverData {
      86                 :     pub id: u64,
      87                 :     pub created_at: DateTime<Utc>,
      88                 :     pub updated_at: DateTime<Utc>,
      89                 :     pub region_id: String,
      90                 :     pub version: i64,
      91                 :     pub instance_id: String,
      92                 :     pub port: u16,
      93                 :     pub http_host: String,
      94                 :     pub http_port: u16,
      95                 :     pub active: bool,
      96                 :     pub projects_count: usize,
      97                 :     pub availability_zone_id: String,
      98                 : }
      99                 : 
     100               0 : #[derive(Debug, Clone, serde::Deserialize)]
     101                 : pub struct SafekeeperData {
     102                 :     pub id: u64,
     103                 :     pub created_at: DateTime<Utc>,
     104                 :     pub updated_at: DateTime<Utc>,
     105                 :     pub region_id: String,
     106                 :     pub version: i64,
     107                 :     pub instance_id: String,
     108                 :     pub active: bool,
     109                 :     pub host: String,
     110                 :     pub port: u16,
     111                 :     pub projects_count: usize,
     112                 :     pub availability_zone_id: String,
     113                 : }
     114                 : 
     115                 : /// For ID fields, the Console API does not always return a value or null.  It will
     116                 : /// sometimes return an empty string.  Our native Id type does not consider this acceptable
     117                 : /// (nor should it), so we use a wrapper for talking to the Console API.
     118               0 : fn from_nullable_id<'de, D>(deserializer: D) -> Result<TenantId, D::Error>
     119               0 : where
     120               0 :     D: serde::de::Deserializer<'de>,
     121               0 : {
     122               0 :     if deserializer.is_human_readable() {
     123               0 :         let id_str = String::deserialize(deserializer)?;
     124               0 :         if id_str.is_empty() {
     125                 :             // This is a bogus value, but for the purposes of the scrubber all that
     126                 :             // matters is that it doesn't collide with any real IDs.
     127               0 :             Ok(TenantId::from([0u8; 16]))
     128                 :         } else {
     129               0 :             TenantId::from_hex(&id_str).map_err(|e| serde::de::Error::custom(format!("{e}")))
     130                 :         }
     131                 :     } else {
     132               0 :         let id_arr = <[u8; 16]>::deserialize(deserializer)?;
     133               0 :         Ok(TenantId::from(id_arr))
     134                 :     }
     135               0 : }
     136                 : 
     137               0 : #[derive(Debug, Clone, serde::Deserialize)]
     138                 : pub struct ProjectData {
     139                 :     pub id: ProjectId,
     140                 :     pub name: String,
     141                 :     pub region_id: String,
     142                 :     pub platform_id: String,
     143                 :     pub user_id: String,
     144                 :     pub pageserver_id: u64,
     145                 :     #[serde(deserialize_with = "from_nullable_id")]
     146                 :     pub tenant: TenantId,
     147                 :     pub safekeepers: Vec<SafekeeperData>,
     148                 :     pub deleted: bool,
     149                 :     pub created_at: DateTime<Utc>,
     150                 :     pub updated_at: DateTime<Utc>,
     151                 :     pub pg_version: u32,
     152                 :     pub max_project_size: u64,
     153                 :     pub remote_storage_size: u64,
     154                 :     pub resident_size: u64,
     155                 :     pub synthetic_storage_size: u64,
     156                 :     pub compute_time: u64,
     157                 :     pub data_transfer: u64,
     158                 :     pub data_storage: u64,
     159                 :     pub maintenance_set: Option<String>,
     160                 : }
     161                 : 
     162               0 : #[derive(Debug, serde::Deserialize)]
     163                 : pub struct BranchData {
     164                 :     pub id: BranchId,
     165                 :     pub created_at: DateTime<Utc>,
     166                 :     pub updated_at: DateTime<Utc>,
     167                 :     pub name: String,
     168                 :     pub project_id: ProjectId,
     169                 :     pub timeline_id: TimelineId,
     170                 :     #[serde(default)]
     171                 :     pub parent_id: Option<BranchId>,
     172                 :     #[serde(default)]
     173                 :     pub parent_lsn: Option<Lsn>,
     174                 :     pub default: bool,
     175                 :     pub deleted: bool,
     176                 :     pub logical_size: Option<u64>,
     177                 :     pub physical_size: Option<u64>,
     178                 :     pub written_size: Option<u64>,
     179                 : }
     180                 : 
     181                 : pub trait MaybeDeleted {
     182                 :     fn is_deleted(&self) -> bool;
     183                 : }
     184                 : 
     185                 : impl MaybeDeleted for ProjectData {
     186               0 :     fn is_deleted(&self) -> bool {
     187               0 :         self.deleted
     188               0 :     }
     189                 : }
     190                 : 
     191                 : impl MaybeDeleted for BranchData {
     192               0 :     fn is_deleted(&self) -> bool {
     193               0 :         self.deleted
     194               0 :     }
     195                 : }
     196                 : 
     197                 : impl CloudAdminApiClient {
     198               0 :     pub fn new(config: ConsoleConfig) -> Self {
     199               0 :         Self {
     200               0 :             token: config.token,
     201               0 :             base_url: config.base_url,
     202               0 :             request_limiter: Semaphore::new(200),
     203               0 :             http_client: Client::new(), // TODO timeout configs at least
     204               0 :         }
     205               0 :     }
     206                 : 
     207               0 :     pub async fn find_tenant_project(
     208               0 :         &self,
     209               0 :         tenant_id: TenantId,
     210               0 :     ) -> Result<Option<ProjectData>, Error> {
     211               0 :         let _permit = self
     212               0 :             .request_limiter
     213               0 :             .acquire()
     214               0 :             .await
     215               0 :             .expect("Semaphore is not closed");
     216                 : 
     217               0 :         let response = self
     218               0 :             .http_client
     219               0 :             .get(self.append_url("/projects"))
     220               0 :             .query(&[
     221               0 :                 ("tenant_id", tenant_id.to_string()),
     222               0 :                 ("show_deleted", "true".to_string()),
     223               0 :             ])
     224               0 :             .header(header::ACCEPT, "application/json")
     225               0 :             .bearer_auth(&self.token)
     226               0 :             .send()
     227               0 :             .await
     228               0 :             .map_err(|e| {
     229               0 :                 Error::new(
     230               0 :                     "Find project for tenant".to_string(),
     231               0 :                     ErrorKind::RequestSend(e),
     232               0 :                 )
     233               0 :             })?;
     234                 : 
     235               0 :         let response: AdminApiResponse<Vec<ProjectData>> = response.json().await.map_err(|e| {
     236               0 :             Error::new(
     237               0 :                 "Find project for tenant".to_string(),
     238               0 :                 ErrorKind::BodyRead(e),
     239               0 :             )
     240               0 :         })?;
     241               0 :         match response.data.len() {
     242               0 :             0 => Ok(None),
     243               0 :             1 => Ok(Some(
     244               0 :                 response
     245               0 :                     .data
     246               0 :                     .into_iter()
     247               0 :                     .next()
     248               0 :                     .expect("Should have exactly one element"),
     249               0 :             )),
     250               0 :             too_many => Err(Error::new(
     251               0 :                 format!("Find project for tenant returned {too_many} projects instead of 0 or 1"),
     252               0 :                 ErrorKind::UnexpectedState,
     253               0 :             )),
     254                 :         }
     255               0 :     }
     256                 : 
     257               0 :     pub async fn list_projects(&self, region_id: String) -> Result<Vec<ProjectData>, Error> {
     258               0 :         let _permit = self
     259               0 :             .request_limiter
     260               0 :             .acquire()
     261               0 :             .await
     262               0 :             .expect("Semaphore is not closed");
     263               0 : 
     264               0 :         let mut pagination_offset = 0;
     265               0 :         const PAGINATION_LIMIT: usize = 512;
     266               0 :         let mut result: Vec<ProjectData> = Vec::with_capacity(PAGINATION_LIMIT);
     267                 :         loop {
     268               0 :             let response = self
     269               0 :                 .http_client
     270               0 :                 .get(self.append_url("/projects"))
     271               0 :                 .query(&[
     272               0 :                     ("show_deleted", "false".to_string()),
     273               0 :                     ("limit", format!("{PAGINATION_LIMIT}")),
     274               0 :                     ("offset", format!("{pagination_offset}")),
     275               0 :                 ])
     276               0 :                 .header(header::ACCEPT, "application/json")
     277               0 :                 .bearer_auth(&self.token)
     278               0 :                 .send()
     279               0 :                 .await
     280               0 :                 .map_err(|e| {
     281               0 :                     Error::new(
     282               0 :                         "List active projects".to_string(),
     283               0 :                         ErrorKind::RequestSend(e),
     284               0 :                     )
     285               0 :                 })?;
     286                 : 
     287               0 :             match response.status() {
     288               0 :                 StatusCode::OK => {}
     289                 :                 StatusCode::SERVICE_UNAVAILABLE | StatusCode::TOO_MANY_REQUESTS => {
     290               0 :                     tokio::time::sleep(Duration::from_millis(500)).await;
     291               0 :                     continue;
     292                 :                 }
     293               0 :                 status => {
     294               0 :                     return Err(Error::new(
     295               0 :                         "List active projects".to_string(),
     296               0 :                         ErrorKind::ResponseStatus(response.status()),
     297               0 :                     ))
     298                 :                 }
     299                 :             }
     300                 : 
     301               0 :             let response_bytes = response.bytes().await.map_err(|e| {
     302               0 :                 Error::new("List active projects".to_string(), ErrorKind::BodyRead(e))
     303               0 :             })?;
     304                 : 
     305               0 :             let decode_result =
     306               0 :                 serde_json::from_slice::<AdminApiResponse<Vec<ProjectData>>>(&response_bytes);
     307                 : 
     308               0 :             let mut response = match decode_result {
     309               0 :                 Ok(r) => r,
     310               0 :                 Err(decode) => {
     311               0 :                     tracing::error!(
     312               0 :                         "Failed to decode response body: {}\n{}",
     313               0 :                         decode,
     314               0 :                         String::from_utf8(response_bytes.to_vec()).unwrap()
     315               0 :                     );
     316               0 :                     panic!("we out");
     317                 :                 }
     318                 :             };
     319                 : 
     320               0 :             pagination_offset += response.data.len();
     321               0 : 
     322               0 :             result.extend(response.data.drain(..).filter(|t| t.region_id == region_id));
     323               0 : 
     324               0 :             if pagination_offset >= response.total.unwrap_or(0) {
     325               0 :                 break;
     326               0 :             }
     327                 :         }
     328                 : 
     329               0 :         Ok(result)
     330               0 :     }
     331                 : 
     332               0 :     pub async fn find_timeline_branch(
     333               0 :         &self,
     334               0 :         timeline_id: TimelineId,
     335               0 :     ) -> Result<Option<BranchData>, Error> {
     336               0 :         let _permit = self
     337               0 :             .request_limiter
     338               0 :             .acquire()
     339               0 :             .await
     340               0 :             .expect("Semaphore is not closed");
     341                 : 
     342               0 :         let response = self
     343               0 :             .http_client
     344               0 :             .get(self.append_url("/branches"))
     345               0 :             .query(&[
     346               0 :                 ("timeline_id", timeline_id.to_string()),
     347               0 :                 ("show_deleted", "true".to_string()),
     348               0 :             ])
     349               0 :             .header(header::ACCEPT, "application/json")
     350               0 :             .bearer_auth(&self.token)
     351               0 :             .send()
     352               0 :             .await
     353               0 :             .map_err(|e| {
     354               0 :                 Error::new(
     355               0 :                     "Find branch for timeline".to_string(),
     356               0 :                     ErrorKind::RequestSend(e),
     357               0 :                 )
     358               0 :             })?;
     359                 : 
     360               0 :         let response: AdminApiResponse<Vec<BranchData>> = response.json().await.map_err(|e| {
     361               0 :             Error::new(
     362               0 :                 "Find branch for timeline".to_string(),
     363               0 :                 ErrorKind::BodyRead(e),
     364               0 :             )
     365               0 :         })?;
     366               0 :         match response.data.len() {
     367               0 :             0 => Ok(None),
     368               0 :             1 => Ok(Some(
     369               0 :                 response
     370               0 :                     .data
     371               0 :                     .into_iter()
     372               0 :                     .next()
     373               0 :                     .expect("Should have exactly one element"),
     374               0 :             )),
     375               0 :             too_many => Err(Error::new(
     376               0 :                 format!("Find branch for timeline returned {too_many} branches instead of 0 or 1"),
     377               0 :                 ErrorKind::UnexpectedState,
     378               0 :             )),
     379                 :         }
     380               0 :     }
     381                 : 
     382               0 :     pub async fn list_pageservers(&self) -> Result<Vec<PageserverData>, Error> {
     383               0 :         let _permit = self
     384               0 :             .request_limiter
     385               0 :             .acquire()
     386               0 :             .await
     387               0 :             .expect("Semaphore is not closed");
     388                 : 
     389               0 :         let response = self
     390               0 :             .http_client
     391               0 :             .get(self.append_url("/pageservers"))
     392               0 :             .header(header::ACCEPT, "application/json")
     393               0 :             .bearer_auth(&self.token)
     394               0 :             .send()
     395               0 :             .await
     396               0 :             .map_err(|e| Error::new("List pageservers".to_string(), ErrorKind::RequestSend(e)))?;
     397                 : 
     398               0 :         let response: AdminApiResponse<Vec<PageserverData>> = response
     399               0 :             .json()
     400               0 :             .await
     401               0 :             .map_err(|e| Error::new("List pageservers".to_string(), ErrorKind::BodyRead(e)))?;
     402                 : 
     403               0 :         Ok(response.data)
     404               0 :     }
     405                 : 
     406               0 :     pub async fn list_safekeepers(&self) -> Result<Vec<SafekeeperData>, Error> {
     407               0 :         let _permit = self
     408               0 :             .request_limiter
     409               0 :             .acquire()
     410               0 :             .await
     411               0 :             .expect("Semaphore is not closed");
     412                 : 
     413               0 :         let response = self
     414               0 :             .http_client
     415               0 :             .get(self.append_url("/safekeepers"))
     416               0 :             .header(header::ACCEPT, "application/json")
     417               0 :             .bearer_auth(&self.token)
     418               0 :             .send()
     419               0 :             .await
     420               0 :             .map_err(|e| Error::new("List safekeepers".to_string(), ErrorKind::RequestSend(e)))?;
     421                 : 
     422               0 :         let response: AdminApiResponse<Vec<SafekeeperData>> = response
     423               0 :             .json()
     424               0 :             .await
     425               0 :             .map_err(|e| Error::new("List safekeepers".to_string(), ErrorKind::BodyRead(e)))?;
     426                 : 
     427               0 :         Ok(response.data)
     428               0 :     }
     429                 : 
     430               0 :     pub async fn projects_for_pageserver(
     431               0 :         &self,
     432               0 :         pageserver_id: u64,
     433               0 :         show_deleted: bool,
     434               0 :     ) -> Result<Vec<ProjectData>, Error> {
     435               0 :         let _permit = self
     436               0 :             .request_limiter
     437               0 :             .acquire()
     438               0 :             .await
     439               0 :             .expect("Semaphore is not closed");
     440                 : 
     441               0 :         let response = self
     442               0 :             .http_client
     443               0 :             .get(self.append_url("/projects"))
     444               0 :             .query(&[
     445               0 :                 ("pageserver_id", &pageserver_id.to_string()),
     446               0 :                 ("show_deleted", &show_deleted.to_string()),
     447               0 :             ])
     448               0 :             .header(header::ACCEPT, "application/json")
     449               0 :             .bearer_auth(&self.token)
     450               0 :             .send()
     451               0 :             .await
     452               0 :             .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
     453                 : 
     454               0 :         let response: AdminApiResponse<Vec<ProjectData>> = response
     455               0 :             .json()
     456               0 :             .await
     457               0 :             .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
     458                 : 
     459               0 :         Ok(response.data)
     460               0 :     }
     461                 : 
     462               0 :     pub async fn project_for_tenant(
     463               0 :         &self,
     464               0 :         tenant_id: TenantId,
     465               0 :         show_deleted: bool,
     466               0 :     ) -> Result<Option<ProjectData>, Error> {
     467               0 :         let _permit = self
     468               0 :             .request_limiter
     469               0 :             .acquire()
     470               0 :             .await
     471               0 :             .expect("Semaphore is not closed");
     472                 : 
     473               0 :         let response = self
     474               0 :             .http_client
     475               0 :             .get(self.append_url("/projects"))
     476               0 :             .query(&[
     477               0 :                 ("search", &tenant_id.to_string()),
     478               0 :                 ("show_deleted", &show_deleted.to_string()),
     479               0 :             ])
     480               0 :             .header(header::ACCEPT, "application/json")
     481               0 :             .bearer_auth(&self.token)
     482               0 :             .send()
     483               0 :             .await
     484               0 :             .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
     485                 : 
     486               0 :         let response: AdminApiResponse<Vec<ProjectData>> = response
     487               0 :             .json()
     488               0 :             .await
     489               0 :             .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
     490                 : 
     491               0 :         match response.data.as_slice() {
     492               0 :             [] => Ok(None),
     493               0 :             [_single] => Ok(Some(response.data.into_iter().next().unwrap())),
     494               0 :             multiple => Err(Error::new(
     495               0 :                 format!("Got more than one project for tenant {tenant_id} : {multiple:?}"),
     496               0 :                 ErrorKind::UnexpectedState,
     497               0 :             )),
     498                 :         }
     499               0 :     }
     500                 : 
     501               0 :     pub async fn branches_for_project(
     502               0 :         &self,
     503               0 :         project_id: &ProjectId,
     504               0 :         show_deleted: bool,
     505               0 :     ) -> Result<Vec<BranchData>, Error> {
     506               0 :         let _permit = self
     507               0 :             .request_limiter
     508               0 :             .acquire()
     509               0 :             .await
     510               0 :             .expect("Semaphore is not closed");
     511                 : 
     512               0 :         let response = self
     513               0 :             .http_client
     514               0 :             .get(self.append_url("/branches"))
     515               0 :             .query(&[
     516               0 :                 ("project_id", &project_id.0),
     517               0 :                 ("show_deleted", &show_deleted.to_string()),
     518               0 :             ])
     519               0 :             .header(header::ACCEPT, "application/json")
     520               0 :             .bearer_auth(&self.token)
     521               0 :             .send()
     522               0 :             .await
     523               0 :             .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
     524                 : 
     525               0 :         let response: AdminApiResponse<Vec<BranchData>> = response
     526               0 :             .json()
     527               0 :             .await
     528               0 :             .map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
     529                 : 
     530               0 :         Ok(response.data)
     531               0 :     }
     532                 : 
     533               0 :     fn append_url(&self, subpath: &str) -> Url {
     534               0 :         // TODO fugly, but `.join` does not work when called
     535               0 :         (self.base_url.to_string() + subpath)
     536               0 :             .parse()
     537               0 :             .unwrap_or_else(|e| panic!("Could not append {subpath} to base url: {e}"))
     538               0 :     }
     539                 : }
        

Generated by: LCOV version 2.1-beta