LCOV - differential code coverage report
Current view: top level - s3_scrubber/src - main.rs (source / functions) Coverage Total Hit UBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 0.0 % 176 0 176
Current Date: 2023-10-19 02:04:12 Functions: 0.0 % 61 0 61
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : use std::collections::HashMap;
       2                 : use std::fmt::Display;
       3                 : use std::num::NonZeroUsize;
       4                 : use std::sync::Arc;
       5                 : 
       6                 : use anyhow::Context;
       7                 : use aws_sdk_s3::config::Region;
       8                 : use s3_scrubber::cloud_admin_api::CloudAdminApiClient;
       9                 : use s3_scrubber::delete_batch_producer::DeleteBatchProducer;
      10                 : use s3_scrubber::scan_metadata::scan_metadata;
      11                 : use s3_scrubber::{
      12                 :     checks, get_cloud_admin_api_token_or_exit, init_logging, init_s3_client, BucketConfig,
      13                 :     ConsoleConfig, RootTarget, S3Deleter, S3Target, TraversingDepth, CLI_NAME,
      14                 : };
      15                 : use tracing::{info, warn};
      16                 : 
      17                 : use clap::{Parser, Subcommand, ValueEnum};
      18                 : 
      19 UBC           0 : #[derive(Parser)]
      20                 : #[command(author, version, about, long_about = None)]
      21                 : #[command(arg_required_else_help(true))]
      22                 : struct Cli {
      23                 :     #[command(subcommand)]
      24                 :     command: Command,
      25                 : 
      26               0 :     #[arg(short, long, default_value_t = false)]
      27               0 :     delete: bool,
      28               0 : }
      29                 : 
      30               0 : #[derive(ValueEnum, Clone, Copy, Eq, PartialEq)]
      31                 : enum NodeKind {
      32                 :     Safekeeper,
      33                 :     Pageserver,
      34                 : }
      35                 : 
      36                 : impl NodeKind {
      37               0 :     fn as_str(&self) -> &'static str {
      38               0 :         match self {
      39               0 :             Self::Safekeeper => "safekeeper",
      40               0 :             Self::Pageserver => "pageserver",
      41                 :         }
      42               0 :     }
      43                 : }
      44                 : 
      45                 : impl Display for NodeKind {
      46               0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      47               0 :         f.write_str(self.as_str())
      48               0 :     }
      49                 : }
      50                 : 
      51               0 : #[derive(Subcommand)]
      52                 : enum Command {
      53                 :     Tidy {
      54                 :         #[arg(short, long)]
      55               0 :         node_kind: NodeKind,
      56               0 :         #[arg(short, long, default_value_t=TraversingDepth::Tenant)]
      57               0 :         depth: TraversingDepth,
      58               0 :         #[arg(short, long, default_value_t = false)]
      59               0 :         skip_validation: bool,
      60               0 :     },
      61                 :     ScanMetadata {},
      62                 : }
      63                 : 
      64               0 : async fn tidy(
      65               0 :     cli: &Cli,
      66               0 :     bucket_config: BucketConfig,
      67               0 :     console_config: ConsoleConfig,
      68               0 :     node_kind: NodeKind,
      69               0 :     depth: TraversingDepth,
      70               0 :     skip_validation: bool,
      71               0 : ) -> anyhow::Result<()> {
      72               0 :     let dry_run = !cli.delete;
      73               0 :     let file_name = if dry_run {
      74               0 :         format!(
      75               0 :             "{}_{}_{}__dry.log",
      76               0 :             CLI_NAME,
      77               0 :             node_kind,
      78               0 :             chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
      79               0 :         )
      80                 :     } else {
      81               0 :         format!(
      82               0 :             "{}_{}_{}.log",
      83               0 :             CLI_NAME,
      84               0 :             node_kind,
      85               0 :             chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
      86               0 :         )
      87                 :     };
      88                 : 
      89               0 :     let _guard = init_logging(&file_name);
      90               0 : 
      91               0 :     if dry_run {
      92               0 :         info!("Dry run, not removing items for real");
      93                 :     } else {
      94               0 :         warn!("Dry run disabled, removing bucket items for real");
      95                 :     }
      96                 : 
      97               0 :     info!("skip_validation={skip_validation}");
      98                 : 
      99               0 :     info!("Starting extra S3 removal in {bucket_config} for node kind '{node_kind}', traversing depth: {depth:?}");
     100                 : 
     101               0 :     info!("Starting extra tenant S3 removal in {bucket_config} for node kind '{node_kind}'");
     102               0 :     let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(
     103               0 :         get_cloud_admin_api_token_or_exit(),
     104               0 :         console_config.admin_api_url,
     105               0 :     ));
     106               0 : 
     107               0 :     let bucket_region = Region::new(bucket_config.region);
     108               0 :     let delimiter = "/".to_string();
     109               0 :     let s3_client = Arc::new(init_s3_client(bucket_config.sso_account_id, bucket_region));
     110               0 :     let s3_root = match node_kind {
     111               0 :         NodeKind::Pageserver => RootTarget::Pageserver(S3Target {
     112               0 :             bucket_name: bucket_config.bucket,
     113               0 :             prefix_in_bucket: ["pageserver", "v1", "tenants", ""].join(&delimiter),
     114               0 :             delimiter,
     115               0 :         }),
     116               0 :         NodeKind::Safekeeper => RootTarget::Safekeeper(S3Target {
     117               0 :             bucket_name: bucket_config.bucket,
     118               0 :             prefix_in_bucket: ["safekeeper", "v1", "wal", ""].join(&delimiter),
     119               0 :             delimiter,
     120               0 :         }),
     121                 :     };
     122                 : 
     123               0 :     let delete_batch_producer = DeleteBatchProducer::start(
     124               0 :         Arc::clone(&cloud_admin_api_client),
     125               0 :         Arc::clone(&s3_client),
     126               0 :         s3_root.clone(),
     127               0 :         depth,
     128               0 :     );
     129               0 : 
     130               0 :     let s3_deleter = S3Deleter::new(
     131               0 :         dry_run,
     132               0 :         NonZeroUsize::new(15).unwrap(),
     133               0 :         Arc::clone(&s3_client),
     134               0 :         delete_batch_producer.subscribe(),
     135               0 :         s3_root.clone(),
     136               0 :     );
     137                 : 
     138               0 :     let (deleter_task_result, batch_producer_task_result) =
     139               0 :         tokio::join!(s3_deleter.remove_all(), delete_batch_producer.join());
     140                 : 
     141               0 :     let deletion_stats = deleter_task_result.context("s3 deletion")?;
     142               0 :     info!(
     143               0 :         "Deleted {} tenants ({} keys) and {} timelines ({} keys) total. Dry run: {}",
     144               0 :         deletion_stats.deleted_tenant_keys.len(),
     145               0 :         deletion_stats.deleted_tenant_keys.values().sum::<usize>(),
     146               0 :         deletion_stats.deleted_timeline_keys.len(),
     147               0 :         deletion_stats.deleted_timeline_keys.values().sum::<usize>(),
     148               0 :         dry_run,
     149               0 :     );
     150               0 :     info!(
     151               0 :         "Total tenant deletion stats: {:?}",
     152               0 :         deletion_stats
     153               0 :             .deleted_tenant_keys
     154               0 :             .into_iter()
     155               0 :             .map(|(id, key)| (id.to_string(), key))
     156               0 :             .collect::<HashMap<_, _>>()
     157               0 :     );
     158               0 :     info!(
     159               0 :         "Total timeline deletion stats: {:?}",
     160               0 :         deletion_stats
     161               0 :             .deleted_timeline_keys
     162               0 :             .into_iter()
     163               0 :             .map(|(id, key)| (id.to_string(), key))
     164               0 :             .collect::<HashMap<_, _>>()
     165               0 :     );
     166                 : 
     167               0 :     let batch_producer_stats = batch_producer_task_result.context("delete batch producer join")?;
     168               0 :     info!(
     169               0 :         "Total bucket tenants listed: {}; for {} active tenants, timelines checked: {}",
     170               0 :         batch_producer_stats.tenants_checked(),
     171               0 :         batch_producer_stats.active_tenants(),
     172               0 :         batch_producer_stats.timelines_checked()
     173               0 :     );
     174                 : 
     175               0 :     if node_kind != NodeKind::Pageserver {
     176               0 :         info!("node_kind != pageserver, finish without performing validation step");
     177               0 :         return Ok(());
     178               0 :     }
     179               0 : 
     180               0 :     if skip_validation {
     181               0 :         info!("--skip-validation is set, exiting");
     182               0 :         return Ok(());
     183               0 :     }
     184                 : 
     185               0 :     info!("validating active tenants and timelines for pageserver S3 data");
     186                 : 
     187                 :     // TODO kb real stats for validation + better stats for every place: add and print `min`, `max`, `mean` values at least
     188               0 :     let validation_stats = checks::validate_pageserver_active_tenant_and_timelines(
     189               0 :         s3_client,
     190               0 :         s3_root,
     191               0 :         cloud_admin_api_client,
     192               0 :         batch_producer_stats,
     193               0 :     )
     194               0 :     .await
     195               0 :     .context("active tenant and timeline validation")?;
     196               0 :     info!("Finished active tenant and timeline validation, correct timelines: {}, timeline validation errors: {}",
     197               0 :         validation_stats.normal_timelines.len(), validation_stats.timelines_with_errors.len());
     198               0 :     if !validation_stats.timelines_with_errors.is_empty() {
     199               0 :         warn!(
     200               0 :             "Validation errors: {:#?}",
     201               0 :             validation_stats
     202               0 :                 .timelines_with_errors
     203               0 :                 .into_iter()
     204               0 :                 .map(|(id, errors)| (id.to_string(), format!("{errors:?}")))
     205               0 :                 .collect::<HashMap<_, _>>()
     206               0 :         );
     207               0 :     }
     208                 : 
     209               0 :     info!("Done");
     210               0 :     Ok(())
     211               0 : }
     212                 : 
     213                 : #[tokio::main]
     214               0 : async fn main() -> anyhow::Result<()> {
     215               0 :     let cli = Cli::parse();
     216                 : 
     217               0 :     let bucket_config = BucketConfig::from_env()?;
     218                 : 
     219               0 :     match cli.command {
     220                 :         Command::Tidy {
     221               0 :             node_kind,
     222               0 :             depth,
     223               0 :             skip_validation,
     224                 :         } => {
     225               0 :             let console_config = ConsoleConfig::from_env()?;
     226               0 :             tidy(
     227               0 :                 &cli,
     228               0 :                 bucket_config,
     229               0 :                 console_config,
     230               0 :                 node_kind,
     231               0 :                 depth,
     232               0 :                 skip_validation,
     233               0 :             )
     234               0 :             .await
     235                 :         }
     236               0 :         Command::ScanMetadata {} => match scan_metadata(bucket_config).await {
     237               0 :             Err(e) => {
     238               0 :                 tracing::error!("Failed: {e}");
     239               0 :                 Err(e)
     240                 :             }
     241               0 :             Ok(summary) => {
     242               0 :                 println!("{}", summary.summary_string());
     243               0 :                 if summary.is_fatal() {
     244               0 :                     Err(anyhow::anyhow!("Fatal scrub errors detected"))
     245                 :                 } else {
     246               0 :                     Ok(())
     247                 :                 }
     248                 :             }
     249                 :         },
     250                 :     }
     251                 : }
        

Generated by: LCOV version 2.1-beta