Line data Source code
1 : pub mod checks;
2 : pub mod cloud_admin_api;
3 : pub mod delete_batch_producer;
4 : pub mod metadata_stream;
5 : mod s3_deletion;
6 : pub mod scan_metadata;
7 :
8 : use std::env;
9 : use std::fmt::Display;
10 : use std::time::Duration;
11 :
12 : use anyhow::Context;
13 : use aws_config::environment::EnvironmentVariableCredentialsProvider;
14 : use aws_config::imds::credentials::ImdsCredentialsProvider;
15 : use aws_config::meta::credentials::CredentialsProviderChain;
16 : use aws_config::sso::SsoCredentialsProvider;
17 : use aws_sdk_s3::config::Region;
18 : use aws_sdk_s3::{Client, Config};
19 :
20 : use reqwest::Url;
21 : pub use s3_deletion::S3Deleter;
22 : use tokio::io::AsyncReadExt;
23 : use tracing::error;
24 : use tracing_appender::non_blocking::WorkerGuard;
25 : use tracing_subscriber::{fmt, prelude::*, EnvFilter};
26 : use utils::id::{TenantId, TenantTimelineId};
27 :
/// Total number of attempts (not additional retries) the S3 list/download helpers make
/// before giving up.
const MAX_RETRIES: usize = 20;
/// Name of the environment variable that holds the cloud admin API token.
const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";

/// Program name of the scrubber CLI (its use site is outside this file).
pub const CLI_NAME: &str = "s3-scrubber";
32 :
/// An S3 location: a bucket plus a key prefix inside it.
#[derive(Debug, Clone)]
pub struct S3Target {
    /// Name of the S3 bucket.
    pub bucket_name: String,
    /// Key prefix inside the bucket; segment-building helpers expect it to end with `delimiter`.
    pub prefix_in_bucket: String,
    /// Separator between key path segments; also passed as the `delimiter` of list requests.
    pub delimiter: String,
}
39 :
/// Level of the tenant/timeline hierarchy a traversal targets.
///
/// Selectable as a command-line value (derives `clap::ValueEnum`); displays as `tenant` or
/// `timeline`.
#[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)]
pub enum TraversingDepth {
    Tenant,
    Timeline,
}
45 :
46 : impl Display for TraversingDepth {
47 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 0 : f.write_str(match self {
49 0 : Self::Tenant => "tenant",
50 0 : Self::Timeline => "timeline",
51 : })
52 0 : }
53 : }
54 :
55 : impl S3Target {
56 0 : pub fn with_sub_segment(&self, new_segment: &str) -> Self {
57 0 : let mut new_self = self.clone();
58 0 : let _ = new_self.prefix_in_bucket.pop();
59 0 : new_self.prefix_in_bucket =
60 0 : [&new_self.prefix_in_bucket, new_segment, ""].join(&new_self.delimiter);
61 0 : new_self
62 0 : }
63 : }
64 :
65 0 : #[derive(Clone)]
66 : pub enum RootTarget {
67 : Pageserver(S3Target),
68 : Safekeeper(S3Target),
69 : }
70 :
71 : impl RootTarget {
72 0 : pub fn tenants_root(&self) -> &S3Target {
73 0 : match self {
74 0 : Self::Pageserver(root) => root,
75 0 : Self::Safekeeper(root) => root,
76 : }
77 0 : }
78 :
79 0 : pub fn tenant_root(&self, tenant_id: &TenantId) -> S3Target {
80 0 : self.tenants_root().with_sub_segment(&tenant_id.to_string())
81 0 : }
82 :
83 0 : pub fn timelines_root(&self, tenant_id: &TenantId) -> S3Target {
84 0 : match self {
85 0 : Self::Pageserver(_) => self.tenant_root(tenant_id).with_sub_segment("timelines"),
86 0 : Self::Safekeeper(_) => self.tenant_root(tenant_id),
87 : }
88 0 : }
89 :
90 0 : pub fn timeline_root(&self, id: &TenantTimelineId) -> S3Target {
91 0 : self.timelines_root(&id.tenant_id)
92 0 : .with_sub_segment(&id.timeline_id.to_string())
93 0 : }
94 :
95 0 : pub fn bucket_name(&self) -> &str {
96 0 : match self {
97 0 : Self::Pageserver(root) => &root.bucket_name,
98 0 : Self::Safekeeper(root) => &root.bucket_name,
99 : }
100 0 : }
101 :
102 0 : pub fn delimiter(&self) -> &str {
103 0 : match self {
104 0 : Self::Pageserver(root) => &root.delimiter,
105 0 : Self::Safekeeper(root) => &root.delimiter,
106 : }
107 0 : }
108 : }
109 :
/// Which S3 bucket to scan and how to authenticate against it.
pub struct BucketConfig {
    /// AWS region of the bucket.
    pub region: String,
    /// Bucket name.
    pub bucket: String,

    /// Use SSO if this is set, else rely on AWS_* environment vars
    pub sso_account_id: Option<String>,
}

impl Display for BucketConfig {
    /// Renders as `<sso_account_id>/<region>/<bucket>`, writing `<none>` when no SSO account is set.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let account = match &self.sso_account_id {
            Some(id) => id.as_str(),
            None => "<none>",
        };
        write!(f, "{account}/{}/{}", self.region, self.bucket)
    }
}
129 :
130 : impl BucketConfig {
131 0 : pub fn from_env() -> anyhow::Result<Self> {
132 0 : let sso_account_id = env::var("SSO_ACCOUNT_ID").ok();
133 0 : let region = env::var("REGION").context("'REGION' param retrieval")?;
134 0 : let bucket = env::var("BUCKET").context("'BUCKET' param retrieval")?;
135 :
136 0 : Ok(Self {
137 0 : region,
138 0 : bucket,
139 0 : sso_account_id,
140 0 : })
141 0 : }
142 : }
143 :
144 : pub struct ConsoleConfig {
145 : pub admin_api_url: Url,
146 : }
147 :
148 : impl ConsoleConfig {
149 0 : pub fn from_env() -> anyhow::Result<Self> {
150 0 : let admin_api_url: Url = env::var("CLOUD_ADMIN_API_URL")
151 0 : .context("'CLOUD_ADMIN_API_URL' param retrieval")?
152 0 : .parse()
153 0 : .context("'CLOUD_ADMIN_API_URL' param parsing")?;
154 :
155 0 : Ok(Self { admin_api_url })
156 0 : }
157 : }
158 :
159 0 : pub fn get_cloud_admin_api_token_or_exit() -> String {
160 0 : match env::var(CLOUD_ADMIN_API_TOKEN_ENV_VAR) {
161 0 : Ok(token) => token,
162 : Err(env::VarError::NotPresent) => {
163 0 : error!("{CLOUD_ADMIN_API_TOKEN_ENV_VAR} env variable is not present");
164 0 : std::process::exit(1);
165 : }
166 0 : Err(env::VarError::NotUnicode(not_unicode_string)) => {
167 0 : error!("{CLOUD_ADMIN_API_TOKEN_ENV_VAR} env variable's value is not a valid unicode string: {not_unicode_string:?}");
168 0 : std::process::exit(1);
169 : }
170 : }
171 0 : }
172 :
173 0 : pub fn init_logging(file_name: &str) -> WorkerGuard {
174 0 : let (file_writer, guard) =
175 0 : tracing_appender::non_blocking(tracing_appender::rolling::never("./logs/", file_name));
176 0 :
177 0 : let file_logs = fmt::Layer::new()
178 0 : .with_target(false)
179 0 : .with_ansi(false)
180 0 : .with_writer(file_writer);
181 0 : let stdout_logs = fmt::Layer::new()
182 0 : .with_target(false)
183 0 : .with_writer(std::io::stdout);
184 0 : tracing_subscriber::registry()
185 0 : .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
186 0 : .with(file_logs)
187 0 : .with(stdout_logs)
188 0 : .init();
189 0 :
190 0 : guard
191 0 : }
192 :
193 0 : pub fn init_s3_client(account_id: Option<String>, bucket_region: Region) -> Client {
194 0 : let credentials_provider = {
195 : // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
196 0 : let chain = CredentialsProviderChain::first_try(
197 0 : "env",
198 0 : EnvironmentVariableCredentialsProvider::new(),
199 0 : );
200 0 :
201 0 : // Use SSO if we were given an account ID
202 0 : match account_id {
203 0 : Some(sso_account) => chain.or_else(
204 0 : "sso",
205 0 : SsoCredentialsProvider::builder()
206 0 : .account_id(sso_account)
207 0 : .role_name("PowerUserAccess")
208 0 : .start_url("https://neondb.awsapps.com/start")
209 0 : .region(Region::from_static("eu-central-1"))
210 0 : .build(),
211 0 : ),
212 0 : None => chain,
213 : }
214 0 : .or_else(
215 0 : // Finally try IMDS
216 0 : "imds",
217 0 : ImdsCredentialsProvider::builder().build(),
218 0 : )
219 0 : };
220 0 :
221 0 : let mut builder = Config::builder()
222 0 : .region(bucket_region)
223 0 : .credentials_provider(credentials_provider);
224 :
225 0 : if let Ok(endpoint) = env::var("AWS_ENDPOINT_URL") {
226 0 : builder = builder.endpoint_url(endpoint)
227 0 : }
228 :
229 0 : Client::from_conf(builder.build())
230 0 : }
231 :
232 0 : async fn list_objects_with_retries(
233 0 : s3_client: &Client,
234 0 : s3_target: &S3Target,
235 0 : continuation_token: Option<String>,
236 0 : ) -> anyhow::Result<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output> {
237 0 : for _ in 0..MAX_RETRIES {
238 0 : match s3_client
239 0 : .list_objects_v2()
240 0 : .bucket(&s3_target.bucket_name)
241 0 : .prefix(&s3_target.prefix_in_bucket)
242 0 : .delimiter(&s3_target.delimiter)
243 0 : .set_continuation_token(continuation_token.clone())
244 0 : .send()
245 0 : .await
246 : {
247 0 : Ok(response) => return Ok(response),
248 0 : Err(e) => {
249 0 : error!("list_objects_v2 query failed: {e}");
250 0 : tokio::time::sleep(Duration::from_secs(1)).await;
251 : }
252 : }
253 : }
254 :
255 0 : anyhow::bail!("Failed to list objects {MAX_RETRIES} times")
256 0 : }
257 :
258 0 : async fn download_object_with_retries(
259 0 : s3_client: &Client,
260 0 : bucket_name: &str,
261 0 : key: &str,
262 0 : ) -> anyhow::Result<Vec<u8>> {
263 0 : for _ in 0..MAX_RETRIES {
264 0 : let mut body_buf = Vec::new();
265 0 : let response_stream = match s3_client
266 0 : .get_object()
267 0 : .bucket(bucket_name)
268 0 : .key(key)
269 0 : .send()
270 0 : .await
271 : {
272 0 : Ok(response) => response,
273 0 : Err(e) => {
274 0 : error!("Failed to download object for key {key}: {e}");
275 0 : tokio::time::sleep(Duration::from_secs(1)).await;
276 0 : continue;
277 : }
278 : };
279 :
280 0 : match response_stream
281 0 : .body
282 0 : .into_async_read()
283 0 : .read_to_end(&mut body_buf)
284 0 : .await
285 : {
286 0 : Ok(bytes_read) => {
287 0 : tracing::info!("Downloaded {bytes_read} bytes for object object with key {key}");
288 0 : return Ok(body_buf);
289 : }
290 0 : Err(e) => {
291 0 : error!("Failed to stream object body for key {key}: {e}");
292 0 : tokio::time::sleep(Duration::from_secs(1)).await;
293 : }
294 : }
295 : }
296 :
297 0 : anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
298 0 : }
|