LCOV - differential code coverage report
Current view: top level - s3_scrubber/src - lib.rs (source / functions) Coverage Total Hit UBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 0.0 % 182 0 182
Current Date: 2023-10-19 02:04:12 Functions: 0.0 % 33 0 33
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : pub mod checks;
       2                 : pub mod cloud_admin_api;
       3                 : pub mod delete_batch_producer;
       4                 : pub mod metadata_stream;
       5                 : mod s3_deletion;
       6                 : pub mod scan_metadata;
       7                 : 
       8                 : use std::env;
       9                 : use std::fmt::Display;
      10                 : use std::time::Duration;
      11                 : 
      12                 : use anyhow::Context;
      13                 : use aws_config::environment::EnvironmentVariableCredentialsProvider;
      14                 : use aws_config::imds::credentials::ImdsCredentialsProvider;
      15                 : use aws_config::meta::credentials::CredentialsProviderChain;
      16                 : use aws_config::sso::SsoCredentialsProvider;
      17                 : use aws_sdk_s3::config::Region;
      18                 : use aws_sdk_s3::{Client, Config};
      19                 : 
      20                 : use reqwest::Url;
      21                 : pub use s3_deletion::S3Deleter;
      22                 : use std::io::IsTerminal;
      23                 : use tokio::io::AsyncReadExt;
      24                 : use tracing::error;
      25                 : use tracing_appender::non_blocking::WorkerGuard;
      26                 : use tracing_subscriber::{fmt, prelude::*, EnvFilter};
      27                 : use utils::id::{TenantId, TenantTimelineId};
      28                 : 
      29                 : const MAX_RETRIES: usize = 20;
      30                 : const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";
      31                 : 
      32                 : pub const CLI_NAME: &str = "s3-scrubber";
      33                 : 
      34 UBC           0 : #[derive(Debug, Clone)]
      35                 : pub struct S3Target {
      36                 :     pub bucket_name: String,
      37                 :     pub prefix_in_bucket: String,
      38                 :     pub delimiter: String,
      39                 : }
      40                 : 
      41               0 : #[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)]
      42                 : pub enum TraversingDepth {
      43                 :     Tenant,
      44                 :     Timeline,
      45                 : }
      46                 : 
      47                 : impl Display for TraversingDepth {
      48               0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      49               0 :         f.write_str(match self {
      50               0 :             Self::Tenant => "tenant",
      51               0 :             Self::Timeline => "timeline",
      52                 :         })
      53               0 :     }
      54                 : }
      55                 : 
      56                 : impl S3Target {
      57               0 :     pub fn with_sub_segment(&self, new_segment: &str) -> Self {
      58               0 :         let mut new_self = self.clone();
      59               0 :         let _ = new_self.prefix_in_bucket.pop();
      60               0 :         new_self.prefix_in_bucket =
      61               0 :             [&new_self.prefix_in_bucket, new_segment, ""].join(&new_self.delimiter);
      62               0 :         new_self
      63               0 :     }
      64                 : }
      65                 : 
      66               0 : #[derive(Clone)]
      67                 : pub enum RootTarget {
      68                 :     Pageserver(S3Target),
      69                 :     Safekeeper(S3Target),
      70                 : }
      71                 : 
      72                 : impl RootTarget {
      73               0 :     pub fn tenants_root(&self) -> &S3Target {
      74               0 :         match self {
      75               0 :             Self::Pageserver(root) => root,
      76               0 :             Self::Safekeeper(root) => root,
      77                 :         }
      78               0 :     }
      79                 : 
      80               0 :     pub fn tenant_root(&self, tenant_id: &TenantId) -> S3Target {
      81               0 :         self.tenants_root().with_sub_segment(&tenant_id.to_string())
      82               0 :     }
      83                 : 
      84               0 :     pub fn timelines_root(&self, tenant_id: &TenantId) -> S3Target {
      85               0 :         match self {
      86               0 :             Self::Pageserver(_) => self.tenant_root(tenant_id).with_sub_segment("timelines"),
      87               0 :             Self::Safekeeper(_) => self.tenant_root(tenant_id),
      88                 :         }
      89               0 :     }
      90                 : 
      91               0 :     pub fn timeline_root(&self, id: &TenantTimelineId) -> S3Target {
      92               0 :         self.timelines_root(&id.tenant_id)
      93               0 :             .with_sub_segment(&id.timeline_id.to_string())
      94               0 :     }
      95                 : 
      96               0 :     pub fn bucket_name(&self) -> &str {
      97               0 :         match self {
      98               0 :             Self::Pageserver(root) => &root.bucket_name,
      99               0 :             Self::Safekeeper(root) => &root.bucket_name,
     100                 :         }
     101               0 :     }
     102                 : 
     103               0 :     pub fn delimiter(&self) -> &str {
     104               0 :         match self {
     105               0 :             Self::Pageserver(root) => &root.delimiter,
     106               0 :             Self::Safekeeper(root) => &root.delimiter,
     107                 :         }
     108               0 :     }
     109                 : }
     110                 : 
     111                 : pub struct BucketConfig {
     112                 :     pub region: String,
     113                 :     pub bucket: String,
     114                 : 
     115                 :     /// Use SSO if this is set, else rely on AWS_* environment vars
     116                 :     pub sso_account_id: Option<String>,
     117                 : }
     118                 : 
     119                 : impl Display for BucketConfig {
     120               0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     121               0 :         write!(
     122               0 :             f,
     123               0 :             "{}/{}/{}",
     124               0 :             self.sso_account_id.as_deref().unwrap_or("<none>"),
     125               0 :             self.region,
     126               0 :             self.bucket
     127               0 :         )
     128               0 :     }
     129                 : }
     130                 : 
     131                 : impl BucketConfig {
     132               0 :     pub fn from_env() -> anyhow::Result<Self> {
     133               0 :         let sso_account_id = env::var("SSO_ACCOUNT_ID").ok();
     134               0 :         let region = env::var("REGION").context("'REGION' param retrieval")?;
     135               0 :         let bucket = env::var("BUCKET").context("'BUCKET' param retrieval")?;
     136                 : 
     137               0 :         Ok(Self {
     138               0 :             region,
     139               0 :             bucket,
     140               0 :             sso_account_id,
     141               0 :         })
     142               0 :     }
     143                 : }
     144                 : 
     145                 : pub struct ConsoleConfig {
     146                 :     pub admin_api_url: Url,
     147                 : }
     148                 : 
     149                 : impl ConsoleConfig {
     150               0 :     pub fn from_env() -> anyhow::Result<Self> {
     151               0 :         let admin_api_url: Url = env::var("CLOUD_ADMIN_API_URL")
     152               0 :             .context("'CLOUD_ADMIN_API_URL' param retrieval")?
     153               0 :             .parse()
     154               0 :             .context("'CLOUD_ADMIN_API_URL' param parsing")?;
     155                 : 
     156               0 :         Ok(Self { admin_api_url })
     157               0 :     }
     158                 : }
     159                 : 
     160               0 : pub fn get_cloud_admin_api_token_or_exit() -> String {
     161               0 :     match env::var(CLOUD_ADMIN_API_TOKEN_ENV_VAR) {
     162               0 :         Ok(token) => token,
     163                 :         Err(env::VarError::NotPresent) => {
     164               0 :             error!("{CLOUD_ADMIN_API_TOKEN_ENV_VAR} env variable is not present");
     165               0 :             std::process::exit(1);
     166                 :         }
     167               0 :         Err(env::VarError::NotUnicode(not_unicode_string)) => {
     168               0 :             error!("{CLOUD_ADMIN_API_TOKEN_ENV_VAR} env variable's value is not a valid unicode string: {not_unicode_string:?}");
     169               0 :             std::process::exit(1);
     170                 :         }
     171                 :     }
     172               0 : }
     173                 : 
     174               0 : pub fn init_logging(file_name: &str) -> WorkerGuard {
     175               0 :     let (file_writer, guard) =
     176               0 :         tracing_appender::non_blocking(tracing_appender::rolling::never("./logs/", file_name));
     177               0 : 
     178               0 :     let file_logs = fmt::Layer::new()
     179               0 :         .with_target(false)
     180               0 :         .with_ansi(false)
     181               0 :         .with_writer(file_writer);
     182               0 :     let stdout_logs = fmt::Layer::new()
     183               0 :         .with_ansi(std::io::stdout().is_terminal())
     184               0 :         .with_target(false)
     185               0 :         .with_writer(std::io::stdout);
     186               0 :     tracing_subscriber::registry()
     187               0 :         .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
     188               0 :         .with(file_logs)
     189               0 :         .with(stdout_logs)
     190               0 :         .init();
     191               0 : 
     192               0 :     guard
     193               0 : }
     194                 : 
     195               0 : pub fn init_s3_client(account_id: Option<String>, bucket_region: Region) -> Client {
     196               0 :     let credentials_provider = {
     197                 :         // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
     198               0 :         let chain = CredentialsProviderChain::first_try(
     199               0 :             "env",
     200               0 :             EnvironmentVariableCredentialsProvider::new(),
     201               0 :         );
     202               0 : 
     203               0 :         // Use SSO if we were given an account ID
     204               0 :         match account_id {
     205               0 :             Some(sso_account) => chain.or_else(
     206               0 :                 "sso",
     207               0 :                 SsoCredentialsProvider::builder()
     208               0 :                     .account_id(sso_account)
     209               0 :                     .role_name("PowerUserAccess")
     210               0 :                     .start_url("https://neondb.awsapps.com/start")
     211               0 :                     .region(Region::from_static("eu-central-1"))
     212               0 :                     .build(),
     213               0 :             ),
     214               0 :             None => chain,
     215                 :         }
     216               0 :         .or_else(
     217               0 :             // Finally try IMDS
     218               0 :             "imds",
     219               0 :             ImdsCredentialsProvider::builder().build(),
     220               0 :         )
     221               0 :     };
     222               0 : 
     223               0 :     let mut builder = Config::builder()
     224               0 :         .region(bucket_region)
     225               0 :         .credentials_provider(credentials_provider);
     226                 : 
     227               0 :     if let Ok(endpoint) = env::var("AWS_ENDPOINT_URL") {
     228               0 :         builder = builder.endpoint_url(endpoint)
     229               0 :     }
     230                 : 
     231               0 :     Client::from_conf(builder.build())
     232               0 : }
     233                 : 
     234               0 : async fn list_objects_with_retries(
     235               0 :     s3_client: &Client,
     236               0 :     s3_target: &S3Target,
     237               0 :     continuation_token: Option<String>,
     238               0 : ) -> anyhow::Result<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output> {
     239               0 :     for _ in 0..MAX_RETRIES {
     240               0 :         match s3_client
     241               0 :             .list_objects_v2()
     242               0 :             .bucket(&s3_target.bucket_name)
     243               0 :             .prefix(&s3_target.prefix_in_bucket)
     244               0 :             .delimiter(&s3_target.delimiter)
     245               0 :             .set_continuation_token(continuation_token.clone())
     246               0 :             .send()
     247               0 :             .await
     248                 :         {
     249               0 :             Ok(response) => return Ok(response),
     250               0 :             Err(e) => {
     251               0 :                 error!("list_objects_v2 query failed: {e}");
     252               0 :                 tokio::time::sleep(Duration::from_secs(1)).await;
     253                 :             }
     254                 :         }
     255                 :     }
     256                 : 
     257               0 :     anyhow::bail!("Failed to list objects {MAX_RETRIES} times")
     258               0 : }
     259                 : 
     260               0 : async fn download_object_with_retries(
     261               0 :     s3_client: &Client,
     262               0 :     bucket_name: &str,
     263               0 :     key: &str,
     264               0 : ) -> anyhow::Result<Vec<u8>> {
     265               0 :     for _ in 0..MAX_RETRIES {
     266               0 :         let mut body_buf = Vec::new();
     267               0 :         let response_stream = match s3_client
     268               0 :             .get_object()
     269               0 :             .bucket(bucket_name)
     270               0 :             .key(key)
     271               0 :             .send()
     272               0 :             .await
     273                 :         {
     274               0 :             Ok(response) => response,
     275               0 :             Err(e) => {
     276               0 :                 error!("Failed to download object for key {key}: {e}");
     277               0 :                 tokio::time::sleep(Duration::from_secs(1)).await;
     278               0 :                 continue;
     279                 :             }
     280                 :         };
     281                 : 
     282               0 :         match response_stream
     283               0 :             .body
     284               0 :             .into_async_read()
     285               0 :             .read_to_end(&mut body_buf)
     286               0 :             .await
     287                 :         {
     288               0 :             Ok(bytes_read) => {
     289               0 :                 tracing::info!("Downloaded {bytes_read} bytes for object object with key {key}");
     290               0 :                 return Ok(body_buf);
     291                 :             }
     292               0 :             Err(e) => {
     293               0 :                 error!("Failed to stream object body for key {key}: {e}");
     294               0 :                 tokio::time::sleep(Duration::from_secs(1)).await;
     295                 :             }
     296                 :         }
     297                 :     }
     298                 : 
     299               0 :     anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
     300               0 : }
        

Generated by: LCOV version 2.1-beta