LCOV - code coverage report
Current view: top level - s3_scrubber/src - lib.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 0.0 % 180 0
Test Date: 2023-09-06 10:18:01 Functions: 0.0 % 33 0

            Line data    Source code
       1              : pub mod checks;
       2              : pub mod cloud_admin_api;
       3              : pub mod delete_batch_producer;
       4              : pub mod metadata_stream;
       5              : mod s3_deletion;
       6              : pub mod scan_metadata;
       7              : 
       8              : use std::env;
       9              : use std::fmt::Display;
      10              : use std::time::Duration;
      11              : 
      12              : use anyhow::Context;
      13              : use aws_config::environment::EnvironmentVariableCredentialsProvider;
      14              : use aws_config::imds::credentials::ImdsCredentialsProvider;
      15              : use aws_config::meta::credentials::CredentialsProviderChain;
      16              : use aws_config::sso::SsoCredentialsProvider;
      17              : use aws_sdk_s3::config::Region;
      18              : use aws_sdk_s3::{Client, Config};
      19              : 
      20              : use reqwest::Url;
      21              : pub use s3_deletion::S3Deleter;
      22              : use tokio::io::AsyncReadExt;
      23              : use tracing::error;
      24              : use tracing_appender::non_blocking::WorkerGuard;
      25              : use tracing_subscriber::{fmt, prelude::*, EnvFilter};
      26              : use utils::id::{TenantId, TenantTimelineId};
      27              : 
      /// Number of attempts the S3 helpers in this file make before giving up.
      const MAX_RETRIES: usize = 20;
      /// Name of the environment variable the cloud admin API token is read from.
      const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";

      /// Name of the tool (presumably used for CLI output; usages are outside this file).
      pub const CLI_NAME: &str = "s3-scrubber";
      32              : 
      /// A location inside an S3 bucket, described by the three values that the
      /// listing requests in this file pass to `list_objects_v2`.
      #[derive(Clone, Debug)]
      pub struct S3Target {
          /// Bucket the requests are sent to.
          pub bucket_name: String,
          /// Key prefix within the bucket that listings are restricted to.
          pub prefix_in_bucket: String,
          /// Separator placed between key segments (also sent as the listing delimiter).
          pub delimiter: String,
      }
      39              : 
      40            0 : #[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)]
      41              : pub enum TraversingDepth {
      42              :     Tenant,
      43              :     Timeline,
      44              : }
      45              : 
      46              : impl Display for TraversingDepth {
      47              :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      48            0 :         f.write_str(match self {
      49            0 :             Self::Tenant => "tenant",
      50            0 :             Self::Timeline => "timeline",
      51              :         })
      52            0 :     }
      53              : }
      54              : 
      55              : impl S3Target {
      56            0 :     pub fn with_sub_segment(&self, new_segment: &str) -> Self {
      57            0 :         let mut new_self = self.clone();
      58            0 :         let _ = new_self.prefix_in_bucket.pop();
      59            0 :         new_self.prefix_in_bucket =
      60            0 :             [&new_self.prefix_in_bucket, new_segment, ""].join(&new_self.delimiter);
      61            0 :         new_self
      62            0 :     }
      63              : }
      64              : 
      65            0 : #[derive(Clone)]
      66              : pub enum RootTarget {
      67              :     Pageserver(S3Target),
      68              :     Safekeeper(S3Target),
      69              : }
      70              : 
      71              : impl RootTarget {
      72            0 :     pub fn tenants_root(&self) -> &S3Target {
      73            0 :         match self {
      74            0 :             Self::Pageserver(root) => root,
      75            0 :             Self::Safekeeper(root) => root,
      76              :         }
      77            0 :     }
      78              : 
      79            0 :     pub fn tenant_root(&self, tenant_id: &TenantId) -> S3Target {
      80            0 :         self.tenants_root().with_sub_segment(&tenant_id.to_string())
      81            0 :     }
      82              : 
      83            0 :     pub fn timelines_root(&self, tenant_id: &TenantId) -> S3Target {
      84            0 :         match self {
      85            0 :             Self::Pageserver(_) => self.tenant_root(tenant_id).with_sub_segment("timelines"),
      86            0 :             Self::Safekeeper(_) => self.tenant_root(tenant_id),
      87              :         }
      88            0 :     }
      89              : 
      90            0 :     pub fn timeline_root(&self, id: &TenantTimelineId) -> S3Target {
      91            0 :         self.timelines_root(&id.tenant_id)
      92            0 :             .with_sub_segment(&id.timeline_id.to_string())
      93            0 :     }
      94              : 
      95            0 :     pub fn bucket_name(&self) -> &str {
      96            0 :         match self {
      97            0 :             Self::Pageserver(root) => &root.bucket_name,
      98            0 :             Self::Safekeeper(root) => &root.bucket_name,
      99              :         }
     100            0 :     }
     101              : 
     102            0 :     pub fn delimiter(&self) -> &str {
     103            0 :         match self {
     104            0 :             Self::Pageserver(root) => &root.delimiter,
     105            0 :             Self::Safekeeper(root) => &root.delimiter,
     106              :         }
     107            0 :     }
     108              : }
     109              : 
     /// Bucket-related settings; `BucketConfig::from_env` fills them from the
     /// `REGION`, `BUCKET` and `SSO_ACCOUNT_ID` environment variables.
     pub struct BucketConfig {
         /// Value of the `REGION` environment variable.
         pub region: String,
         /// Value of the `BUCKET` environment variable.
         pub bucket: String,

         /// Use SSO if this is set, else rely on AWS_* environment vars
         pub sso_account_id: Option<String>,
     }
     117              : 
     118              : impl Display for BucketConfig {
     119            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     120            0 :         write!(
     121            0 :             f,
     122            0 :             "{}/{}/{}",
     123            0 :             self.sso_account_id.as_deref().unwrap_or("<none>"),
     124            0 :             self.region,
     125            0 :             self.bucket
     126            0 :         )
     127            0 :     }
     128              : }
     129              : 
     130              : impl BucketConfig {
     131            0 :     pub fn from_env() -> anyhow::Result<Self> {
     132            0 :         let sso_account_id = env::var("SSO_ACCOUNT_ID").ok();
     133            0 :         let region = env::var("REGION").context("'REGION' param retrieval")?;
     134            0 :         let bucket = env::var("BUCKET").context("'BUCKET' param retrieval")?;
     135              : 
     136            0 :         Ok(Self {
     137            0 :             region,
     138            0 :             bucket,
     139            0 :             sso_account_id,
     140            0 :         })
     141            0 :     }
     142              : }
     143              : 
     144              : pub struct ConsoleConfig {
     145              :     pub admin_api_url: Url,
     146              : }
     147              : 
     148              : impl ConsoleConfig {
     149            0 :     pub fn from_env() -> anyhow::Result<Self> {
     150            0 :         let admin_api_url: Url = env::var("CLOUD_ADMIN_API_URL")
     151            0 :             .context("'CLOUD_ADMIN_API_URL' param retrieval")?
     152            0 :             .parse()
     153            0 :             .context("'CLOUD_ADMIN_API_URL' param parsing")?;
     154              : 
     155            0 :         Ok(Self { admin_api_url })
     156            0 :     }
     157              : }
     158              : 
     159            0 : pub fn get_cloud_admin_api_token_or_exit() -> String {
     160            0 :     match env::var(CLOUD_ADMIN_API_TOKEN_ENV_VAR) {
     161            0 :         Ok(token) => token,
     162              :         Err(env::VarError::NotPresent) => {
     163            0 :             error!("{CLOUD_ADMIN_API_TOKEN_ENV_VAR} env variable is not present");
     164            0 :             std::process::exit(1);
     165              :         }
     166            0 :         Err(env::VarError::NotUnicode(not_unicode_string)) => {
     167            0 :             error!("{CLOUD_ADMIN_API_TOKEN_ENV_VAR} env variable's value is not a valid unicode string: {not_unicode_string:?}");
     168            0 :             std::process::exit(1);
     169              :         }
     170              :     }
     171            0 : }
     172              : 
     173            0 : pub fn init_logging(file_name: &str) -> WorkerGuard {
     174            0 :     let (file_writer, guard) =
     175            0 :         tracing_appender::non_blocking(tracing_appender::rolling::never("./logs/", file_name));
     176            0 : 
     177            0 :     let file_logs = fmt::Layer::new()
     178            0 :         .with_target(false)
     179            0 :         .with_ansi(false)
     180            0 :         .with_writer(file_writer);
     181            0 :     let stdout_logs = fmt::Layer::new()
     182            0 :         .with_target(false)
     183            0 :         .with_writer(std::io::stdout);
     184            0 :     tracing_subscriber::registry()
     185            0 :         .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
     186            0 :         .with(file_logs)
     187            0 :         .with(stdout_logs)
     188            0 :         .init();
     189            0 : 
     190            0 :     guard
     191            0 : }
     192              : 
     193            0 : pub fn init_s3_client(account_id: Option<String>, bucket_region: Region) -> Client {
     194            0 :     let credentials_provider = {
     195              :         // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
     196            0 :         let chain = CredentialsProviderChain::first_try(
     197            0 :             "env",
     198            0 :             EnvironmentVariableCredentialsProvider::new(),
     199            0 :         );
     200            0 : 
     201            0 :         // Use SSO if we were given an account ID
     202            0 :         match account_id {
     203            0 :             Some(sso_account) => chain.or_else(
     204            0 :                 "sso",
     205            0 :                 SsoCredentialsProvider::builder()
     206            0 :                     .account_id(sso_account)
     207            0 :                     .role_name("PowerUserAccess")
     208            0 :                     .start_url("https://neondb.awsapps.com/start")
     209            0 :                     .region(Region::from_static("eu-central-1"))
     210            0 :                     .build(),
     211            0 :             ),
     212            0 :             None => chain,
     213              :         }
     214            0 :         .or_else(
     215            0 :             // Finally try IMDS
     216            0 :             "imds",
     217            0 :             ImdsCredentialsProvider::builder().build(),
     218            0 :         )
     219            0 :     };
     220            0 : 
     221            0 :     let mut builder = Config::builder()
     222            0 :         .region(bucket_region)
     223            0 :         .credentials_provider(credentials_provider);
     224              : 
     225            0 :     if let Ok(endpoint) = env::var("AWS_ENDPOINT_URL") {
     226            0 :         builder = builder.endpoint_url(endpoint)
     227            0 :     }
     228              : 
     229            0 :     Client::from_conf(builder.build())
     230            0 : }
     231              : 
     232            0 : async fn list_objects_with_retries(
     233            0 :     s3_client: &Client,
     234            0 :     s3_target: &S3Target,
     235            0 :     continuation_token: Option<String>,
     236            0 : ) -> anyhow::Result<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output> {
     237            0 :     for _ in 0..MAX_RETRIES {
     238            0 :         match s3_client
     239            0 :             .list_objects_v2()
     240            0 :             .bucket(&s3_target.bucket_name)
     241            0 :             .prefix(&s3_target.prefix_in_bucket)
     242            0 :             .delimiter(&s3_target.delimiter)
     243            0 :             .set_continuation_token(continuation_token.clone())
     244            0 :             .send()
     245            0 :             .await
     246              :         {
     247            0 :             Ok(response) => return Ok(response),
     248            0 :             Err(e) => {
     249            0 :                 error!("list_objects_v2 query failed: {e}");
     250            0 :                 tokio::time::sleep(Duration::from_secs(1)).await;
     251              :             }
     252              :         }
     253              :     }
     254              : 
     255            0 :     anyhow::bail!("Failed to list objects {MAX_RETRIES} times")
     256            0 : }
     257              : 
     258            0 : async fn download_object_with_retries(
     259            0 :     s3_client: &Client,
     260            0 :     bucket_name: &str,
     261            0 :     key: &str,
     262            0 : ) -> anyhow::Result<Vec<u8>> {
     263            0 :     for _ in 0..MAX_RETRIES {
     264            0 :         let mut body_buf = Vec::new();
     265            0 :         let response_stream = match s3_client
     266            0 :             .get_object()
     267            0 :             .bucket(bucket_name)
     268            0 :             .key(key)
     269            0 :             .send()
     270            0 :             .await
     271              :         {
     272            0 :             Ok(response) => response,
     273            0 :             Err(e) => {
     274            0 :                 error!("Failed to download object for key {key}: {e}");
     275            0 :                 tokio::time::sleep(Duration::from_secs(1)).await;
     276            0 :                 continue;
     277              :             }
     278              :         };
     279              : 
     280            0 :         match response_stream
     281            0 :             .body
     282            0 :             .into_async_read()
     283            0 :             .read_to_end(&mut body_buf)
     284            0 :             .await
     285              :         {
     286            0 :             Ok(bytes_read) => {
     287            0 :                 tracing::info!("Downloaded {bytes_read} bytes for object object with key {key}");
     288            0 :                 return Ok(body_buf);
     289              :             }
     290            0 :             Err(e) => {
     291            0 :                 error!("Failed to stream object body for key {key}: {e}");
     292            0 :                 tokio::time::sleep(Duration::from_secs(1)).await;
     293              :             }
     294              :         }
     295              :     }
     296              : 
     297            0 :     anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
     298            0 : }
        

Generated by: LCOV version 2.1-beta