Line data Source code
1 : use anyhow::{Context, anyhow, bail};
2 : use camino::Utf8PathBuf;
3 : use clap::{Parser, Subcommand};
4 : use pageserver_api::controller_api::{MetadataHealthUpdateRequest, MetadataHealthUpdateResponse};
5 : use pageserver_api::shard::TenantShardId;
6 : use reqwest::{Certificate, Method, Url};
7 : use storage_controller_client::control_api;
8 : use storage_scrubber::garbage::{PurgeMode, find_garbage, purge_garbage};
9 : use storage_scrubber::pageserver_physical_gc::{GcMode, pageserver_physical_gc};
10 : use storage_scrubber::scan_pageserver_metadata::scan_pageserver_metadata;
11 : use storage_scrubber::scan_safekeeper_metadata::{DatabaseOrList, scan_safekeeper_metadata};
12 : use storage_scrubber::tenant_snapshot::SnapshotDownloader;
13 : use storage_scrubber::{
14 : BucketConfig, ConsoleConfig, ControllerClientConfig, NodeKind, TraversingDepth,
15 : find_large_objects, init_logging,
16 : };
17 : use utils::id::TenantId;
18 : use utils::{project_build_tag, project_git_version};
19 :
// Defines the `GIT_VERSION` constant via `utils::project_git_version!`; `main` logs it at startup.
project_git_version!(GIT_VERSION);
// Defines the `BUILD_TAG` constant via `utils::project_build_tag!`; `main` logs it at startup.
project_build_tag!(BUILD_TAG);
22 :
// Top-level command-line interface of the storage scrubber, parsed with clap's derive API.
//
// NOTE: clap renders `///` doc comments on fields as `--help` text, so they are part of the
// program's user-facing output. Maintainer notes in this struct therefore use `//` instead.
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
#[command(arg_required_else_help(true))]
struct Cli {
    // The subcommand to run; `main` dispatches on it.
    #[command(subcommand)]
    command: Command,

    // `-d` / `--delete`. Only consulted by `purge-garbage`: `main` passes `!delete` as the
    // last argument of `purge_garbage`, presumably a dry-run switch, so without this flag
    // nothing would be deleted (TODO confirm against `purge_garbage`).
    #[arg(short, long, default_value_t = false)]
    delete: bool,

    // Optional. When absent, `main` builds no storage controller client, and commands that need
    // one (full physical GC, `--post`) refuse to run.
    #[arg(long)]
    /// URL to storage controller. e.g. http://127.0.0.1:1234 when using `neon_local`
    controller_api: Option<Url>,

    // Only used together with `--controller-api`; `main` falls back to an empty token when unset.
    #[arg(long)]
    /// JWT token for authenticating with storage controller. Requires scope 'scrubber' or 'admin'.
    controller_jwt: Option<String>,

    // Forwarded to `scan_pageserver_metadata_cmd` (pageserver `scan-metadata` and `cron-job`).
    // The safekeeper scan path does not consult it and always fails on fatal errors.
    /// If set to true, the scrubber will exit with error code on fatal error.
    #[arg(long, default_value_t = false)]
    exit_code: bool,

    // Read once at startup. Every certificate in the PEM bundle is added as a trusted root to
    // the HTTP client that `main` hands to the storage controller client.
    /// Trusted root CA certificates to use in https APIs.
    #[arg(long)]
    ssl_ca_file: Option<Utf8PathBuf>,
}
49 :
// Subcommands of the storage scrubber; `main` dispatches on the selected variant.
//
// NOTE: clap renders `///` doc comments as help text, so maintainer notes here use `//`.
#[derive(Subcommand, Debug)]
enum Command {
    // Calls `find_garbage` (with a `ConsoleConfig` built via `ConsoleConfig::from_env`) and
    // writes its result to `output_path`.
    FindGarbage {
        #[arg(short, long)]
        node_kind: NodeKind,
        #[arg(short, long, default_value_t=TraversingDepth::Tenant)]
        depth: TraversingDepth,
        // Optional prefix, forwarded to `find_garbage` unchanged.
        #[arg(short, long, default_value=None)]
        tenant_id_prefix: Option<String>,
        #[arg(short, long, default_value_t = String::from("garbage.json"))]
        output_path: String,
    },
    // Calls `purge_garbage` on `input_path` (presumably a file produced by `find-garbage`;
    // TODO confirm). Whether anything is actually deleted depends on the global `--delete`
    // flag, which `main` passes through negated.
    PurgeGarbage {
        #[arg(short, long)]
        input_path: String,
        #[arg(short, long, default_value_t = PurgeMode::DeletedOnly)]
        mode: PurgeMode,
        #[arg(long = "min-age")]
        min_age: humantime::Duration,
    },
    // Scans pageserver or safekeeper metadata, depending on `node_kind`.
    //
    // Safekeeper scans need either `--timeline-lsns`, or `--dump-db-connstr` together with
    // `--dump-db-table`; `--timeline-lsns` wins if both are given. `--post` and `--verbose`
    // are only honoured for pageserver scans.
    //
    // NOTE(review): `verbatim_doc_comment` has nothing to act on while this variant has no
    // `///` doc comment.
    #[command(verbatim_doc_comment)]
    ScanMetadata {
        #[arg(short, long)]
        node_kind: NodeKind,
        #[arg(short, long, default_value_t = false)]
        json: bool,
        // Zero or more shard ids. The safekeeper database mode only uses their tenant ids.
        #[arg(long = "tenant-id", num_args = 0..)]
        tenant_ids: Vec<TenantShardId>,
        // Post the scan's health update to the storage controller (requires `--controller-api`).
        #[arg(long = "post", default_value_t = false)]
        post_to_storcon: bool,
        #[arg(long, default_value = None)]
        /// For safekeeper node_kind only, points to db with debug dump
        dump_db_connstr: Option<String>,
        /// For safekeeper node_kind only, table in the db with debug dump
        #[arg(long, default_value = None)]
        dump_db_table: Option<String>,
        /// For safekeeper node_kind only, json list of timelines and their lsn info
        #[arg(long, default_value = None)]
        timeline_lsns: Option<String>,
        // Forwarded to the pageserver scan only.
        #[arg(long, default_value_t = false)]
        verbose: bool,
    },
    // Builds a `SnapshotDownloader` for one tenant and downloads into `output_path`, using the
    // given `concurrency` (default 8).
    TenantSnapshot {
        #[arg(long = "tenant-id")]
        tenant_id: TenantId,
        #[arg(long = "concurrency", short = 'j', default_value_t = 8)]
        concurrency: usize,
        #[arg(short, long)]
        output_path: Utf8PathBuf,
    },
    // Physical GC of pageserver data. `GcMode::Full` is refused unless `--controller-api` is
    // set (see `pageserver_physical_gc_cmd`).
    PageserverPhysicalGc {
        // Zero or more shard ids; without `--tenant-id` an empty list is passed to the GC.
        #[arg(long = "tenant-id", num_args = 0..)]
        tenant_ids: Vec<TenantShardId>,
        #[arg(long = "min-age")]
        min_age: humantime::Duration,
        #[arg(short, long, default_value_t = GcMode::IndicesOnly)]
        mode: GcMode,
    },
    // Calls `find_large_objects` with the given size threshold and prints its summary as JSON.
    FindLargeObjects {
        #[arg(long = "min-size")]
        min_size: u64,
        #[arg(short, long, default_value_t = false)]
        ignore_deltas: bool,
        #[arg(long = "concurrency", short = 'j', default_value_t = 64)]
        concurrency: usize,
    },
    // Runs pageserver physical GC and then a pageserver metadata scan back to back;
    // see `run_cron_job`.
    CronJob {
        // PageserverPhysicalGc options: forwarded as `min_age` / `mode`.
        // Note the flag is `--gc-mode` here, not `--mode` as on `pageserver-physical-gc`.
        #[arg(long = "min-age")]
        gc_min_age: humantime::Duration,
        #[arg(short, long, default_value_t = GcMode::IndicesOnly)]
        gc_mode: GcMode,
        // ScanMetadata options: forwarded as `post_to_storcon`.
        #[arg(long = "post", default_value_t = false)]
        post_to_storcon: bool,
    },
}
127 :
128 : #[tokio::main]
129 0 : async fn main() -> anyhow::Result<()> {
130 0 : let cli = Cli::parse();
131 0 :
132 0 : let bucket_config = BucketConfig::from_env()?;
133 0 :
134 0 : let command_log_name = match &cli.command {
135 0 : Command::ScanMetadata { .. } => "scan",
136 0 : Command::FindGarbage { .. } => "find-garbage",
137 0 : Command::PurgeGarbage { .. } => "purge-garbage",
138 0 : Command::TenantSnapshot { .. } => "tenant-snapshot",
139 0 : Command::PageserverPhysicalGc { .. } => "pageserver-physical-gc",
140 0 : Command::FindLargeObjects { .. } => "find-large-objects",
141 0 : Command::CronJob { .. } => "cron-job",
142 0 : };
143 0 : let _guard = init_logging(&format!(
144 0 : "{}_{}_{}_{}.log",
145 0 : std::env::args().next().unwrap(),
146 0 : command_log_name,
147 0 : bucket_config.bucket_name().unwrap_or("nobucket"),
148 0 : chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
149 0 : ));
150 0 :
151 0 : tracing::info!("version: {}, build_tag {}", GIT_VERSION, BUILD_TAG);
152 0 :
153 0 : let ssl_ca_certs = match cli.ssl_ca_file.as_ref() {
154 0 : Some(ssl_ca_file) => {
155 0 : tracing::info!("Using ssl root CA file: {ssl_ca_file:?}");
156 0 : let buf = tokio::fs::read(ssl_ca_file).await?;
157 0 : Certificate::from_pem_bundle(&buf)?
158 0 : }
159 0 : None => Vec::new(),
160 0 : };
161 0 :
162 0 : let mut http_client = reqwest::Client::builder();
163 0 : for cert in ssl_ca_certs {
164 0 : http_client = http_client.add_root_certificate(cert);
165 0 : }
166 0 : let http_client = http_client.build()?;
167 0 :
168 0 : let controller_client = cli.controller_api.map(|controller_api| {
169 0 : ControllerClientConfig {
170 0 : controller_api,
171 0 : // Default to no key: this is a convenience when working in a development environment
172 0 : controller_jwt: cli.controller_jwt.unwrap_or("".to_owned()),
173 0 : }
174 0 : .build_client(http_client)
175 0 : });
176 0 :
177 0 : match cli.command {
178 0 : Command::ScanMetadata {
179 0 : json,
180 0 : tenant_ids,
181 0 : node_kind,
182 0 : post_to_storcon,
183 0 : dump_db_connstr,
184 0 : dump_db_table,
185 0 : timeline_lsns,
186 0 : verbose,
187 0 : } => {
188 0 : if let NodeKind::Safekeeper = node_kind {
189 0 : let db_or_list = match (timeline_lsns, dump_db_connstr) {
190 0 : (Some(timeline_lsns), _) => {
191 0 : let timeline_lsns = serde_json::from_str(&timeline_lsns)
192 0 : .context("parsing timeline_lsns")?;
193 0 : DatabaseOrList::List(timeline_lsns)
194 0 : }
195 0 : (None, Some(dump_db_connstr)) => {
196 0 : let dump_db_table = dump_db_table
197 0 : .ok_or_else(|| anyhow::anyhow!("dump_db_table not specified"))?;
198 0 : let tenant_ids = tenant_ids.iter().map(|tshid| tshid.tenant_id).collect();
199 0 : DatabaseOrList::Database {
200 0 : tenant_ids,
201 0 : connstr: dump_db_connstr,
202 0 : table: dump_db_table,
203 0 : }
204 0 : }
205 0 : (None, None) => anyhow::bail!(
206 0 : "neither `timeline_lsns` specified, nor `dump_db_connstr` and `dump_db_table`"
207 0 : ),
208 0 : };
209 0 : let summary = scan_safekeeper_metadata(bucket_config.clone(), db_or_list).await?;
210 0 : if json {
211 0 : println!("{}", serde_json::to_string(&summary).unwrap())
212 0 : } else {
213 0 : println!("{}", summary.summary_string());
214 0 : }
215 0 : if summary.is_fatal() {
216 0 : bail!("Fatal scrub errors detected");
217 0 : }
218 0 : if summary.is_empty() {
219 0 : // Strictly speaking an empty bucket is a valid bucket, but if someone ran the
220 0 : // scrubber they were likely expecting to scan something, and if we see no timelines
221 0 : // at all then it's likely due to some configuration issues like a bad prefix
222 0 : bail!("No timelines found in {}", bucket_config.desc_str());
223 0 : }
224 0 : Ok(())
225 0 : } else {
226 0 : scan_pageserver_metadata_cmd(
227 0 : bucket_config,
228 0 : controller_client.as_ref(),
229 0 : tenant_ids,
230 0 : json,
231 0 : post_to_storcon,
232 0 : verbose,
233 0 : cli.exit_code,
234 0 : )
235 0 : .await
236 0 : }
237 0 : }
238 0 : Command::FindGarbage {
239 0 : node_kind,
240 0 : depth,
241 0 : tenant_id_prefix,
242 0 : output_path,
243 0 : } => {
244 0 : let console_config = ConsoleConfig::from_env()?;
245 0 : find_garbage(
246 0 : bucket_config,
247 0 : console_config,
248 0 : depth,
249 0 : node_kind,
250 0 : tenant_id_prefix,
251 0 : output_path,
252 0 : )
253 0 : .await
254 0 : }
255 0 : Command::PurgeGarbage {
256 0 : input_path,
257 0 : mode,
258 0 : min_age,
259 0 : } => purge_garbage(input_path, mode, min_age.into(), !cli.delete).await,
260 0 : Command::TenantSnapshot {
261 0 : tenant_id,
262 0 : output_path,
263 0 : concurrency,
264 0 : } => {
265 0 : let downloader =
266 0 : SnapshotDownloader::new(bucket_config, tenant_id, output_path, concurrency).await?;
267 0 : downloader.download().await
268 0 : }
269 0 : Command::PageserverPhysicalGc {
270 0 : tenant_ids,
271 0 : min_age,
272 0 : mode,
273 0 : } => {
274 0 : pageserver_physical_gc_cmd(
275 0 : &bucket_config,
276 0 : controller_client.as_ref(),
277 0 : tenant_ids,
278 0 : min_age,
279 0 : mode,
280 0 : )
281 0 : .await
282 0 : }
283 0 : Command::FindLargeObjects {
284 0 : min_size,
285 0 : ignore_deltas,
286 0 : concurrency,
287 0 : } => {
288 0 : let summary = find_large_objects::find_large_objects(
289 0 : bucket_config,
290 0 : min_size,
291 0 : ignore_deltas,
292 0 : concurrency,
293 0 : )
294 0 : .await?;
295 0 : println!("{}", serde_json::to_string(&summary).unwrap());
296 0 : Ok(())
297 0 : }
298 0 : Command::CronJob {
299 0 : gc_min_age,
300 0 : gc_mode,
301 0 : post_to_storcon,
302 0 : } => {
303 0 : run_cron_job(
304 0 : bucket_config,
305 0 : controller_client.as_ref(),
306 0 : gc_min_age,
307 0 : gc_mode,
308 0 : post_to_storcon,
309 0 : cli.exit_code,
310 0 : )
311 0 : .await
312 0 : }
313 0 : }
314 0 : }
315 :
316 : /// Runs the scrubber cron job.
317 : /// 1. Do pageserver physical gc
318 : /// 2. Scan pageserver metadata
319 0 : pub async fn run_cron_job(
320 0 : bucket_config: BucketConfig,
321 0 : controller_client: Option<&control_api::Client>,
322 0 : gc_min_age: humantime::Duration,
323 0 : gc_mode: GcMode,
324 0 : post_to_storcon: bool,
325 0 : exit_code: bool,
326 0 : ) -> anyhow::Result<()> {
327 0 : tracing::info!(%gc_min_age, %gc_mode, "Running pageserver-physical-gc");
328 0 : pageserver_physical_gc_cmd(
329 0 : &bucket_config,
330 0 : controller_client,
331 0 : Vec::new(),
332 0 : gc_min_age,
333 0 : gc_mode,
334 0 : )
335 0 : .await?;
336 0 : tracing::info!(%post_to_storcon, node_kind = %NodeKind::Pageserver, "Running scan-metadata");
337 0 : scan_pageserver_metadata_cmd(
338 0 : bucket_config,
339 0 : controller_client,
340 0 : Vec::new(),
341 0 : true,
342 0 : post_to_storcon,
343 0 : false, // default to non-verbose mode
344 0 : exit_code,
345 0 : )
346 0 : .await?;
347 :
348 0 : Ok(())
349 0 : }
350 :
351 0 : pub async fn pageserver_physical_gc_cmd(
352 0 : bucket_config: &BucketConfig,
353 0 : controller_client: Option<&control_api::Client>,
354 0 : tenant_shard_ids: Vec<TenantShardId>,
355 0 : min_age: humantime::Duration,
356 0 : mode: GcMode,
357 0 : ) -> anyhow::Result<()> {
358 0 : match (controller_client, mode) {
359 0 : (Some(_), _) => {
360 0 : // Any mode may run when controller API is set
361 0 : }
362 : (None, GcMode::Full) => {
363 : // The part of physical GC where we erase ancestor layers cannot be done safely without
364 : // confirming the most recent complete shard split with the controller. Refuse to run, rather
365 : // than doing it unsafely.
366 0 : return Err(anyhow!(
367 0 : "Full physical GC requires `--controller-api` and `--controller-jwt` to run"
368 0 : ));
369 : }
370 0 : (None, GcMode::DryRun | GcMode::IndicesOnly) => {
371 0 : // These GcModes do not require the controller to run.
372 0 : }
373 : }
374 :
375 0 : let summary = pageserver_physical_gc(
376 0 : bucket_config,
377 0 : controller_client,
378 0 : tenant_shard_ids,
379 0 : min_age.into(),
380 0 : mode,
381 0 : )
382 0 : .await?;
383 0 : println!("{}", serde_json::to_string(&summary).unwrap());
384 0 : Ok(())
385 0 : }
386 :
387 0 : pub async fn scan_pageserver_metadata_cmd(
388 0 : bucket_config: BucketConfig,
389 0 : controller_client: Option<&control_api::Client>,
390 0 : tenant_shard_ids: Vec<TenantShardId>,
391 0 : json: bool,
392 0 : post_to_storcon: bool,
393 0 : verbose: bool,
394 0 : exit_code: bool,
395 0 : ) -> anyhow::Result<()> {
396 0 : if controller_client.is_none() && post_to_storcon {
397 0 : return Err(anyhow!(
398 0 : "Posting pageserver scan health status to storage controller requires `--controller-api` and `--controller-jwt` to run"
399 0 : ));
400 0 : }
401 0 : match scan_pageserver_metadata(bucket_config.clone(), tenant_shard_ids, verbose).await {
402 0 : Err(e) => {
403 0 : tracing::error!("Failed: {e}");
404 0 : Err(e)
405 : }
406 0 : Ok(summary) => {
407 0 : if json {
408 0 : println!("{}", serde_json::to_string(&summary).unwrap())
409 0 : } else {
410 0 : println!("{}", summary.summary_string());
411 0 : }
412 :
413 0 : if post_to_storcon {
414 0 : if let Some(client) = controller_client {
415 0 : let body = summary.build_health_update_request();
416 0 : client
417 0 : .dispatch::<MetadataHealthUpdateRequest, MetadataHealthUpdateResponse>(
418 0 : Method::POST,
419 0 : "control/v1/metadata_health/update".to_string(),
420 0 : Some(body),
421 0 : )
422 0 : .await?;
423 0 : }
424 0 : }
425 :
426 0 : if summary.is_fatal() {
427 0 : tracing::error!("Fatal scrub errors detected");
428 0 : if exit_code {
429 0 : std::process::exit(1);
430 0 : }
431 0 : } else if summary.is_empty() {
432 : // Strictly speaking an empty bucket is a valid bucket, but if someone ran the
433 : // scrubber they were likely expecting to scan something, and if we see no timelines
434 : // at all then it's likely due to some configuration issues like a bad prefix
435 0 : tracing::error!("No timelines found in {}", bucket_config.desc_str());
436 0 : if exit_code {
437 0 : std::process::exit(1);
438 0 : }
439 0 : }
440 :
441 0 : Ok(())
442 : }
443 : }
444 0 : }
|