|             Line data    Source code 
       1              : use anyhow::{Context, anyhow, bail};
       2              : use camino::Utf8PathBuf;
       3              : use clap::{Parser, Subcommand};
       4              : use pageserver_api::controller_api::{MetadataHealthUpdateRequest, MetadataHealthUpdateResponse};
       5              : use pageserver_api::shard::TenantShardId;
       6              : use reqwest::{Certificate, Method, Url};
       7              : use storage_controller_client::control_api;
       8              : use storage_scrubber::garbage::{PurgeMode, find_garbage, purge_garbage};
       9              : use storage_scrubber::pageserver_physical_gc::{GcMode, pageserver_physical_gc};
      10              : use storage_scrubber::scan_pageserver_metadata::scan_pageserver_metadata;
      11              : use storage_scrubber::scan_safekeeper_metadata::{DatabaseOrList, scan_safekeeper_metadata};
      12              : use storage_scrubber::tenant_snapshot::SnapshotDownloader;
      13              : use storage_scrubber::{
      14              :     BucketConfig, ConsoleConfig, ControllerClientConfig, NodeKind, TraversingDepth,
      15              :     find_large_objects, init_logging,
      16              : };
      17              : use utils::id::TenantId;
      18              : use utils::{project_build_tag, project_git_version};
      19              : 
// Define the `GIT_VERSION` and `BUILD_TAG` constants; both are logged at startup by `main`.
project_git_version!(GIT_VERSION);
project_build_tag!(BUILD_TAG);
      22              : 
      23              : #[derive(Parser)]
      24              : #[command(author, version, about, long_about = None)]
      25              : #[command(arg_required_else_help(true))]
      26              : struct Cli {
      27              :     #[command(subcommand)]
      28              :     command: Command,
      29              : 
      30              :     #[arg(short, long, default_value_t = false)]
      31              :     delete: bool,
      32              : 
      33              :     #[arg(long)]
      34              :     /// URL to storage controller.  e.g. http://127.0.0.1:1234 when using `neon_local`
      35              :     controller_api: Option<Url>,
      36              : 
      37              :     #[arg(long)]
      38              :     /// JWT token for authenticating with storage controller.  Requires scope 'scrubber' or 'admin'.
      39              :     controller_jwt: Option<String>,
      40              : 
      41              :     /// If set to true, the scrubber will exit with error code on fatal error.
      42              :     #[arg(long, default_value_t = false)]
      43              :     exit_code: bool,
      44              : 
      45              :     /// Trusted root CA certificates to use in https APIs.
      46              :     #[arg(long)]
      47              :     ssl_ca_file: Option<Utf8PathBuf>,
      48              : }
      49              : 
// Subcommands of the scrubber CLI. clap derives the subcommand and flag names from the variant
// and field names (kebab-case by default, e.g. `ScanMetadata` -> `scan-metadata`), so renaming
// them changes the command line. Plain `//` comments are used here so `--help` text is unchanged.
#[derive(Subcommand, Debug)]
enum Command {
    // Passed to `find_garbage` together with the console config read from the environment;
    // `output_path` defaults to `garbage.json`.
    FindGarbage {
        #[arg(short, long)]
        node_kind: NodeKind,
        #[arg(short, long, default_value_t=TraversingDepth::Tenant)]
        depth: TraversingDepth,
        #[arg(short, long, default_value=None)]
        tenant_id_prefix: Option<String>,
        #[arg(short, long, default_value_t = String::from("garbage.json"))]
        output_path: String,
    },
    // Passed to `purge_garbage`. The top-level `--delete` flag is forwarded to it negated, so
    // without `--delete` nothing is meant to be deleted (NOTE(review): confirm against
    // `purge_garbage`'s parameter semantics).
    PurgeGarbage {
        #[arg(short, long)]
        input_path: String,
        #[arg(short, long, default_value_t = PurgeMode::DeletedOnly)]
        mode: PurgeMode,
        #[arg(long = "min-age")]
        min_age: humantime::Duration,
    },
    // Scans pageserver or safekeeper metadata depending on `node_kind`.
    // NOTE(review): `verbatim_doc_comment` has no effect while this variant has no `///` doc.
    #[command(verbatim_doc_comment)]
    ScanMetadata {
        #[arg(short, long)]
        node_kind: NodeKind,
        // Print the summary as JSON instead of the human-readable summary string.
        #[arg(short, long, default_value_t = false)]
        json: bool,
        // Accepts zero or more values. For safekeeper scans only the tenant part is used, and
        // only together with `--dump-db-connstr`.
        #[arg(long = "tenant-id", num_args = 0..)]
        tenant_ids: Vec<TenantShardId>,
        // Pageserver only: send the health summary to the storage controller
        // (requires `--controller-api`). Ignored for safekeeper scans.
        #[arg(long = "post", default_value_t = false)]
        post_to_storcon: bool,
        #[arg(long, default_value = None)]
        /// For safekeeper node_kind only, points to db with debug dump
        dump_db_connstr: Option<String>,
        /// For safekeeper node_kind only, table in the db with debug dump
        #[arg(long, default_value = None)]
        dump_db_table: Option<String>,
        /// For safekeeper node_kind only, json list of timelines and their lsn info
        #[arg(long, default_value = None)]
        timeline_lsns: Option<String>,
        // Pageserver only: forwarded to `scan_pageserver_metadata`. Ignored for safekeeper scans.
        #[arg(long, default_value_t = false)]
        verbose: bool,
    },
    // `tenant_id`, `output_path` and `concurrency` (`-j`, default 8) are passed to
    // `SnapshotDownloader::new`, then `download()` is awaited.
    TenantSnapshot {
        #[arg(long = "tenant-id")]
        tenant_id: TenantId,
        #[arg(long = "concurrency", short = 'j', default_value_t = 8)]
        concurrency: usize,
        #[arg(short, long)]
        output_path: Utf8PathBuf,
    },
    // Runs `pageserver_physical_gc_cmd`; `GcMode::Full` is refused there unless
    // `--controller-api` is set.
    PageserverPhysicalGc {
        #[arg(long = "tenant-id", num_args = 0..)]
        tenant_ids: Vec<TenantShardId>,
        #[arg(long = "min-age")]
        min_age: humantime::Duration,
        #[arg(short, long, default_value_t = GcMode::IndicesOnly)]
        mode: GcMode,
    },
    // Prints the `find_large_objects` summary as JSON on stdout.
    FindLargeObjects {
        #[arg(long = "min-size")]
        min_size: u64,
        #[arg(short, long, default_value_t = false)]
        ignore_deltas: bool,
        #[arg(long = "concurrency", short = 'j', default_value_t = 64)]
        concurrency: usize,
    },
    // Physical GC with no tenant filter, then a pageserver scan-metadata with JSON output
    // (see `run_cron_job`).
    CronJob {
        // PageserverPhysicalGc
        // Note: the flag is `--min-age` (not `--gc-min-age`) because of the explicit `long`.
        #[arg(long = "min-age")]
        gc_min_age: humantime::Duration,
        #[arg(short, long, default_value_t = GcMode::IndicesOnly)]
        gc_mode: GcMode,
        // ScanMetadata
        #[arg(long = "post", default_value_t = false)]
        post_to_storcon: bool,
    },
}
     127              : 
     128              : #[tokio::main]
     129            0 : async fn main() -> anyhow::Result<()> {
     130            0 :     let cli = Cli::parse();
     131              : 
     132            0 :     let bucket_config = BucketConfig::from_env()?;
     133              : 
     134            0 :     let command_log_name = match &cli.command {
     135            0 :         Command::ScanMetadata { .. } => "scan",
     136            0 :         Command::FindGarbage { .. } => "find-garbage",
     137            0 :         Command::PurgeGarbage { .. } => "purge-garbage",
     138            0 :         Command::TenantSnapshot { .. } => "tenant-snapshot",
     139            0 :         Command::PageserverPhysicalGc { .. } => "pageserver-physical-gc",
     140            0 :         Command::FindLargeObjects { .. } => "find-large-objects",
     141            0 :         Command::CronJob { .. } => "cron-job",
     142              :     };
     143            0 :     let _guard = init_logging(&format!(
     144            0 :         "{}_{}_{}_{}.log",
     145            0 :         std::env::args().next().unwrap(),
     146            0 :         command_log_name,
     147            0 :         bucket_config.bucket_name().unwrap_or("nobucket"),
     148            0 :         chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
     149            0 :     ));
     150              : 
     151            0 :     tracing::info!("version: {}, build_tag {}", GIT_VERSION, BUILD_TAG);
     152              : 
     153            0 :     let ssl_ca_certs = match cli.ssl_ca_file.as_ref() {
     154            0 :         Some(ssl_ca_file) => {
     155            0 :             tracing::info!("Using ssl root CA file: {ssl_ca_file:?}");
     156            0 :             let buf = tokio::fs::read(ssl_ca_file).await?;
     157            0 :             Certificate::from_pem_bundle(&buf)?
     158              :         }
     159            0 :         None => Vec::new(),
     160              :     };
     161              : 
     162            0 :     let mut http_client = reqwest::Client::builder();
     163            0 :     for cert in ssl_ca_certs {
     164            0 :         http_client = http_client.add_root_certificate(cert);
     165            0 :     }
     166            0 :     let http_client = http_client.build()?;
     167              : 
     168            0 :     let controller_client = cli.controller_api.map(|controller_api| {
     169            0 :         ControllerClientConfig {
     170            0 :             controller_api,
     171            0 :             // Default to no key: this is a convenience when working in a development environment
     172            0 :             controller_jwt: cli.controller_jwt.unwrap_or("".to_owned()),
     173            0 :         }
     174            0 :         .build_client(http_client)
     175            0 :     });
     176              : 
     177            0 :     match cli.command {
     178            0 :         Command::ScanMetadata {
     179            0 :             json,
     180            0 :             tenant_ids,
     181            0 :             node_kind,
     182            0 :             post_to_storcon,
     183            0 :             dump_db_connstr,
     184            0 :             dump_db_table,
     185            0 :             timeline_lsns,
     186            0 :             verbose,
     187            0 :         } => {
     188            0 :             if let NodeKind::Safekeeper = node_kind {
     189            0 :                 let db_or_list = match (timeline_lsns, dump_db_connstr) {
     190            0 :                     (Some(timeline_lsns), _) => {
     191            0 :                         let timeline_lsns = serde_json::from_str(&timeline_lsns)
     192            0 :                             .context("parsing timeline_lsns")?;
     193            0 :                         DatabaseOrList::List(timeline_lsns)
     194            0 :                     }
     195            0 :                     (None, Some(dump_db_connstr)) => {
     196            0 :                         let dump_db_table = dump_db_table
     197            0 :                             .ok_or_else(|| anyhow::anyhow!("dump_db_table not specified"))?;
     198            0 :                         let tenant_ids = tenant_ids.iter().map(|tshid| tshid.tenant_id).collect();
     199            0 :                         DatabaseOrList::Database {
     200            0 :                             tenant_ids,
     201            0 :                             connstr: dump_db_connstr,
     202            0 :                             table: dump_db_table,
     203            0 :                         }
     204            0 :                     }
     205            0 :                     (None, None) => anyhow::bail!(
     206            0 :                         "neither `timeline_lsns` specified, nor `dump_db_connstr` and `dump_db_table`"
     207            0 :                     ),
     208            0 :                 };
     209            0 :                 let summary = scan_safekeeper_metadata(bucket_config.clone(), db_or_list).await?;
     210            0 :                 if json {
     211            0 :                     println!("{}", serde_json::to_string(&summary).unwrap())
     212            0 :                 } else {
     213            0 :                     println!("{}", summary.summary_string());
     214            0 :                 }
     215            0 :                 if summary.is_fatal() {
     216            0 :                     bail!("Fatal scrub errors detected");
     217            0 :                 }
     218            0 :                 if summary.is_empty() {
     219            0 :                     // Strictly speaking an empty bucket is a valid bucket, but if someone ran the
     220            0 :                     // scrubber they were likely expecting to scan something, and if we see no timelines
     221            0 :                     // at all then it's likely due to some configuration issues like a bad prefix
     222            0 :                     bail!("No timelines found in {}", bucket_config.desc_str());
     223            0 :                 }
     224            0 :                 Ok(())
     225            0 :             } else {
     226            0 :                 scan_pageserver_metadata_cmd(
     227            0 :                     bucket_config,
     228            0 :                     controller_client.as_ref(),
     229            0 :                     tenant_ids,
     230            0 :                     json,
     231            0 :                     post_to_storcon,
     232            0 :                     verbose,
     233            0 :                     cli.exit_code,
     234            0 :                 )
     235            0 :                 .await
     236            0 :             }
     237            0 :         }
     238            0 :         Command::FindGarbage {
     239            0 :             node_kind,
     240            0 :             depth,
     241            0 :             tenant_id_prefix,
     242            0 :             output_path,
     243            0 :         } => {
     244            0 :             let console_config = ConsoleConfig::from_env()?;
     245            0 :             find_garbage(
     246            0 :                 bucket_config,
     247            0 :                 console_config,
     248            0 :                 depth,
     249            0 :                 node_kind,
     250            0 :                 tenant_id_prefix,
     251            0 :                 output_path,
     252            0 :             )
     253            0 :             .await
     254            0 :         }
     255            0 :         Command::PurgeGarbage {
     256            0 :             input_path,
     257            0 :             mode,
     258            0 :             min_age,
     259            0 :         } => purge_garbage(input_path, mode, min_age.into(), !cli.delete).await,
     260            0 :         Command::TenantSnapshot {
     261            0 :             tenant_id,
     262            0 :             output_path,
     263            0 :             concurrency,
     264            0 :         } => {
     265            0 :             let downloader =
     266            0 :                 SnapshotDownloader::new(bucket_config, tenant_id, output_path, concurrency).await?;
     267            0 :             downloader.download().await
     268            0 :         }
     269            0 :         Command::PageserverPhysicalGc {
     270            0 :             tenant_ids,
     271            0 :             min_age,
     272            0 :             mode,
     273            0 :         } => {
     274            0 :             pageserver_physical_gc_cmd(
     275            0 :                 &bucket_config,
     276            0 :                 controller_client.as_ref(),
     277            0 :                 tenant_ids,
     278            0 :                 min_age,
     279            0 :                 mode,
     280            0 :             )
     281            0 :             .await
     282            0 :         }
     283            0 :         Command::FindLargeObjects {
     284            0 :             min_size,
     285            0 :             ignore_deltas,
     286            0 :             concurrency,
     287            0 :         } => {
     288            0 :             let summary = find_large_objects::find_large_objects(
     289            0 :                 bucket_config,
     290            0 :                 min_size,
     291            0 :                 ignore_deltas,
     292            0 :                 concurrency,
     293            0 :             )
     294            0 :             .await?;
     295            0 :             println!("{}", serde_json::to_string(&summary).unwrap());
     296            0 :             Ok(())
     297            0 :         }
     298            0 :         Command::CronJob {
     299            0 :             gc_min_age,
     300            0 :             gc_mode,
     301            0 :             post_to_storcon,
     302            0 :         } => {
     303            0 :             run_cron_job(
     304            0 :                 bucket_config,
     305            0 :                 controller_client.as_ref(),
     306            0 :                 gc_min_age,
     307            0 :                 gc_mode,
     308            0 :                 post_to_storcon,
     309            0 :                 cli.exit_code,
     310            0 :             )
     311            0 :             .await
     312            0 :         }
     313            0 :     }
     314            0 : }
     315              : 
     316              : /// Runs the scrubber cron job.
     317              : /// 1. Do pageserver physical gc
     318              : /// 2. Scan pageserver metadata
     319            0 : pub async fn run_cron_job(
     320            0 :     bucket_config: BucketConfig,
     321            0 :     controller_client: Option<&control_api::Client>,
     322            0 :     gc_min_age: humantime::Duration,
     323            0 :     gc_mode: GcMode,
     324            0 :     post_to_storcon: bool,
     325            0 :     exit_code: bool,
     326            0 : ) -> anyhow::Result<()> {
     327            0 :     tracing::info!(%gc_min_age, %gc_mode, "Running pageserver-physical-gc");
     328            0 :     pageserver_physical_gc_cmd(
     329            0 :         &bucket_config,
     330            0 :         controller_client,
     331            0 :         Vec::new(),
     332            0 :         gc_min_age,
     333            0 :         gc_mode,
     334            0 :     )
     335            0 :     .await?;
     336            0 :     tracing::info!(%post_to_storcon, node_kind = %NodeKind::Pageserver, "Running scan-metadata");
     337            0 :     scan_pageserver_metadata_cmd(
     338            0 :         bucket_config,
     339            0 :         controller_client,
     340            0 :         Vec::new(),
     341            0 :         true,
     342            0 :         post_to_storcon,
     343            0 :         false, // default to non-verbose mode
     344            0 :         exit_code,
     345            0 :     )
     346            0 :     .await?;
     347              : 
     348            0 :     Ok(())
     349            0 : }
     350              : 
     351            0 : pub async fn pageserver_physical_gc_cmd(
     352            0 :     bucket_config: &BucketConfig,
     353            0 :     controller_client: Option<&control_api::Client>,
     354            0 :     tenant_shard_ids: Vec<TenantShardId>,
     355            0 :     min_age: humantime::Duration,
     356            0 :     mode: GcMode,
     357            0 : ) -> anyhow::Result<()> {
     358            0 :     match (controller_client, mode) {
     359            0 :         (Some(_), _) => {
     360            0 :             // Any mode may run when controller API is set
     361            0 :         }
     362              :         (None, GcMode::Full) => {
     363              :             // The part of physical GC where we erase ancestor layers cannot be done safely without
     364              :             // confirming the most recent complete shard split with the controller.  Refuse to run, rather
     365              :             // than doing it unsafely.
     366            0 :             return Err(anyhow!(
     367            0 :                 "Full physical GC requires `--controller-api` and `--controller-jwt` to run"
     368            0 :             ));
     369              :         }
     370            0 :         (None, GcMode::DryRun | GcMode::IndicesOnly) => {
     371            0 :             // These GcModes do not require the controller to run.
     372            0 :         }
     373              :     }
     374              : 
     375            0 :     let summary = pageserver_physical_gc(
     376            0 :         bucket_config,
     377            0 :         controller_client,
     378            0 :         tenant_shard_ids,
     379            0 :         min_age.into(),
     380            0 :         mode,
     381            0 :     )
     382            0 :     .await?;
     383            0 :     println!("{}", serde_json::to_string(&summary).unwrap());
     384            0 :     Ok(())
     385            0 : }
     386              : 
     387            0 : pub async fn scan_pageserver_metadata_cmd(
     388            0 :     bucket_config: BucketConfig,
     389            0 :     controller_client: Option<&control_api::Client>,
     390            0 :     tenant_shard_ids: Vec<TenantShardId>,
     391            0 :     json: bool,
     392            0 :     post_to_storcon: bool,
     393            0 :     verbose: bool,
     394            0 :     exit_code: bool,
     395            0 : ) -> anyhow::Result<()> {
     396            0 :     if controller_client.is_none() && post_to_storcon {
     397            0 :         return Err(anyhow!(
     398            0 :             "Posting pageserver scan health status to storage controller requires `--controller-api` and `--controller-jwt` to run"
     399            0 :         ));
     400            0 :     }
     401            0 :     match scan_pageserver_metadata(bucket_config.clone(), tenant_shard_ids, verbose).await {
     402            0 :         Err(e) => {
     403            0 :             tracing::error!("Failed: {e}");
     404            0 :             Err(e)
     405              :         }
     406            0 :         Ok(summary) => {
     407            0 :             if json {
     408            0 :                 println!("{}", serde_json::to_string(&summary).unwrap())
     409            0 :             } else {
     410            0 :                 println!("{}", summary.summary_string());
     411            0 :             }
     412              : 
     413            0 :             if post_to_storcon {
     414            0 :                 if let Some(client) = controller_client {
     415            0 :                     let body = summary.build_health_update_request();
     416            0 :                     client
     417            0 :                         .dispatch::<MetadataHealthUpdateRequest, MetadataHealthUpdateResponse>(
     418            0 :                             Method::POST,
     419            0 :                             "control/v1/metadata_health/update".to_string(),
     420            0 :                             Some(body),
     421            0 :                         )
     422            0 :                         .await?;
     423            0 :                 }
     424            0 :             }
     425              : 
     426            0 :             if summary.is_fatal() {
     427            0 :                 tracing::error!("Fatal scrub errors detected");
     428            0 :                 if exit_code {
     429            0 :                     std::process::exit(1);
     430            0 :                 }
     431            0 :             } else if summary.is_empty() {
     432              :                 // Strictly speaking an empty bucket is a valid bucket, but if someone ran the
     433              :                 // scrubber they were likely expecting to scan something, and if we see no timelines
     434              :                 // at all then it's likely due to some configuration issues like a bad prefix
     435            0 :                 tracing::error!("No timelines found in {}", bucket_config.desc_str());
     436            0 :                 if exit_code {
     437            0 :                     std::process::exit(1);
     438            0 :                 }
     439            0 :             }
     440              : 
     441            0 :             Ok(())
     442              :         }
     443              :     }
     444            0 : }
         |