Line data Source code
1 : use anyhow::{anyhow, bail};
2 : use camino::Utf8PathBuf;
3 : use pageserver_api::controller_api::{MetadataHealthUpdateRequest, MetadataHealthUpdateResponse};
4 : use pageserver_api::shard::TenantShardId;
5 : use reqwest::{Method, Url};
6 : use storage_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
7 : use storage_scrubber::pageserver_physical_gc::GcMode;
8 : use storage_scrubber::scan_pageserver_metadata::scan_metadata;
9 : use storage_scrubber::tenant_snapshot::SnapshotDownloader;
10 : use storage_scrubber::{find_large_objects, ControllerClientConfig};
11 : use storage_scrubber::{
12 : init_logging, pageserver_physical_gc::pageserver_physical_gc,
13 : scan_safekeeper_metadata::scan_safekeeper_metadata, BucketConfig, ConsoleConfig, NodeKind,
14 : TraversingDepth,
15 : };
16 :
17 : use clap::{Parser, Subcommand};
18 : use utils::id::TenantId;
19 :
20 : use utils::{project_build_tag, project_git_version};
21 :
22 : project_git_version!(GIT_VERSION);
23 : project_build_tag!(BUILD_TAG);
24 :
25 0 : #[derive(Parser)]
26 : #[command(author, version, about, long_about = None)]
27 : #[command(arg_required_else_help(true))]
28 : struct Cli {
29 : #[command(subcommand)]
30 : command: Command,
31 :
32 0 : #[arg(short, long, default_value_t = false)]
33 0 : delete: bool,
34 :
35 : #[arg(long)]
36 : /// URL to storage controller. e.g. http://127.0.0.1:1234 when using `neon_local`
37 : controller_api: Option<Url>,
38 :
39 : #[arg(long)]
40 : /// JWT token for authenticating with storage controller. Requires scope 'scrubber' or 'admin'.
41 : controller_jwt: Option<String>,
42 : }
43 :
44 0 : #[derive(Subcommand, Debug)]
45 : enum Command {
46 : FindGarbage {
47 : #[arg(short, long)]
48 0 : node_kind: NodeKind,
49 0 : #[arg(short, long, default_value_t=TraversingDepth::Tenant)]
50 0 : depth: TraversingDepth,
51 0 : #[arg(short, long, default_value_t = String::from("garbage.json"))]
52 0 : output_path: String,
53 : },
54 : PurgeGarbage {
55 : #[arg(short, long)]
56 0 : input_path: String,
57 0 : #[arg(short, long, default_value_t = PurgeMode::DeletedOnly)]
58 0 : mode: PurgeMode,
59 : #[arg(long = "min-age")]
60 0 : min_age: humantime::Duration,
61 : },
62 : #[command(verbatim_doc_comment)]
63 : ScanMetadata {
64 : #[arg(short, long)]
65 0 : node_kind: NodeKind,
66 0 : #[arg(short, long, default_value_t = false)]
67 0 : json: bool,
68 : #[arg(long = "tenant-id", num_args = 0..)]
69 0 : tenant_ids: Vec<TenantShardId>,
70 0 : #[arg(long = "post", default_value_t = false)]
71 0 : post_to_storage_controller: bool,
72 : #[arg(long, default_value = None)]
73 : /// For safekeeper node_kind only, points to db with debug dump
74 : dump_db_connstr: Option<String>,
75 : /// For safekeeper node_kind only, table in the db with debug dump
76 : #[arg(long, default_value = None)]
77 : dump_db_table: Option<String>,
78 : },
79 : TenantSnapshot {
80 : #[arg(long = "tenant-id")]
81 0 : tenant_id: TenantId,
82 0 : #[arg(long = "concurrency", short = 'j', default_value_t = 8)]
83 0 : concurrency: usize,
84 : #[arg(short, long)]
85 0 : output_path: Utf8PathBuf,
86 : },
87 : PageserverPhysicalGc {
88 : #[arg(long = "tenant-id", num_args = 0..)]
89 0 : tenant_ids: Vec<TenantShardId>,
90 : #[arg(long = "min-age")]
91 0 : min_age: humantime::Duration,
92 0 : #[arg(short, long, default_value_t = GcMode::IndicesOnly)]
93 0 : mode: GcMode,
94 : },
95 : FindLargeObjects {
96 : #[arg(long = "min-size")]
97 0 : min_size: u64,
98 0 : #[arg(short, long, default_value_t = false)]
99 0 : ignore_deltas: bool,
100 0 : #[arg(long = "concurrency", short = 'j', default_value_t = 64)]
101 0 : concurrency: usize,
102 : },
103 : }
104 :
105 : #[tokio::main]
106 0 : async fn main() -> anyhow::Result<()> {
107 0 : let cli = Cli::parse();
108 0 :
109 0 : tracing::info!("version: {}, build_tag {}", GIT_VERSION, BUILD_TAG);
110 0 :
111 0 : let bucket_config = BucketConfig::from_env()?;
112 0 :
113 0 : let command_log_name = match &cli.command {
114 0 : Command::ScanMetadata { .. } => "scan",
115 0 : Command::FindGarbage { .. } => "find-garbage",
116 0 : Command::PurgeGarbage { .. } => "purge-garbage",
117 0 : Command::TenantSnapshot { .. } => "tenant-snapshot",
118 0 : Command::PageserverPhysicalGc { .. } => "pageserver-physical-gc",
119 0 : Command::FindLargeObjects { .. } => "find-large-objects",
120 0 : };
121 0 : let _guard = init_logging(&format!(
122 0 : "{}_{}_{}_{}.log",
123 0 : std::env::args().next().unwrap(),
124 0 : command_log_name,
125 0 : bucket_config.bucket,
126 0 : chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
127 0 : ));
128 0 :
129 0 : let controller_client_conf = cli.controller_api.map(|controller_api| {
130 0 : ControllerClientConfig {
131 0 : controller_api,
132 0 : // Default to no key: this is a convenience when working in a development environment
133 0 : controller_jwt: cli.controller_jwt.unwrap_or("".to_owned()),
134 0 : }
135 0 : });
136 0 :
137 0 : match cli.command {
138 0 : Command::ScanMetadata {
139 0 : json,
140 0 : tenant_ids,
141 0 : node_kind,
142 0 : post_to_storage_controller,
143 0 : dump_db_connstr,
144 0 : dump_db_table,
145 0 : } => {
146 0 : if let NodeKind::Safekeeper = node_kind {
147 0 : let dump_db_connstr =
148 0 : dump_db_connstr.ok_or(anyhow::anyhow!("dump_db_connstr not specified"))?;
149 0 : let dump_db_table =
150 0 : dump_db_table.ok_or(anyhow::anyhow!("dump_db_table not specified"))?;
151 0 :
152 0 : let summary = scan_safekeeper_metadata(
153 0 : bucket_config.clone(),
154 0 : tenant_ids.iter().map(|tshid| tshid.tenant_id).collect(),
155 0 : dump_db_connstr,
156 0 : dump_db_table,
157 0 : )
158 0 : .await?;
159 0 : if json {
160 0 : println!("{}", serde_json::to_string(&summary).unwrap())
161 0 : } else {
162 0 : println!("{}", summary.summary_string());
163 0 : }
164 0 : if summary.is_fatal() {
165 0 : bail!("Fatal scrub errors detected");
166 0 : }
167 0 : if summary.is_empty() {
168 0 : // Strictly speaking an empty bucket is a valid bucket, but if someone ran the
169 0 : // scrubber they were likely expecting to scan something, and if we see no timelines
170 0 : // at all then it's likely due to some configuration issues like a bad prefix
171 0 : bail!(
172 0 : "No timelines found in bucket {} prefix {}",
173 0 : bucket_config.bucket,
174 0 : bucket_config
175 0 : .prefix_in_bucket
176 0 : .unwrap_or("<none>".to_string())
177 0 : );
178 0 : }
179 0 : Ok(())
180 0 : } else {
181 0 : if controller_client_conf.is_none() && post_to_storage_controller {
182 0 : return Err(anyhow!("Posting pageserver scan health status to storage controller requires `--controller-api` and `--controller-jwt` to run"));
183 0 : }
184 0 : match scan_metadata(bucket_config.clone(), tenant_ids).await {
185 0 : Err(e) => {
186 0 : tracing::error!("Failed: {e}");
187 0 : Err(e)
188 0 : }
189 0 : Ok(summary) => {
190 0 : if json {
191 0 : println!("{}", serde_json::to_string(&summary).unwrap())
192 0 : } else {
193 0 : println!("{}", summary.summary_string());
194 0 : }
195 0 :
196 0 : if post_to_storage_controller {
197 0 : if let Some(conf) = controller_client_conf {
198 0 : let controller_client = conf.build_client();
199 0 : let body = summary.build_health_update_request();
200 0 : controller_client
201 0 : .dispatch::<MetadataHealthUpdateRequest, MetadataHealthUpdateResponse>(
202 0 : Method::POST,
203 0 : "control/v1/metadata_health/update".to_string(),
204 0 : Some(body),
205 0 : )
206 0 : .await?;
207 0 : }
208 0 : }
209 0 :
210 0 : if summary.is_fatal() {
211 0 : Err(anyhow::anyhow!("Fatal scrub errors detected"))
212 0 : } else if summary.is_empty() {
213 0 : // Strictly speaking an empty bucket is a valid bucket, but if someone ran the
214 0 : // scrubber they were likely expecting to scan something, and if we see no timelines
215 0 : // at all then it's likely due to some configuration issues like a bad prefix
216 0 : Err(anyhow::anyhow!(
217 0 : "No timelines found in bucket {} prefix {}",
218 0 : bucket_config.bucket,
219 0 : bucket_config
220 0 : .prefix_in_bucket
221 0 : .unwrap_or("<none>".to_string())
222 0 : ))
223 0 : } else {
224 0 : Ok(())
225 0 : }
226 0 : }
227 0 : }
228 0 : }
229 0 : }
230 0 : Command::FindGarbage {
231 0 : node_kind,
232 0 : depth,
233 0 : output_path,
234 0 : } => {
235 0 : let console_config = ConsoleConfig::from_env()?;
236 0 : find_garbage(bucket_config, console_config, depth, node_kind, output_path).await
237 0 : }
238 0 : Command::PurgeGarbage {
239 0 : input_path,
240 0 : mode,
241 0 : min_age,
242 0 : } => purge_garbage(input_path, mode, min_age.into(), !cli.delete).await,
243 0 : Command::TenantSnapshot {
244 0 : tenant_id,
245 0 : output_path,
246 0 : concurrency,
247 0 : } => {
248 0 : let downloader =
249 0 : SnapshotDownloader::new(bucket_config, tenant_id, output_path, concurrency).await?;
250 0 : downloader.download().await
251 0 : }
252 0 : Command::PageserverPhysicalGc {
253 0 : tenant_ids,
254 0 : min_age,
255 0 : mode,
256 0 : } => {
257 0 : match (&controller_client_conf, mode) {
258 0 : (Some(_), _) => {
259 0 : // Any mode may run when controller API is set
260 0 : }
261 0 : (None, GcMode::Full) => {
262 0 : // The part of physical GC where we erase ancestor layers cannot be done safely without
263 0 : // confirming the most recent complete shard split with the controller. Refuse to run, rather
264 0 : // than doing it unsafely.
265 0 : return Err(anyhow!("Full physical GC requires `--controller-api` and `--controller-jwt` to run"));
266 0 : }
267 0 : (None, GcMode::DryRun | GcMode::IndicesOnly) => {
268 0 : // These GcModes do not require the controller to run.
269 0 : }
270 0 : }
271 0 :
272 0 : let summary = pageserver_physical_gc(
273 0 : bucket_config,
274 0 : controller_client_conf,
275 0 : tenant_ids,
276 0 : min_age.into(),
277 0 : mode,
278 0 : )
279 0 : .await?;
280 0 : println!("{}", serde_json::to_string(&summary).unwrap());
281 0 : Ok(())
282 0 : }
283 0 : Command::FindLargeObjects {
284 0 : min_size,
285 0 : ignore_deltas,
286 0 : concurrency,
287 0 : } => {
288 0 : let summary = find_large_objects::find_large_objects(
289 0 : bucket_config,
290 0 : min_size,
291 0 : ignore_deltas,
292 0 : concurrency,
293 0 : )
294 0 : .await?;
295 0 : println!("{}", serde_json::to_string(&summary).unwrap());
296 0 : Ok(())
297 0 : }
298 0 : }
299 0 : }
|