Line data Source code
1 : use anyhow::bail;
2 : use camino::Utf8PathBuf;
3 : use pageserver_api::shard::TenantShardId;
4 : use storage_scrubber::find_large_objects;
5 : use storage_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
6 : use storage_scrubber::pageserver_physical_gc::GcMode;
7 : use storage_scrubber::scan_pageserver_metadata::scan_metadata;
8 : use storage_scrubber::tenant_snapshot::SnapshotDownloader;
9 : use storage_scrubber::{
10 : init_logging, pageserver_physical_gc::pageserver_physical_gc,
11 : scan_safekeeper_metadata::scan_safekeeper_metadata, BucketConfig, ConsoleConfig, NodeKind,
12 : TraversingDepth,
13 : };
14 :
15 : use clap::{Parser, Subcommand};
16 : use utils::id::TenantId;
17 :
18 0 : #[derive(Parser)]
19 : #[command(author, version, about, long_about = None)]
20 : #[command(arg_required_else_help(true))]
21 : struct Cli {
22 : #[command(subcommand)]
23 : command: Command,
24 :
25 0 : #[arg(short, long, default_value_t = false)]
26 0 : delete: bool,
27 : }
28 :
29 0 : #[derive(Subcommand, Debug)]
30 : enum Command {
31 : FindGarbage {
32 : #[arg(short, long)]
33 0 : node_kind: NodeKind,
34 0 : #[arg(short, long, default_value_t=TraversingDepth::Tenant)]
35 0 : depth: TraversingDepth,
36 0 : #[arg(short, long, default_value_t = String::from("garbage.json"))]
37 0 : output_path: String,
38 : },
39 : PurgeGarbage {
40 : #[arg(short, long)]
41 0 : input_path: String,
42 0 : #[arg(short, long, default_value_t = PurgeMode::DeletedOnly)]
43 0 : mode: PurgeMode,
44 : },
45 : #[command(verbatim_doc_comment)]
46 : ScanMetadata {
47 : #[arg(short, long)]
48 0 : node_kind: NodeKind,
49 0 : #[arg(short, long, default_value_t = false)]
50 0 : json: bool,
51 : #[arg(long = "tenant-id", num_args = 0..)]
52 0 : tenant_ids: Vec<TenantShardId>,
53 : #[arg(long, default_value = None)]
54 : /// For safekeeper node_kind only, points to db with debug dump
55 : dump_db_connstr: Option<String>,
56 : /// For safekeeper node_kind only, table in the db with debug dump
57 : #[arg(long, default_value = None)]
58 : dump_db_table: Option<String>,
59 : },
60 : TenantSnapshot {
61 : #[arg(long = "tenant-id")]
62 0 : tenant_id: TenantId,
63 0 : #[arg(long = "concurrency", short = 'j', default_value_t = 8)]
64 0 : concurrency: usize,
65 : #[arg(short, long)]
66 0 : output_path: Utf8PathBuf,
67 : },
68 : PageserverPhysicalGc {
69 : #[arg(long = "tenant-id", num_args = 0..)]
70 0 : tenant_ids: Vec<TenantShardId>,
71 : #[arg(long = "min-age")]
72 0 : min_age: humantime::Duration,
73 0 : #[arg(short, long, default_value_t = GcMode::IndicesOnly)]
74 0 : mode: GcMode,
75 : },
76 : FindLargeObjects {
77 : #[arg(long = "min-size")]
78 0 : min_size: u64,
79 0 : #[arg(short, long, default_value_t = false)]
80 0 : ignore_deltas: bool,
81 0 : #[arg(long = "concurrency", short = 'j', default_value_t = 64)]
82 0 : concurrency: usize,
83 : },
84 : }
85 :
86 : #[tokio::main]
87 0 : async fn main() -> anyhow::Result<()> {
88 0 : let cli = Cli::parse();
89 0 :
90 0 : let bucket_config = BucketConfig::from_env()?;
91 0 :
92 0 : let command_log_name = match &cli.command {
93 0 : Command::ScanMetadata { .. } => "scan",
94 0 : Command::FindGarbage { .. } => "find-garbage",
95 0 : Command::PurgeGarbage { .. } => "purge-garbage",
96 0 : Command::TenantSnapshot { .. } => "tenant-snapshot",
97 0 : Command::PageserverPhysicalGc { .. } => "pageserver-physical-gc",
98 0 : Command::FindLargeObjects { .. } => "find-large-objects",
99 0 : };
100 0 : let _guard = init_logging(&format!(
101 0 : "{}_{}_{}_{}.log",
102 0 : std::env::args().next().unwrap(),
103 0 : command_log_name,
104 0 : bucket_config.bucket,
105 0 : chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
106 0 : ));
107 0 :
108 0 : match cli.command {
109 0 : Command::ScanMetadata {
110 0 : json,
111 0 : tenant_ids,
112 0 : node_kind,
113 0 : dump_db_connstr,
114 0 : dump_db_table,
115 0 : } => {
116 0 : if let NodeKind::Safekeeper = node_kind {
117 0 : let dump_db_connstr =
118 0 : dump_db_connstr.ok_or(anyhow::anyhow!("dump_db_connstr not specified"))?;
119 0 : let dump_db_table =
120 0 : dump_db_table.ok_or(anyhow::anyhow!("dump_db_table not specified"))?;
121 0 :
122 0 : let summary = scan_safekeeper_metadata(
123 0 : bucket_config.clone(),
124 0 : tenant_ids.iter().map(|tshid| tshid.tenant_id).collect(),
125 0 : dump_db_connstr,
126 0 : dump_db_table,
127 0 : )
128 0 : .await?;
129 0 : if json {
130 0 : println!("{}", serde_json::to_string(&summary).unwrap())
131 0 : } else {
132 0 : println!("{}", summary.summary_string());
133 0 : }
134 0 : if summary.is_fatal() {
135 0 : bail!("Fatal scrub errors detected");
136 0 : }
137 0 : if summary.is_empty() {
138 0 : // Strictly speaking an empty bucket is a valid bucket, but if someone ran the
139 0 : // scrubber they were likely expecting to scan something, and if we see no timelines
140 0 : // at all then it's likely due to some configuration issues like a bad prefix
141 0 : bail!(
142 0 : "No timelines found in bucket {} prefix {}",
143 0 : bucket_config.bucket,
144 0 : bucket_config
145 0 : .prefix_in_bucket
146 0 : .unwrap_or("<none>".to_string())
147 0 : );
148 0 : }
149 0 : Ok(())
150 0 : } else {
151 0 : match scan_metadata(bucket_config.clone(), tenant_ids).await {
152 0 : Err(e) => {
153 0 : tracing::error!("Failed: {e}");
154 0 : Err(e)
155 0 : }
156 0 : Ok(summary) => {
157 0 : if json {
158 0 : println!("{}", serde_json::to_string(&summary).unwrap())
159 0 : } else {
160 0 : println!("{}", summary.summary_string());
161 0 : }
162 0 : if summary.is_fatal() {
163 0 : Err(anyhow::anyhow!("Fatal scrub errors detected"))
164 0 : } else if summary.is_empty() {
165 0 : // Strictly speaking an empty bucket is a valid bucket, but if someone ran the
166 0 : // scrubber they were likely expecting to scan something, and if we see no timelines
167 0 : // at all then it's likely due to some configuration issues like a bad prefix
168 0 : Err(anyhow::anyhow!(
169 0 : "No timelines found in bucket {} prefix {}",
170 0 : bucket_config.bucket,
171 0 : bucket_config
172 0 : .prefix_in_bucket
173 0 : .unwrap_or("<none>".to_string())
174 0 : ))
175 0 : } else {
176 0 : Ok(())
177 0 : }
178 0 : }
179 0 : }
180 0 : }
181 0 : }
182 0 : Command::FindGarbage {
183 0 : node_kind,
184 0 : depth,
185 0 : output_path,
186 0 : } => {
187 0 : let console_config = ConsoleConfig::from_env()?;
188 0 : find_garbage(bucket_config, console_config, depth, node_kind, output_path).await
189 0 : }
190 0 : Command::PurgeGarbage { input_path, mode } => {
191 0 : purge_garbage(input_path, mode, !cli.delete).await
192 0 : }
193 0 : Command::TenantSnapshot {
194 0 : tenant_id,
195 0 : output_path,
196 0 : concurrency,
197 0 : } => {
198 0 : let downloader =
199 0 : SnapshotDownloader::new(bucket_config, tenant_id, output_path, concurrency)?;
200 0 : downloader.download().await
201 0 : }
202 0 : Command::PageserverPhysicalGc {
203 0 : tenant_ids,
204 0 : min_age,
205 0 : mode,
206 0 : } => {
207 0 : let summary =
208 0 : pageserver_physical_gc(bucket_config, tenant_ids, min_age.into(), mode).await?;
209 0 : println!("{}", serde_json::to_string(&summary).unwrap());
210 0 : Ok(())
211 0 : }
212 0 : Command::FindLargeObjects {
213 0 : min_size,
214 0 : ignore_deltas,
215 0 : concurrency,
216 0 : } => {
217 0 : let summary = find_large_objects::find_large_objects(
218 0 : bucket_config,
219 0 : min_size,
220 0 : ignore_deltas,
221 0 : concurrency,
222 0 : )
223 0 : .await?;
224 0 : println!("{}", serde_json::to_string(&summary).unwrap());
225 0 : Ok(())
226 0 : }
227 0 : }
228 0 : }
|