LCOV - code coverage report
Current view: top level - storage_scrubber/src - main.rs (source / functions) Coverage Total Hit
Test: bb522999b2ee0ee028df22bb188d3a84170ba700.info Lines: 0.0 % 200 0
Test Date: 2024-07-21 16:16:09 Functions: 0.0 % 60 0

            Line data    Source code
       1              : use anyhow::{anyhow, bail};
       2              : use camino::Utf8PathBuf;
       3              : use pageserver_api::shard::TenantShardId;
       4              : use reqwest::Url;
       5              : use storage_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
       6              : use storage_scrubber::pageserver_physical_gc::GcMode;
       7              : use storage_scrubber::scan_pageserver_metadata::scan_metadata;
       8              : use storage_scrubber::tenant_snapshot::SnapshotDownloader;
       9              : use storage_scrubber::{find_large_objects, ControllerClientConfig};
      10              : use storage_scrubber::{
      11              :     init_logging, pageserver_physical_gc::pageserver_physical_gc,
      12              :     scan_safekeeper_metadata::scan_safekeeper_metadata, BucketConfig, ConsoleConfig, NodeKind,
      13              :     TraversingDepth,
      14              : };
      15              : 
      16              : use clap::{Parser, Subcommand};
      17              : use utils::id::TenantId;
      18              : 
      19            0 : #[derive(Parser)]
      20              : #[command(author, version, about, long_about = None)]
      21              : #[command(arg_required_else_help(true))]
      22              : struct Cli {
      23              :     #[command(subcommand)]
      24              :     command: Command,
      25              : 
      26            0 :     #[arg(short, long, default_value_t = false)]
      27            0 :     delete: bool,
      28              : 
      29              :     #[arg(long)]
      30              :     /// URL to storage controller.  e.g. http://127.0.0.1:1234 when using `neon_local`
      31              :     controller_api: Option<Url>,
      32              : 
      33              :     #[arg(long)]
      34              :     /// JWT token for authenticating with storage controller.  Requires scope 'scrubber' or 'admin'.
      35              :     controller_jwt: Option<String>,
      36              : }
      37              : 
      38            0 : #[derive(Subcommand, Debug)]
      39              : enum Command {
      40              :     FindGarbage {
      41              :         #[arg(short, long)]
      42            0 :         node_kind: NodeKind,
      43            0 :         #[arg(short, long, default_value_t=TraversingDepth::Tenant)]
      44            0 :         depth: TraversingDepth,
      45            0 :         #[arg(short, long, default_value_t = String::from("garbage.json"))]
      46            0 :         output_path: String,
      47              :     },
      48              :     PurgeGarbage {
      49              :         #[arg(short, long)]
      50            0 :         input_path: String,
      51            0 :         #[arg(short, long, default_value_t = PurgeMode::DeletedOnly)]
      52            0 :         mode: PurgeMode,
      53              :     },
      54              :     #[command(verbatim_doc_comment)]
      55              :     ScanMetadata {
      56              :         #[arg(short, long)]
      57            0 :         node_kind: NodeKind,
      58            0 :         #[arg(short, long, default_value_t = false)]
      59            0 :         json: bool,
      60              :         #[arg(long = "tenant-id", num_args = 0..)]
      61            0 :         tenant_ids: Vec<TenantShardId>,
      62              :         #[arg(long, default_value = None)]
      63              :         /// For safekeeper node_kind only, points to db with debug dump
      64              :         dump_db_connstr: Option<String>,
      65              :         /// For safekeeper node_kind only, table in the db with debug dump
      66              :         #[arg(long, default_value = None)]
      67              :         dump_db_table: Option<String>,
      68              :     },
      69              :     TenantSnapshot {
      70              :         #[arg(long = "tenant-id")]
      71            0 :         tenant_id: TenantId,
      72            0 :         #[arg(long = "concurrency", short = 'j', default_value_t = 8)]
      73            0 :         concurrency: usize,
      74              :         #[arg(short, long)]
      75            0 :         output_path: Utf8PathBuf,
      76              :     },
      77              :     PageserverPhysicalGc {
      78              :         #[arg(long = "tenant-id", num_args = 0..)]
      79            0 :         tenant_ids: Vec<TenantShardId>,
      80              :         #[arg(long = "min-age")]
      81            0 :         min_age: humantime::Duration,
      82            0 :         #[arg(short, long, default_value_t = GcMode::IndicesOnly)]
      83            0 :         mode: GcMode,
      84              :     },
      85              :     FindLargeObjects {
      86              :         #[arg(long = "min-size")]
      87            0 :         min_size: u64,
      88            0 :         #[arg(short, long, default_value_t = false)]
      89            0 :         ignore_deltas: bool,
      90            0 :         #[arg(long = "concurrency", short = 'j', default_value_t = 64)]
      91            0 :         concurrency: usize,
      92              :     },
      93              : }
      94              : 
      95              : #[tokio::main]
      96            0 : async fn main() -> anyhow::Result<()> {
      97            0 :     let cli = Cli::parse();
      98            0 : 
      99            0 :     let bucket_config = BucketConfig::from_env()?;
     100            0 : 
     101            0 :     let command_log_name = match &cli.command {
     102            0 :         Command::ScanMetadata { .. } => "scan",
     103            0 :         Command::FindGarbage { .. } => "find-garbage",
     104            0 :         Command::PurgeGarbage { .. } => "purge-garbage",
     105            0 :         Command::TenantSnapshot { .. } => "tenant-snapshot",
     106            0 :         Command::PageserverPhysicalGc { .. } => "pageserver-physical-gc",
     107            0 :         Command::FindLargeObjects { .. } => "find-large-objects",
     108            0 :     };
     109            0 :     let _guard = init_logging(&format!(
     110            0 :         "{}_{}_{}_{}.log",
     111            0 :         std::env::args().next().unwrap(),
     112            0 :         command_log_name,
     113            0 :         bucket_config.bucket,
     114            0 :         chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
     115            0 :     ));
     116            0 : 
     117            0 :     match cli.command {
     118            0 :         Command::ScanMetadata {
     119            0 :             json,
     120            0 :             tenant_ids,
     121            0 :             node_kind,
     122            0 :             dump_db_connstr,
     123            0 :             dump_db_table,
     124            0 :         } => {
     125            0 :             if let NodeKind::Safekeeper = node_kind {
     126            0 :                 let dump_db_connstr =
     127            0 :                     dump_db_connstr.ok_or(anyhow::anyhow!("dump_db_connstr not specified"))?;
     128            0 :                 let dump_db_table =
     129            0 :                     dump_db_table.ok_or(anyhow::anyhow!("dump_db_table not specified"))?;
     130            0 : 
     131            0 :                 let summary = scan_safekeeper_metadata(
     132            0 :                     bucket_config.clone(),
     133            0 :                     tenant_ids.iter().map(|tshid| tshid.tenant_id).collect(),
     134            0 :                     dump_db_connstr,
     135            0 :                     dump_db_table,
     136            0 :                 )
     137            0 :                 .await?;
     138            0 :                 if json {
     139            0 :                     println!("{}", serde_json::to_string(&summary).unwrap())
     140            0 :                 } else {
     141            0 :                     println!("{}", summary.summary_string());
     142            0 :                 }
     143            0 :                 if summary.is_fatal() {
     144            0 :                     bail!("Fatal scrub errors detected");
     145            0 :                 }
     146            0 :                 if summary.is_empty() {
     147            0 :                     // Strictly speaking an empty bucket is a valid bucket, but if someone ran the
     148            0 :                     // scrubber they were likely expecting to scan something, and if we see no timelines
     149            0 :                     // at all then it's likely due to some configuration issues like a bad prefix
     150            0 :                     bail!(
     151            0 :                         "No timelines found in bucket {} prefix {}",
     152            0 :                         bucket_config.bucket,
     153            0 :                         bucket_config
     154            0 :                             .prefix_in_bucket
     155            0 :                             .unwrap_or("<none>".to_string())
     156            0 :                     );
     157            0 :                 }
     158            0 :                 Ok(())
     159            0 :             } else {
     160            0 :                 match scan_metadata(bucket_config.clone(), tenant_ids).await {
     161            0 :                     Err(e) => {
     162            0 :                         tracing::error!("Failed: {e}");
     163            0 :                         Err(e)
     164            0 :                     }
     165            0 :                     Ok(summary) => {
     166            0 :                         if json {
     167            0 :                             println!("{}", serde_json::to_string(&summary).unwrap())
     168            0 :                         } else {
     169            0 :                             println!("{}", summary.summary_string());
     170            0 :                         }
     171            0 :                         if summary.is_fatal() {
     172            0 :                             Err(anyhow::anyhow!("Fatal scrub errors detected"))
     173            0 :                         } else if summary.is_empty() {
     174            0 :                             // Strictly speaking an empty bucket is a valid bucket, but if someone ran the
     175            0 :                             // scrubber they were likely expecting to scan something, and if we see no timelines
     176            0 :                             // at all then it's likely due to some configuration issues like a bad prefix
     177            0 :                             Err(anyhow::anyhow!(
     178            0 :                                 "No timelines found in bucket {} prefix {}",
     179            0 :                                 bucket_config.bucket,
     180            0 :                                 bucket_config
     181            0 :                                     .prefix_in_bucket
     182            0 :                                     .unwrap_or("<none>".to_string())
     183            0 :                             ))
     184            0 :                         } else {
     185            0 :                             Ok(())
     186            0 :                         }
     187            0 :                     }
     188            0 :                 }
     189            0 :             }
     190            0 :         }
     191            0 :         Command::FindGarbage {
     192            0 :             node_kind,
     193            0 :             depth,
     194            0 :             output_path,
     195            0 :         } => {
     196            0 :             let console_config = ConsoleConfig::from_env()?;
     197            0 :             find_garbage(bucket_config, console_config, depth, node_kind, output_path).await
     198            0 :         }
     199            0 :         Command::PurgeGarbage { input_path, mode } => {
     200            0 :             purge_garbage(input_path, mode, !cli.delete).await
     201            0 :         }
     202            0 :         Command::TenantSnapshot {
     203            0 :             tenant_id,
     204            0 :             output_path,
     205            0 :             concurrency,
     206            0 :         } => {
     207            0 :             let downloader =
     208            0 :                 SnapshotDownloader::new(bucket_config, tenant_id, output_path, concurrency).await?;
     209            0 :             downloader.download().await
     210            0 :         }
     211            0 :         Command::PageserverPhysicalGc {
     212            0 :             tenant_ids,
     213            0 :             min_age,
     214            0 :             mode,
     215            0 :         } => {
     216            0 :             let controller_client_conf = cli.controller_api.map(|controller_api| {
     217            0 :                 ControllerClientConfig {
     218            0 :                     controller_api,
     219            0 :                     // Default to no key: this is a convenience when working in a development environment
     220            0 :                     controller_jwt: cli.controller_jwt.unwrap_or("".to_owned()),
     221            0 :                 }
     222            0 :             });
     223            0 : 
     224            0 :             match (&controller_client_conf, mode) {
     225            0 :                 (Some(_), _) => {
     226            0 :                     // Any mode may run when controller API is set
     227            0 :                 }
     228            0 :                 (None, GcMode::Full) => {
     229            0 :                     // The part of physical GC where we erase ancestor layers cannot be done safely without
     230            0 :                     // confirming the most recent complete shard split with the controller.  Refuse to run, rather
     231            0 :                     // than doing it unsafely.
     232            0 :                     return Err(anyhow!("Full physical GC requires `--controller-api` and `--controller-jwt` to run"));
     233            0 :                 }
     234            0 :                 (None, GcMode::DryRun | GcMode::IndicesOnly) => {
     235            0 :                     // These GcModes do not require the controller to run.
     236            0 :                 }
     237            0 :             }
     238            0 : 
     239            0 :             let summary = pageserver_physical_gc(
     240            0 :                 bucket_config,
     241            0 :                 controller_client_conf,
     242            0 :                 tenant_ids,
     243            0 :                 min_age.into(),
     244            0 :                 mode,
     245            0 :             )
     246            0 :             .await?;
     247            0 :             println!("{}", serde_json::to_string(&summary).unwrap());
     248            0 :             Ok(())
     249            0 :         }
     250            0 :         Command::FindLargeObjects {
     251            0 :             min_size,
     252            0 :             ignore_deltas,
     253            0 :             concurrency,
     254            0 :         } => {
     255            0 :             let summary = find_large_objects::find_large_objects(
     256            0 :                 bucket_config,
     257            0 :                 min_size,
     258            0 :                 ignore_deltas,
     259            0 :                 concurrency,
     260            0 :             )
     261            0 :             .await?;
     262            0 :             println!("{}", serde_json::to_string(&summary).unwrap());
     263            0 :             Ok(())
     264            0 :         }
     265            0 :     }
     266            0 : }
        

Generated by: LCOV version 2.1-beta