use anyhow::{anyhow, bail};
use camino::Utf8PathBuf;
use pageserver_api::controller_api::{MetadataHealthUpdateRequest, MetadataHealthUpdateResponse};
use pageserver_api::shard::TenantShardId;
use reqwest::{Method, Url};
use storage_controller_client::control_api;
use storage_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
use storage_scrubber::pageserver_physical_gc::GcMode;
use storage_scrubber::scan_pageserver_metadata::scan_pageserver_metadata;
use storage_scrubber::tenant_snapshot::SnapshotDownloader;
use storage_scrubber::{find_large_objects, ControllerClientConfig};
use storage_scrubber::{
    init_logging, pageserver_physical_gc::pageserver_physical_gc,
    scan_safekeeper_metadata::scan_safekeeper_metadata, BucketConfig, ConsoleConfig, NodeKind,
    TraversingDepth,
};

use clap::{Parser, Subcommand};
use utils::id::TenantId;

use utils::{project_build_tag, project_git_version};

project_git_version!(GIT_VERSION);
project_build_tag!(BUILD_TAG);

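/// Command-line interface of the storage scrubber: tooling for inspecting and
/// cleaning up the remote storage used by pageserver and safekeeper nodes.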
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
#[command(arg_required_else_help(true))]
struct Cli {
    #[command(subcommand)]
    command: Command,

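    /// Actually delete data. Without this flag, destructive commands such as
    /// `purge-garbage` run in dry-run mode (the negation of this flag is what
    /// gets forwarded to `purge_garbage`).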
    #[arg(short, long, default_value_t = false)]
    delete: bool,

    #[arg(long)]
    /// URL to storage controller. e.g. http://127.0.0.1:1234 when using `neon_local`
    controller_api: Option<Url>,

    #[arg(long)]
    /// JWT token for authenticating with storage controller. Requires scope 'scrubber' or 'admin'.
    controller_jwt: Option<String>,
}

#[derive(Subcommand, Debug)]
enum Command {
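    /// Walk remote storage for the given node kind and record tenants or timelines
    /// that appear to be garbage (e.g. deleted in the console) in `output_path`.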
    FindGarbage {
        #[arg(short, long)]
        node_kind: NodeKind,
        #[arg(short, long, default_value_t=TraversingDepth::Tenant)]
        depth: TraversingDepth,
        #[arg(short, long, default_value_t = String::from("garbage.json"))]
        output_path: String,
    },
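    /// Delete the objects recorded in a garbage file produced by `find-garbage`.
    /// Only destructive when the top-level `--delete` flag is also set.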
    PurgeGarbage {
        #[arg(short, long)]
        input_path: String,
        #[arg(short, long, default_value_t = PurgeMode::DeletedOnly)]
        mode: PurgeMode,
        #[arg(long = "min-age")]
        min_age: humantime::Duration,
    },
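    /// Scan the metadata of a pageserver or safekeeper node kind and report on its
    /// health, optionally posting the result to the storage controller.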
    #[command(verbatim_doc_comment)]
    ScanMetadata {
        #[arg(short, long)]
        node_kind: NodeKind,
        #[arg(short, long, default_value_t = false)]
        json: bool,
        #[arg(long = "tenant-id", num_args = 0..)]
        tenant_ids: Vec<TenantShardId>,
        #[arg(long = "post", default_value_t = false)]
        post_to_storcon: bool,
        #[arg(long, default_value = None)]
        /// For the safekeeper node_kind only: connection string of the database holding the debug dump
        dump_db_connstr: Option<String>,
        /// For the safekeeper node_kind only: table within that database holding the debug dump
        #[arg(long, default_value = None)]
        dump_db_table: Option<String>,
    },
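    /// Download all of a tenant's data from remote storage to a local directory.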
    TenantSnapshot {
        #[arg(long = "tenant-id")]
        tenant_id: TenantId,
        #[arg(long = "concurrency", short = 'j', default_value_t = 8)]
        concurrency: usize,
        #[arg(short, long)]
        output_path: Utf8PathBuf,
    },
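    /// Physically delete pageserver objects that are no longer referenced, such as
    /// stale index versions; `full` mode also erases ancestor layers from completed
    /// shard splits and therefore requires a controller connection.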
    PageserverPhysicalGc {
        #[arg(long = "tenant-id", num_args = 0..)]
        tenant_ids: Vec<TenantShardId>,
        #[arg(long = "min-age")]
        min_age: humantime::Duration,
        #[arg(short, long, default_value_t = GcMode::IndicesOnly)]
        mode: GcMode,
    },
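    /// List objects in the bucket that are at least `min_size` bytes, optionally
    /// ignoring delta layers.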
    FindLargeObjects {
        #[arg(long = "min-size")]
        min_size: u64,
        #[arg(short, long, default_value_t = false)]
        ignore_deltas: bool,
        #[arg(long = "concurrency", short = 'j', default_value_t = 64)]
        concurrency: usize,
    },
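    /// Run the scrubber's periodic job: pageserver physical GC followed by a
    /// pageserver metadata scan (see `run_cron_job`).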
    CronJob {
        // PageserverPhysicalGc
        #[arg(long = "min-age")]
        gc_min_age: humantime::Duration,
        #[arg(short, long, default_value_t = GcMode::IndicesOnly)]
        gc_mode: GcMode,
        // ScanMetadata
        #[arg(long = "post", default_value_t = false)]
        post_to_storcon: bool,
    },
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let cli = Cli::parse();

    tracing::info!("version: {}, build_tag {}", GIT_VERSION, BUILD_TAG);

    let bucket_config = BucketConfig::from_env()?;

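    // Name the log file after the invoked binary, the subcommand, the bucket,
    // and a UTC timestamp.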
    let command_log_name = match &cli.command {
        Command::ScanMetadata { .. } => "scan",
        Command::FindGarbage { .. } => "find-garbage",
        Command::PurgeGarbage { .. } => "purge-garbage",
        Command::TenantSnapshot { .. } => "tenant-snapshot",
        Command::PageserverPhysicalGc { .. } => "pageserver-physical-gc",
        Command::FindLargeObjects { .. } => "find-large-objects",
        Command::CronJob { .. } => "cron-job",
    };
    let _guard = init_logging(&format!(
        "{}_{}_{}_{}.log",
        std::env::args().next().unwrap(),
        command_log_name,
        bucket_config.bucket,
        chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
    ));

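    // A storage controller client is only constructed when --controller-api is
    // given; subcommands that need it check for its presence themselves.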
    let controller_client = cli.controller_api.map(|controller_api| {
        ControllerClientConfig {
            controller_api,
            // Default to no key: this is a convenience when working in a development environment
            controller_jwt: cli.controller_jwt.unwrap_or("".to_owned()),
        }
        .build_client()
    });

    match cli.command {
        Command::ScanMetadata {
            json,
            tenant_ids,
            node_kind,
            post_to_storcon,
            dump_db_connstr,
            dump_db_table,
        } => {
            if let NodeKind::Safekeeper = node_kind {
                let dump_db_connstr =
                    dump_db_connstr.ok_or(anyhow!("dump_db_connstr not specified"))?;
                let dump_db_table =
                    dump_db_table.ok_or(anyhow!("dump_db_table not specified"))?;

                let summary = scan_safekeeper_metadata(
                    bucket_config.clone(),
                    tenant_ids.iter().map(|tshid| tshid.tenant_id).collect(),
                    dump_db_connstr,
                    dump_db_table,
                )
                .await?;
                if json {
                    println!("{}", serde_json::to_string(&summary).unwrap())
                } else {
                    println!("{}", summary.summary_string());
                }
                if summary.is_fatal() {
                    bail!("Fatal scrub errors detected");
                }
                if summary.is_empty() {
                    // Strictly speaking an empty bucket is a valid bucket, but if someone ran the
                    // scrubber they were likely expecting to scan something, and if we see no timelines
                    // at all then it's likely due to a configuration issue such as a bad prefix
                    bail!(
                        "No timelines found in bucket {} prefix {}",
                        bucket_config.bucket,
                        bucket_config
                            .prefix_in_bucket
                            .unwrap_or("<none>".to_string())
                    );
                }
                Ok(())
            } else {
                scan_pageserver_metadata_cmd(
                    bucket_config,
                    controller_client.as_ref(),
                    tenant_ids,
                    json,
                    post_to_storcon,
                )
                .await
            }
        }
        Command::FindGarbage {
            node_kind,
            depth,
            output_path,
        } => {
            let console_config = ConsoleConfig::from_env()?;
            find_garbage(bucket_config, console_config, depth, node_kind, output_path).await
        }
        Command::PurgeGarbage {
            input_path,
            mode,
            min_age,
        } => purge_garbage(input_path, mode, min_age.into(), !cli.delete).await,
        Command::TenantSnapshot {
            tenant_id,
            output_path,
            concurrency,
        } => {
            let downloader =
                SnapshotDownloader::new(bucket_config, tenant_id, output_path, concurrency).await?;
            downloader.download().await
        }
        Command::PageserverPhysicalGc {
            tenant_ids,
            min_age,
            mode,
        } => {
            pageserver_physical_gc_cmd(
                &bucket_config,
                controller_client.as_ref(),
                tenant_ids,
                min_age,
                mode,
            )
            .await
        }
        Command::FindLargeObjects {
            min_size,
            ignore_deltas,
            concurrency,
        } => {
            let summary = find_large_objects::find_large_objects(
                bucket_config,
                min_size,
                ignore_deltas,
                concurrency,
            )
            .await?;
            println!("{}", serde_json::to_string(&summary).unwrap());
            Ok(())
        }
        Command::CronJob {
            gc_min_age,
            gc_mode,
            post_to_storcon,
        } => {
            run_cron_job(
                bucket_config,
                controller_client.as_ref(),
                gc_min_age,
                gc_mode,
                post_to_storcon,
            )
            .await
        }
    }
}

/// Runs the scrubber cron job:
/// 1. pageserver physical GC
/// 2. pageserver metadata scan
pub async fn run_cron_job(
    bucket_config: BucketConfig,
    controller_client: Option<&control_api::Client>,
    gc_min_age: humantime::Duration,
    gc_mode: GcMode,
    post_to_storcon: bool,
) -> anyhow::Result<()> {
    tracing::info!(%gc_min_age, %gc_mode, "Running pageserver-physical-gc");
    pageserver_physical_gc_cmd(
        &bucket_config,
        controller_client,
        Vec::new(),
        gc_min_age,
        gc_mode,
    )
    .await?;
    tracing::info!(%post_to_storcon, node_kind = %NodeKind::Pageserver, "Running scan-metadata");
    scan_pageserver_metadata_cmd(
        bucket_config,
        controller_client,
        Vec::new(),
        true,
        post_to_storcon,
    )
    .await?;

    Ok(())
}

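/// Implements the `pageserver-physical-gc` subcommand: refuses to run `full` GC
/// without a controller client (ancestor layer deletion must confirm shard splits
/// with the controller), then runs the GC and prints a JSON summary to stdout.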
pub async fn pageserver_physical_gc_cmd(
    bucket_config: &BucketConfig,
    controller_client: Option<&control_api::Client>,
    tenant_shard_ids: Vec<TenantShardId>,
    min_age: humantime::Duration,
    mode: GcMode,
) -> anyhow::Result<()> {
    match (controller_client, mode) {
        (Some(_), _) => {
            // Any mode may run when the controller API is set
        }
        (None, GcMode::Full) => {
            // The part of physical GC where we erase ancestor layers cannot be done safely without
            // confirming the most recent complete shard split with the controller. Refuse to run,
            // rather than doing it unsafely.
            return Err(anyhow!(
                "Full physical GC requires `--controller-api` and `--controller-jwt` to run"
            ));
        }
        (None, GcMode::DryRun | GcMode::IndicesOnly) => {
            // These GC modes do not require the controller to run.
        }
    }

    let summary = pageserver_physical_gc(
        bucket_config,
        controller_client,
        tenant_shard_ids,
        min_age.into(),
        mode,
    )
    .await?;
    println!("{}", serde_json::to_string(&summary).unwrap());
    Ok(())
}

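/// Implements `scan-metadata` for pageserver remote storage: prints a summary in
/// JSON or human-readable form, optionally posts a metadata health update to the
/// storage controller, and logs an error for fatal or empty scan results.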
pub async fn scan_pageserver_metadata_cmd(
    bucket_config: BucketConfig,
    controller_client: Option<&control_api::Client>,
    tenant_shard_ids: Vec<TenantShardId>,
    json: bool,
    post_to_storcon: bool,
) -> anyhow::Result<()> {
    if controller_client.is_none() && post_to_storcon {
        return Err(anyhow!(
            "Posting pageserver scan health status to storage controller requires `--controller-api` and `--controller-jwt` to run"
        ));
    }
    match scan_pageserver_metadata(bucket_config.clone(), tenant_shard_ids).await {
        Err(e) => {
            tracing::error!("Failed: {e}");
            Err(e)
        }
        Ok(summary) => {
            if json {
                println!("{}", serde_json::to_string(&summary).unwrap())
            } else {
                println!("{}", summary.summary_string());
            }

            if post_to_storcon {
                if let Some(client) = controller_client {
                    let body = summary.build_health_update_request();
                    client
                        .dispatch::<MetadataHealthUpdateRequest, MetadataHealthUpdateResponse>(
                            Method::POST,
                            "control/v1/metadata_health/update".to_string(),
                            Some(body),
                        )
                        .await?;
                }
            }

            if summary.is_fatal() {
                tracing::error!("Fatal scrub errors detected");
            } else if summary.is_empty() {
                // Strictly speaking an empty bucket is a valid bucket, but if someone ran the
                // scrubber they were likely expecting to scan something, and if we see no timelines
                // at all then it's likely due to a configuration issue such as a bad prefix
                tracing::error!(
                    "No timelines found in bucket {} prefix {}",
                    bucket_config.bucket,
                    bucket_config
                        .prefix_in_bucket
                        .unwrap_or("<none>".to_string())
                );
            }

            Ok(())
        }
    }
}