LCOV - code coverage report
Current view: top level - s3_scrubber/src - main.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 0.0 % 176 0
Test Date: 2023-09-06 10:18:01 Functions: 0.0 % 61 0

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : use std::fmt::Display;
       3              : use std::num::NonZeroUsize;
       4              : use std::sync::Arc;
       5              : 
       6              : use anyhow::Context;
       7              : use aws_sdk_s3::config::Region;
       8              : use s3_scrubber::cloud_admin_api::CloudAdminApiClient;
       9              : use s3_scrubber::delete_batch_producer::DeleteBatchProducer;
      10              : use s3_scrubber::scan_metadata::scan_metadata;
      11              : use s3_scrubber::{
      12              :     checks, get_cloud_admin_api_token_or_exit, init_logging, init_s3_client, BucketConfig,
      13              :     ConsoleConfig, RootTarget, S3Deleter, S3Target, TraversingDepth, CLI_NAME,
      14              : };
      15              : use tracing::{info, warn};
      16              : 
      17              : use clap::{Parser, Subcommand, ValueEnum};
      18              : 
      19            0 : #[derive(Parser)]
      20              : #[command(author, version, about, long_about = None)]
      21              : #[command(arg_required_else_help(true))]
      22              : struct Cli {
      23              :     #[command(subcommand)]
      24              :     command: Command,
      25              : 
      26            0 :     #[arg(short, long, default_value_t = false)]
      27            0 :     delete: bool,
      28            0 : }
      29              : 
      30            0 : #[derive(ValueEnum, Clone, Copy, Eq, PartialEq)]
      31              : enum NodeKind {
      32              :     Safekeeper,
      33              :     Pageserver,
      34              : }
      35              : 
      36              : impl NodeKind {
      37            0 :     fn as_str(&self) -> &'static str {
      38            0 :         match self {
      39            0 :             Self::Safekeeper => "safekeeper",
      40            0 :             Self::Pageserver => "pageserver",
      41              :         }
      42            0 :     }
      43              : }
      44              : 
      45              : impl Display for NodeKind {
      46            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      47            0 :         f.write_str(self.as_str())
      48            0 :     }
      49              : }
      50              : 
      51            0 : #[derive(Subcommand)]
      52              : enum Command {
      53              :     Tidy {
      54              :         #[arg(short, long)]
      55            0 :         node_kind: NodeKind,
      56            0 :         #[arg(short, long, default_value_t=TraversingDepth::Tenant)]
      57            0 :         depth: TraversingDepth,
      58            0 :         #[arg(short, long, default_value_t = false)]
      59            0 :         skip_validation: bool,
      60            0 :     },
      61              :     ScanMetadata {},
      62              : }
      63              : 
      64            0 : async fn tidy(
      65            0 :     cli: &Cli,
      66            0 :     bucket_config: BucketConfig,
      67            0 :     console_config: ConsoleConfig,
      68            0 :     node_kind: NodeKind,
      69            0 :     depth: TraversingDepth,
      70            0 :     skip_validation: bool,
      71            0 : ) -> anyhow::Result<()> {
      72            0 :     let dry_run = !cli.delete;
      73            0 :     let file_name = if dry_run {
      74            0 :         format!(
      75            0 :             "{}_{}_{}__dry.log",
      76            0 :             CLI_NAME,
      77            0 :             node_kind,
      78            0 :             chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
      79            0 :         )
      80              :     } else {
      81            0 :         format!(
      82            0 :             "{}_{}_{}.log",
      83            0 :             CLI_NAME,
      84            0 :             node_kind,
      85            0 :             chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
      86            0 :         )
      87              :     };
      88              : 
      89            0 :     let _guard = init_logging(&file_name);
      90            0 : 
      91            0 :     if dry_run {
      92            0 :         info!("Dry run, not removing items for real");
      93              :     } else {
      94            0 :         warn!("Dry run disabled, removing bucket items for real");
      95              :     }
      96              : 
      97            0 :     info!("skip_validation={skip_validation}");
      98              : 
      99            0 :     info!("Starting extra S3 removal in {bucket_config} for node kind '{node_kind}', traversing depth: {depth:?}");
     100              : 
     101            0 :     info!("Starting extra tenant S3 removal in {bucket_config} for node kind '{node_kind}'");
     102            0 :     let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(
     103            0 :         get_cloud_admin_api_token_or_exit(),
     104            0 :         console_config.admin_api_url,
     105            0 :     ));
     106            0 : 
     107            0 :     let bucket_region = Region::new(bucket_config.region);
     108            0 :     let delimiter = "/".to_string();
     109            0 :     let s3_client = Arc::new(init_s3_client(bucket_config.sso_account_id, bucket_region));
     110            0 :     let s3_root = match node_kind {
     111            0 :         NodeKind::Pageserver => RootTarget::Pageserver(S3Target {
     112            0 :             bucket_name: bucket_config.bucket,
     113            0 :             prefix_in_bucket: ["pageserver", "v1", "tenants", ""].join(&delimiter),
     114            0 :             delimiter,
     115            0 :         }),
     116            0 :         NodeKind::Safekeeper => RootTarget::Safekeeper(S3Target {
     117            0 :             bucket_name: bucket_config.bucket,
     118            0 :             prefix_in_bucket: ["safekeeper", "v1", "wal", ""].join(&delimiter),
     119            0 :             delimiter,
     120            0 :         }),
     121              :     };
     122              : 
     123            0 :     let delete_batch_producer = DeleteBatchProducer::start(
     124            0 :         Arc::clone(&cloud_admin_api_client),
     125            0 :         Arc::clone(&s3_client),
     126            0 :         s3_root.clone(),
     127            0 :         depth,
     128            0 :     );
     129            0 : 
     130            0 :     let s3_deleter = S3Deleter::new(
     131            0 :         dry_run,
     132            0 :         NonZeroUsize::new(15).unwrap(),
     133            0 :         Arc::clone(&s3_client),
     134            0 :         delete_batch_producer.subscribe(),
     135            0 :         s3_root.clone(),
     136            0 :     );
     137              : 
     138            0 :     let (deleter_task_result, batch_producer_task_result) =
     139            0 :         tokio::join!(s3_deleter.remove_all(), delete_batch_producer.join());
     140              : 
     141            0 :     let deletion_stats = deleter_task_result.context("s3 deletion")?;
     142            0 :     info!(
     143            0 :         "Deleted {} tenants ({} keys) and {} timelines ({} keys) total. Dry run: {}",
     144            0 :         deletion_stats.deleted_tenant_keys.len(),
     145            0 :         deletion_stats.deleted_tenant_keys.values().sum::<usize>(),
     146            0 :         deletion_stats.deleted_timeline_keys.len(),
     147            0 :         deletion_stats.deleted_timeline_keys.values().sum::<usize>(),
     148            0 :         dry_run,
     149            0 :     );
     150            0 :     info!(
     151            0 :         "Total tenant deletion stats: {:?}",
     152            0 :         deletion_stats
     153            0 :             .deleted_tenant_keys
     154            0 :             .into_iter()
     155            0 :             .map(|(id, key)| (id.to_string(), key))
     156            0 :             .collect::<HashMap<_, _>>()
     157            0 :     );
     158            0 :     info!(
     159            0 :         "Total timeline deletion stats: {:?}",
     160            0 :         deletion_stats
     161            0 :             .deleted_timeline_keys
     162            0 :             .into_iter()
     163            0 :             .map(|(id, key)| (id.to_string(), key))
     164            0 :             .collect::<HashMap<_, _>>()
     165            0 :     );
     166              : 
     167            0 :     let batch_producer_stats = batch_producer_task_result.context("delete batch producer join")?;
     168            0 :     info!(
     169            0 :         "Total bucket tenants listed: {}; for {} active tenants, timelines checked: {}",
     170            0 :         batch_producer_stats.tenants_checked(),
     171            0 :         batch_producer_stats.active_tenants(),
     172            0 :         batch_producer_stats.timelines_checked()
     173            0 :     );
     174              : 
     175            0 :     if node_kind == NodeKind::Pageserver {
     176            0 :         info!("node_kind != pageserver, finish without performing validation step");
     177            0 :         return Ok(());
     178            0 :     }
     179            0 : 
     180            0 :     if skip_validation {
     181            0 :         info!("--skip-validation is set, exiting");
     182            0 :         return Ok(());
     183            0 :     }
     184              : 
     185            0 :     info!("validating active tenants and timelines for pageserver S3 data");
     186              : 
     187              :     // TODO kb real stats for validation + better stats for every place: add and print `min`, `max`, `mean` values at least
     188            0 :     let validation_stats = checks::validate_pageserver_active_tenant_and_timelines(
     189            0 :         s3_client,
     190            0 :         s3_root,
     191            0 :         cloud_admin_api_client,
     192            0 :         batch_producer_stats,
     193            0 :     )
     194            0 :     .await
     195            0 :     .context("active tenant and timeline validation")?;
     196            0 :     info!("Finished active tenant and timeline validation, correct timelines: {}, timeline validation errors: {}",
     197            0 :         validation_stats.normal_timelines.len(), validation_stats.timelines_with_errors.len());
     198            0 :     if !validation_stats.timelines_with_errors.is_empty() {
     199            0 :         warn!(
     200            0 :             "Validation errors: {:#?}",
     201            0 :             validation_stats
     202            0 :                 .timelines_with_errors
     203            0 :                 .into_iter()
     204            0 :                 .map(|(id, errors)| (id.to_string(), format!("{errors:?}")))
     205            0 :                 .collect::<HashMap<_, _>>()
     206            0 :         );
     207            0 :     }
     208              : 
     209            0 :     info!("Done");
     210            0 :     Ok(())
     211            0 : }
     212              : 
     213              : #[tokio::main]
     214            0 : async fn main() -> anyhow::Result<()> {
     215            0 :     let cli = Cli::parse();
     216              : 
     217            0 :     let bucket_config = BucketConfig::from_env()?;
     218              : 
     219            0 :     match cli.command {
     220              :         Command::Tidy {
     221            0 :             node_kind,
     222            0 :             depth,
     223            0 :             skip_validation,
     224              :         } => {
     225            0 :             let console_config = ConsoleConfig::from_env()?;
     226            0 :             tidy(
     227            0 :                 &cli,
     228            0 :                 bucket_config,
     229            0 :                 console_config,
     230            0 :                 node_kind,
     231            0 :                 depth,
     232            0 :                 skip_validation,
     233            0 :             )
     234            0 :             .await
     235              :         }
     236            0 :         Command::ScanMetadata {} => match scan_metadata(bucket_config).await {
     237            0 :             Err(e) => {
     238            0 :                 tracing::error!("Failed: {e}");
     239            0 :                 Err(e)
     240              :             }
     241            0 :             Ok(summary) => {
     242            0 :                 println!("{}", summary.summary_string());
     243            0 :                 if summary.is_fatal() {
     244            0 :                     Err(anyhow::anyhow!("Fatal scrub errors detected"))
     245              :                 } else {
     246            0 :                     Ok(())
     247              :                 }
     248              :             }
     249              :         },
     250              :     }
     251              : }
        

Generated by: LCOV version 2.1-beta