TLA Line data Source code
1 : pub mod checks;
2 : pub mod cloud_admin_api;
3 : pub mod delete_batch_producer;
4 : pub mod metadata_stream;
5 : mod s3_deletion;
6 : pub mod scan_metadata;
7 :
8 : use std::env;
9 : use std::fmt::Display;
10 : use std::time::Duration;
11 :
12 : use anyhow::Context;
13 : use aws_config::environment::EnvironmentVariableCredentialsProvider;
14 : use aws_config::imds::credentials::ImdsCredentialsProvider;
15 : use aws_config::meta::credentials::CredentialsProviderChain;
16 : use aws_config::sso::SsoCredentialsProvider;
17 : use aws_sdk_s3::config::Region;
18 : use aws_sdk_s3::{Client, Config};
19 :
20 : use reqwest::Url;
21 : pub use s3_deletion::S3Deleter;
22 : use std::io::IsTerminal;
23 : use tokio::io::AsyncReadExt;
24 : use tracing::error;
25 : use tracing_appender::non_blocking::WorkerGuard;
26 : use tracing_subscriber::{fmt, prelude::*, EnvFilter};
27 : use utils::id::{TenantId, TenantTimelineId};
28 :
/// Maximum number of attempts made by the retrying S3 helpers in this module
/// (`list_objects_with_retries`, `download_object_with_retries`) before they give up.
const MAX_RETRIES: usize = 20;
/// Name of the environment variable that `get_cloud_admin_api_token_or_exit` reads the
/// cloud admin API token from.
const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";

/// Name of the command-line tool.
// NOTE(review): not referenced in this part of the file; presumably used by the binaries
// for help/identification output — confirm against callers.
pub const CLI_NAME: &str = "s3-scrubber";
33 :
/// A location inside an S3 bucket: the bucket itself, a key prefix within it and the
/// delimiter that separates key segments.
#[derive(Debug, Clone)]
pub struct S3Target {
    /// Bucket name; sent as `bucket` in listing requests.
    pub bucket_name: String,
    /// Key prefix inside the bucket; sent as `prefix` in listing requests and extended by
    /// `with_sub_segment`.
    pub prefix_in_bucket: String,
    /// Separator between key segments; sent as `delimiter` in listing requests and used to
    /// join segments in `with_sub_segment`.
    pub delimiter: String,
}
40 :
/// Depth selector that can be parsed from the command line (derives `clap::ValueEnum`).
// NOTE(review): presumably controls whether a scan stops at tenants or descends into
// timelines — the consumers are not visible in this file, confirm there.
#[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)]
pub enum TraversingDepth {
    /// Rendered as `tenant` by the `Display` impl.
    Tenant,
    /// Rendered as `timeline` by the `Display` impl.
    Timeline,
}
46 :
47 : impl Display for TraversingDepth {
48 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 0 : f.write_str(match self {
50 0 : Self::Tenant => "tenant",
51 0 : Self::Timeline => "timeline",
52 : })
53 0 : }
54 : }
55 :
56 : impl S3Target {
57 0 : pub fn with_sub_segment(&self, new_segment: &str) -> Self {
58 0 : let mut new_self = self.clone();
59 0 : let _ = new_self.prefix_in_bucket.pop();
60 0 : new_self.prefix_in_bucket =
61 0 : [&new_self.prefix_in_bucket, new_segment, ""].join(&new_self.delimiter);
62 0 : new_self
63 0 : }
64 : }
65 :
66 0 : #[derive(Clone)]
67 : pub enum RootTarget {
68 : Pageserver(S3Target),
69 : Safekeeper(S3Target),
70 : }
71 :
72 : impl RootTarget {
73 0 : pub fn tenants_root(&self) -> &S3Target {
74 0 : match self {
75 0 : Self::Pageserver(root) => root,
76 0 : Self::Safekeeper(root) => root,
77 : }
78 0 : }
79 :
80 0 : pub fn tenant_root(&self, tenant_id: &TenantId) -> S3Target {
81 0 : self.tenants_root().with_sub_segment(&tenant_id.to_string())
82 0 : }
83 :
84 0 : pub fn timelines_root(&self, tenant_id: &TenantId) -> S3Target {
85 0 : match self {
86 0 : Self::Pageserver(_) => self.tenant_root(tenant_id).with_sub_segment("timelines"),
87 0 : Self::Safekeeper(_) => self.tenant_root(tenant_id),
88 : }
89 0 : }
90 :
91 0 : pub fn timeline_root(&self, id: &TenantTimelineId) -> S3Target {
92 0 : self.timelines_root(&id.tenant_id)
93 0 : .with_sub_segment(&id.timeline_id.to_string())
94 0 : }
95 :
96 0 : pub fn bucket_name(&self) -> &str {
97 0 : match self {
98 0 : Self::Pageserver(root) => &root.bucket_name,
99 0 : Self::Safekeeper(root) => &root.bucket_name,
100 : }
101 0 : }
102 :
103 0 : pub fn delimiter(&self) -> &str {
104 0 : match self {
105 0 : Self::Pageserver(root) => &root.delimiter,
106 0 : Self::Safekeeper(root) => &root.delimiter,
107 : }
108 0 : }
109 : }
110 :
/// Identifies the bucket to work on and, optionally, the SSO account to authenticate with.
pub struct BucketConfig {
    /// Region of the bucket.
    pub region: String,
    /// Name of the bucket.
    pub bucket: String,

    /// Use SSO if this is set, else rely on AWS_* environment vars
    pub sso_account_id: Option<String>,
}

impl Display for BucketConfig {
    /// Renders the configuration as `<sso account id>/<region>/<bucket>`, printing `<none>`
    /// in place of a missing SSO account id.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            region,
            bucket,
            sso_account_id,
        } = self;
        let account = match sso_account_id {
            Some(id) => id.as_str(),
            None => "<none>",
        };
        write!(f, "{}/{}/{}", account, region, bucket)
    }
}
130 :
131 : impl BucketConfig {
132 0 : pub fn from_env() -> anyhow::Result<Self> {
133 0 : let sso_account_id = env::var("SSO_ACCOUNT_ID").ok();
134 0 : let region = env::var("REGION").context("'REGION' param retrieval")?;
135 0 : let bucket = env::var("BUCKET").context("'BUCKET' param retrieval")?;
136 :
137 0 : Ok(Self {
138 0 : region,
139 0 : bucket,
140 0 : sso_account_id,
141 0 : })
142 0 : }
143 : }
144 :
145 : pub struct ConsoleConfig {
146 : pub admin_api_url: Url,
147 : }
148 :
149 : impl ConsoleConfig {
150 0 : pub fn from_env() -> anyhow::Result<Self> {
151 0 : let admin_api_url: Url = env::var("CLOUD_ADMIN_API_URL")
152 0 : .context("'CLOUD_ADMIN_API_URL' param retrieval")?
153 0 : .parse()
154 0 : .context("'CLOUD_ADMIN_API_URL' param parsing")?;
155 :
156 0 : Ok(Self { admin_api_url })
157 0 : }
158 : }
159 :
160 0 : pub fn get_cloud_admin_api_token_or_exit() -> String {
161 0 : match env::var(CLOUD_ADMIN_API_TOKEN_ENV_VAR) {
162 0 : Ok(token) => token,
163 : Err(env::VarError::NotPresent) => {
164 0 : error!("{CLOUD_ADMIN_API_TOKEN_ENV_VAR} env variable is not present");
165 0 : std::process::exit(1);
166 : }
167 0 : Err(env::VarError::NotUnicode(not_unicode_string)) => {
168 0 : error!("{CLOUD_ADMIN_API_TOKEN_ENV_VAR} env variable's value is not a valid unicode string: {not_unicode_string:?}");
169 0 : std::process::exit(1);
170 : }
171 : }
172 0 : }
173 :
174 0 : pub fn init_logging(file_name: &str) -> WorkerGuard {
175 0 : let (file_writer, guard) =
176 0 : tracing_appender::non_blocking(tracing_appender::rolling::never("./logs/", file_name));
177 0 :
178 0 : let file_logs = fmt::Layer::new()
179 0 : .with_target(false)
180 0 : .with_ansi(false)
181 0 : .with_writer(file_writer);
182 0 : let stdout_logs = fmt::Layer::new()
183 0 : .with_ansi(std::io::stdout().is_terminal())
184 0 : .with_target(false)
185 0 : .with_writer(std::io::stdout);
186 0 : tracing_subscriber::registry()
187 0 : .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
188 0 : .with(file_logs)
189 0 : .with(stdout_logs)
190 0 : .init();
191 0 :
192 0 : guard
193 0 : }
194 :
195 0 : pub fn init_s3_client(account_id: Option<String>, bucket_region: Region) -> Client {
196 0 : let credentials_provider = {
197 : // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
198 0 : let chain = CredentialsProviderChain::first_try(
199 0 : "env",
200 0 : EnvironmentVariableCredentialsProvider::new(),
201 0 : );
202 0 :
203 0 : // Use SSO if we were given an account ID
204 0 : match account_id {
205 0 : Some(sso_account) => chain.or_else(
206 0 : "sso",
207 0 : SsoCredentialsProvider::builder()
208 0 : .account_id(sso_account)
209 0 : .role_name("PowerUserAccess")
210 0 : .start_url("https://neondb.awsapps.com/start")
211 0 : .region(Region::from_static("eu-central-1"))
212 0 : .build(),
213 0 : ),
214 0 : None => chain,
215 : }
216 0 : .or_else(
217 0 : // Finally try IMDS
218 0 : "imds",
219 0 : ImdsCredentialsProvider::builder().build(),
220 0 : )
221 0 : };
222 0 :
223 0 : let mut builder = Config::builder()
224 0 : .region(bucket_region)
225 0 : .credentials_provider(credentials_provider);
226 :
227 0 : if let Ok(endpoint) = env::var("AWS_ENDPOINT_URL") {
228 0 : builder = builder.endpoint_url(endpoint)
229 0 : }
230 :
231 0 : Client::from_conf(builder.build())
232 0 : }
233 :
234 0 : async fn list_objects_with_retries(
235 0 : s3_client: &Client,
236 0 : s3_target: &S3Target,
237 0 : continuation_token: Option<String>,
238 0 : ) -> anyhow::Result<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output> {
239 0 : for _ in 0..MAX_RETRIES {
240 0 : match s3_client
241 0 : .list_objects_v2()
242 0 : .bucket(&s3_target.bucket_name)
243 0 : .prefix(&s3_target.prefix_in_bucket)
244 0 : .delimiter(&s3_target.delimiter)
245 0 : .set_continuation_token(continuation_token.clone())
246 0 : .send()
247 0 : .await
248 : {
249 0 : Ok(response) => return Ok(response),
250 0 : Err(e) => {
251 0 : error!("list_objects_v2 query failed: {e}");
252 0 : tokio::time::sleep(Duration::from_secs(1)).await;
253 : }
254 : }
255 : }
256 :
257 0 : anyhow::bail!("Failed to list objects {MAX_RETRIES} times")
258 0 : }
259 :
260 0 : async fn download_object_with_retries(
261 0 : s3_client: &Client,
262 0 : bucket_name: &str,
263 0 : key: &str,
264 0 : ) -> anyhow::Result<Vec<u8>> {
265 0 : for _ in 0..MAX_RETRIES {
266 0 : let mut body_buf = Vec::new();
267 0 : let response_stream = match s3_client
268 0 : .get_object()
269 0 : .bucket(bucket_name)
270 0 : .key(key)
271 0 : .send()
272 0 : .await
273 : {
274 0 : Ok(response) => response,
275 0 : Err(e) => {
276 0 : error!("Failed to download object for key {key}: {e}");
277 0 : tokio::time::sleep(Duration::from_secs(1)).await;
278 0 : continue;
279 : }
280 : };
281 :
282 0 : match response_stream
283 0 : .body
284 0 : .into_async_read()
285 0 : .read_to_end(&mut body_buf)
286 0 : .await
287 : {
288 0 : Ok(bytes_read) => {
289 0 : tracing::info!("Downloaded {bytes_read} bytes for object object with key {key}");
290 0 : return Ok(body_buf);
291 : }
292 0 : Err(e) => {
293 0 : error!("Failed to stream object body for key {key}: {e}");
294 0 : tokio::time::sleep(Duration::from_secs(1)).await;
295 : }
296 : }
297 : }
298 :
299 0 : anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
300 0 : }
|