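//! Entry point for the S3 scrubber CLI.
//!
//! The `tidy` subcommand walks pageserver or safekeeper data in an S3 bucket,
//! cross-checks it against the cloud admin API, and deletes objects belonging
//! to tenants/timelines that are no longer active. Nothing is removed unless
//! the top-level `--delete` flag is passed; by default every run is a dry run.
//! The `scan-metadata` subcommand checks metadata stored in the bucket and
//! prints a summary.
//!
//! Rough usage sketch (the binary name below is a placeholder; bucket and
//! console settings are read from the environment via `BucketConfig::from_env`
//! and `ConsoleConfig::from_env`):
//!
//! ```text
//! # Dry-run cleanup of pageserver data
//! s3-scrubber tidy --node-kind pageserver
//!
//! # Delete for real, skipping the post-deletion validation step
//! s3-scrubber --delete tidy --node-kind pageserver --skip-validation
//!
//! # Scan metadata stored in the bucket
//! s3-scrubber scan-metadata
//! ```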
use std::collections::HashMap;
use std::fmt::Display;
use std::num::NonZeroUsize;
use std::sync::Arc;

use anyhow::Context;
use aws_sdk_s3::config::Region;
use s3_scrubber::cloud_admin_api::CloudAdminApiClient;
use s3_scrubber::delete_batch_producer::DeleteBatchProducer;
use s3_scrubber::scan_metadata::scan_metadata;
use s3_scrubber::{
    checks, get_cloud_admin_api_token_or_exit, init_logging, init_s3_client, BucketConfig,
    ConsoleConfig, RootTarget, S3Deleter, S3Target, TraversingDepth, CLI_NAME,
};
use tracing::{info, warn};

use clap::{Parser, Subcommand, ValueEnum};

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
#[command(arg_required_else_help(true))]
struct Cli {
    #[command(subcommand)]
    command: Command,

    #[arg(short, long, default_value_t = false)]
    delete: bool,
}

#[derive(ValueEnum, Clone, Copy, Eq, PartialEq)]
enum NodeKind {
    Safekeeper,
    Pageserver,
}

impl NodeKind {
    fn as_str(&self) -> &'static str {
        match self {
            Self::Safekeeper => "safekeeper",
            Self::Pageserver => "pageserver",
        }
    }
}

impl Display for NodeKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}

#[derive(Subcommand)]
enum Command {
    Tidy {
        #[arg(short, long)]
        node_kind: NodeKind,
        #[arg(short, long, default_value_t = TraversingDepth::Tenant)]
        depth: TraversingDepth,
        #[arg(short, long, default_value_t = false)]
        skip_validation: bool,
    },
    ScanMetadata {},
}

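/// Removes stale S3 data for the given node kind.
///
/// A [`DeleteBatchProducer`] lists the bucket and cross-checks entries with the
/// cloud admin API, while an [`S3Deleter`] consumes the produced batches and
/// deletes them (unless this is a dry run). For pageserver data, a validation
/// pass over the remaining active tenants/timelines runs afterwards unless
/// `skip_validation` is set.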
async fn tidy(
    cli: &Cli,
    bucket_config: BucketConfig,
    console_config: ConsoleConfig,
    node_kind: NodeKind,
    depth: TraversingDepth,
    skip_validation: bool,
) -> anyhow::Result<()> {
    let dry_run = !cli.delete;
    let file_name = if dry_run {
        format!(
            "{}_{}_{}__dry.log",
            CLI_NAME,
            node_kind,
            chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
        )
    } else {
        format!(
            "{}_{}_{}.log",
            CLI_NAME,
            node_kind,
            chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
        )
    };

    let _guard = init_logging(&file_name);

    if dry_run {
        info!("Dry run, not removing items for real");
    } else {
        warn!("Dry run disabled, removing bucket items for real");
    }

    info!("skip_validation={skip_validation}");

    info!("Starting extra S3 removal in {bucket_config} for node kind '{node_kind}', traversing depth: {depth:?}");

    info!("Starting extra tenant S3 removal in {bucket_config} for node kind '{node_kind}'");
    let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(
        get_cloud_admin_api_token_or_exit(),
        console_config.admin_api_url,
    ));

    let bucket_region = Region::new(bucket_config.region);
    let delimiter = "/".to_string();
    let s3_client = Arc::new(init_s3_client(bucket_config.sso_account_id, bucket_region));
    let s3_root = match node_kind {
        NodeKind::Pageserver => RootTarget::Pageserver(S3Target {
            bucket_name: bucket_config.bucket,
            prefix_in_bucket: ["pageserver", "v1", "tenants", ""].join(&delimiter),
            delimiter,
        }),
        NodeKind::Safekeeper => RootTarget::Safekeeper(S3Target {
            bucket_name: bucket_config.bucket,
            prefix_in_bucket: ["safekeeper", "v1", "wal", ""].join(&delimiter),
            delimiter,
        }),
    };

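    // The batch producer lists S3 and checks entries against the admin API in
    // the background; the deleter consumes its batches. Both are awaited
    // together below so that listing and deletion can overlap.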
    let delete_batch_producer = DeleteBatchProducer::start(
        Arc::clone(&cloud_admin_api_client),
        Arc::clone(&s3_client),
        s3_root.clone(),
        depth,
    );

    let s3_deleter = S3Deleter::new(
        dry_run,
        NonZeroUsize::new(15).unwrap(),
        Arc::clone(&s3_client),
        delete_batch_producer.subscribe(),
        s3_root.clone(),
    );

    let (deleter_task_result, batch_producer_task_result) =
        tokio::join!(s3_deleter.remove_all(), delete_batch_producer.join());

    let deletion_stats = deleter_task_result.context("s3 deletion")?;
    info!(
        "Deleted {} tenants ({} keys) and {} timelines ({} keys) total. Dry run: {}",
        deletion_stats.deleted_tenant_keys.len(),
        deletion_stats.deleted_tenant_keys.values().sum::<usize>(),
        deletion_stats.deleted_timeline_keys.len(),
        deletion_stats.deleted_timeline_keys.values().sum::<usize>(),
        dry_run,
    );
    info!(
        "Total tenant deletion stats: {:?}",
        deletion_stats
            .deleted_tenant_keys
            .into_iter()
            .map(|(id, key)| (id.to_string(), key))
            .collect::<HashMap<_, _>>()
    );
    info!(
        "Total timeline deletion stats: {:?}",
        deletion_stats
            .deleted_timeline_keys
            .into_iter()
            .map(|(id, key)| (id.to_string(), key))
            .collect::<HashMap<_, _>>()
    );

    let batch_producer_stats = batch_producer_task_result.context("delete batch producer join")?;
    info!(
        "Total bucket tenants listed: {}; for {} active tenants, timelines checked: {}",
        batch_producer_stats.tenants_checked(),
        batch_producer_stats.active_tenants(),
        batch_producer_stats.timelines_checked()
    );

    if node_kind != NodeKind::Pageserver {
        info!("node_kind != pageserver, finishing without the validation step");
        return Ok(());
    }

    if skip_validation {
        info!("--skip-validation is set, exiting");
        return Ok(());
    }

    info!("validating active tenants and timelines for pageserver S3 data");

    // TODO kb real stats for validation + better stats for every place: add and print `min`, `max`, `mean` values at least
    let validation_stats = checks::validate_pageserver_active_tenant_and_timelines(
        s3_client,
        s3_root,
        cloud_admin_api_client,
        batch_producer_stats,
    )
    .await
    .context("active tenant and timeline validation")?;
    info!(
        "Finished active tenant and timeline validation, correct timelines: {}, timeline validation errors: {}",
        validation_stats.normal_timelines.len(),
        validation_stats.timelines_with_errors.len()
    );
    if !validation_stats.timelines_with_errors.is_empty() {
        warn!(
            "Validation errors: {:#?}",
            validation_stats
                .timelines_with_errors
                .into_iter()
                .map(|(id, errors)| (id.to_string(), format!("{errors:?}")))
                .collect::<HashMap<_, _>>()
        );
    }

    info!("Done");
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let cli = Cli::parse();

    let bucket_config = BucketConfig::from_env()?;

    match cli.command {
        Command::Tidy {
            node_kind,
            depth,
            skip_validation,
        } => {
            let console_config = ConsoleConfig::from_env()?;
            tidy(
                &cli,
                bucket_config,
                console_config,
                node_kind,
                depth,
                skip_validation,
            )
            .await
        }
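        // `scan-metadata` prints a summary of the bucket metadata scan and
        // fails the process if fatal issues were found.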
        Command::ScanMetadata {} => match scan_metadata(bucket_config).await {
            Err(e) => {
                tracing::error!("Failed: {e}");
                Err(e)
            }
            Ok(summary) => {
                println!("{}", summary.summary_string());
                if summary.is_fatal() {
                    Err(anyhow::anyhow!("Fatal scrub errors detected"))
                } else {
                    Ok(())
                }
            }
        },
    }
}