Line data Source code
1 : #![deny(unsafe_code)]
2 : #![deny(clippy::undocumented_unsafe_blocks)]
3 : pub mod checks;
4 : pub mod cloud_admin_api;
5 : pub mod garbage;
6 : pub mod metadata_stream;
7 : pub mod scan_metadata;
8 :
9 : use std::env;
10 : use std::fmt::Display;
11 : use std::sync::Arc;
12 : use std::time::Duration;
13 :
14 : use anyhow::Context;
15 : use aws_config::environment::EnvironmentVariableCredentialsProvider;
16 : use aws_config::imds::credentials::ImdsCredentialsProvider;
17 : use aws_config::meta::credentials::CredentialsProviderChain;
18 : use aws_config::profile::ProfileFileCredentialsProvider;
19 : use aws_config::retry::RetryConfig;
20 : use aws_config::sso::SsoCredentialsProvider;
21 : use aws_config::BehaviorVersion;
22 : use aws_sdk_s3::config::{AsyncSleep, Region, SharedAsyncSleep};
23 : use aws_sdk_s3::{Client, Config};
24 : use aws_smithy_async::rt::sleep::TokioSleep;
25 :
26 : use clap::ValueEnum;
27 : use pageserver::tenant::TENANTS_SEGMENT_NAME;
28 : use pageserver_api::shard::TenantShardId;
29 : use reqwest::Url;
30 : use serde::{Deserialize, Serialize};
31 : use std::io::IsTerminal;
32 : use tokio::io::AsyncReadExt;
33 : use tracing::error;
34 : use tracing_appender::non_blocking::WorkerGuard;
35 : use tracing_subscriber::{fmt, prelude::*, EnvFilter};
36 : use utils::id::TimelineId;
37 :
/// Upper bound on attempts made by the `*_with_retries` S3 helpers in this module.
const MAX_RETRIES: usize = 20;
/// Name of the environment variable holding the cloud admin API token.
const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";
40 :
/// A location inside an S3 bucket: the bucket plus a key prefix, listed with a
/// given delimiter.
#[derive(Clone, Debug)]
pub struct S3Target {
    /// Name of the bucket to operate on.
    pub bucket_name: String,
    /// Key prefix within the bucket.
    ///
    /// Only for a `RootTarget` does this match the pageserver/safekeeper setting of
    /// the same name; targets derived from a root carry additional path segments.
    pub prefix_in_bucket: String,
    /// Separator between key path segments.
    pub delimiter: String,
}
50 :
51 : /// Convenience for referring to timelines within a particular shard: more ergonomic
52 : /// than using a 2-tuple.
53 : ///
54 : /// This is the shard-aware equivalent of TenantTimelineId. It's defined here rather
55 : /// than somewhere more broadly exposed, because this kind of thing is rarely needed
56 : /// in the pageserver, as all timeline objects existing in the scope of a particular
57 : /// tenant: the scrubber is different in that it handles collections of data referring to many
58 : /// TenantShardTimelineIds in on place.
59 0 : #[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq)]
60 : pub struct TenantShardTimelineId {
61 : tenant_shard_id: TenantShardId,
62 : timeline_id: TimelineId,
63 : }
64 :
65 : impl TenantShardTimelineId {
66 0 : fn new(tenant_shard_id: TenantShardId, timeline_id: TimelineId) -> Self {
67 0 : Self {
68 0 : tenant_shard_id,
69 0 : timeline_id,
70 0 : }
71 0 : }
72 : }
73 :
74 : impl Display for TenantShardTimelineId {
75 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 0 : write!(f, "{}/{}", self.tenant_shard_id, self.timeline_id)
77 0 : }
78 : }
79 :
80 0 : #[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)]
81 : pub enum TraversingDepth {
82 : Tenant,
83 : Timeline,
84 : }
85 :
86 : impl Display for TraversingDepth {
87 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 0 : f.write_str(match self {
89 0 : Self::Tenant => "tenant",
90 0 : Self::Timeline => "timeline",
91 : })
92 0 : }
93 : }
94 :
95 0 : #[derive(ValueEnum, Clone, Copy, Eq, PartialEq, Debug, Serialize, Deserialize)]
96 : pub enum NodeKind {
97 : Safekeeper,
98 : Pageserver,
99 : }
100 :
101 : impl NodeKind {
102 0 : fn as_str(&self) -> &'static str {
103 0 : match self {
104 0 : Self::Safekeeper => "safekeeper",
105 0 : Self::Pageserver => "pageserver",
106 : }
107 0 : }
108 : }
109 :
110 : impl Display for NodeKind {
111 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 0 : f.write_str(self.as_str())
113 0 : }
114 : }
115 :
116 : impl S3Target {
117 0 : pub fn with_sub_segment(&self, new_segment: &str) -> Self {
118 0 : let mut new_self = self.clone();
119 0 : if new_self.prefix_in_bucket.is_empty() {
120 0 : new_self.prefix_in_bucket = format!("/{}/", new_segment);
121 0 : } else {
122 0 : if new_self.prefix_in_bucket.ends_with('/') {
123 0 : new_self.prefix_in_bucket.pop();
124 0 : }
125 0 : new_self.prefix_in_bucket =
126 0 : [&new_self.prefix_in_bucket, new_segment, ""].join(&new_self.delimiter);
127 : }
128 0 : new_self
129 0 : }
130 : }
131 :
132 0 : #[derive(Clone)]
133 : pub enum RootTarget {
134 : Pageserver(S3Target),
135 : Safekeeper(S3Target),
136 : }
137 :
138 : impl RootTarget {
139 0 : pub fn tenants_root(&self) -> S3Target {
140 0 : match self {
141 0 : Self::Pageserver(root) => root.with_sub_segment(TENANTS_SEGMENT_NAME),
142 0 : Self::Safekeeper(root) => root.with_sub_segment("wal"),
143 : }
144 0 : }
145 :
146 0 : pub fn tenant_root(&self, tenant_id: &TenantShardId) -> S3Target {
147 0 : self.tenants_root().with_sub_segment(&tenant_id.to_string())
148 0 : }
149 :
150 0 : pub fn timelines_root(&self, tenant_id: &TenantShardId) -> S3Target {
151 0 : match self {
152 0 : Self::Pageserver(_) => self.tenant_root(tenant_id).with_sub_segment("timelines"),
153 0 : Self::Safekeeper(_) => self.tenant_root(tenant_id),
154 : }
155 0 : }
156 :
157 0 : pub fn timeline_root(&self, id: &TenantShardTimelineId) -> S3Target {
158 0 : self.timelines_root(&id.tenant_shard_id)
159 0 : .with_sub_segment(&id.timeline_id.to_string())
160 0 : }
161 :
162 0 : pub fn bucket_name(&self) -> &str {
163 0 : match self {
164 0 : Self::Pageserver(root) => &root.bucket_name,
165 0 : Self::Safekeeper(root) => &root.bucket_name,
166 : }
167 0 : }
168 :
169 0 : pub fn delimiter(&self) -> &str {
170 0 : match self {
171 0 : Self::Pageserver(root) => &root.delimiter,
172 0 : Self::Safekeeper(root) => &root.delimiter,
173 : }
174 0 : }
175 : }
176 :
177 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
178 : pub struct BucketConfig {
179 : pub region: String,
180 : pub bucket: String,
181 : pub prefix_in_bucket: Option<String>,
182 :
183 : /// Use SSO if this is set, else rely on AWS_* environment vars
184 : pub sso_account_id: Option<String>,
185 : }
186 :
187 : impl Display for BucketConfig {
188 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
189 0 : write!(
190 0 : f,
191 0 : "{}/{}/{}",
192 0 : self.sso_account_id.as_deref().unwrap_or("<none>"),
193 0 : self.region,
194 0 : self.bucket
195 0 : )
196 0 : }
197 : }
198 :
199 : impl BucketConfig {
200 0 : pub fn from_env() -> anyhow::Result<Self> {
201 0 : let sso_account_id = env::var("SSO_ACCOUNT_ID").ok();
202 0 : let region = env::var("REGION").context("'REGION' param retrieval")?;
203 0 : let bucket = env::var("BUCKET").context("'BUCKET' param retrieval")?;
204 0 : let prefix_in_bucket = env::var("BUCKET_PREFIX").ok();
205 0 :
206 0 : Ok(Self {
207 0 : region,
208 0 : bucket,
209 0 : prefix_in_bucket,
210 0 : sso_account_id,
211 0 : })
212 0 : }
213 : }
214 :
215 : pub struct ConsoleConfig {
216 : pub token: String,
217 : pub base_url: Url,
218 : }
219 :
220 : impl ConsoleConfig {
221 0 : pub fn from_env() -> anyhow::Result<Self> {
222 0 : let base_url: Url = env::var("CLOUD_ADMIN_API_URL")
223 0 : .context("'CLOUD_ADMIN_API_URL' param retrieval")?
224 0 : .parse()
225 0 : .context("'CLOUD_ADMIN_API_URL' param parsing")?;
226 :
227 0 : let token = env::var(CLOUD_ADMIN_API_TOKEN_ENV_VAR)
228 0 : .context("'CLOUD_ADMIN_API_TOKEN' environment variable fetch")?;
229 :
230 0 : Ok(Self { base_url, token })
231 0 : }
232 : }
233 :
234 0 : pub fn init_logging(file_name: &str) -> WorkerGuard {
235 0 : let (file_writer, guard) =
236 0 : tracing_appender::non_blocking(tracing_appender::rolling::never("./logs/", file_name));
237 0 :
238 0 : let file_logs = fmt::Layer::new()
239 0 : .with_target(false)
240 0 : .with_ansi(false)
241 0 : .with_writer(file_writer);
242 0 : let stderr_logs = fmt::Layer::new()
243 0 : .with_ansi(std::io::stderr().is_terminal())
244 0 : .with_target(false)
245 0 : .with_writer(std::io::stderr);
246 0 : tracing_subscriber::registry()
247 0 : .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
248 0 : .with(file_logs)
249 0 : .with(stderr_logs)
250 0 : .init();
251 0 :
252 0 : guard
253 0 : }
254 :
255 0 : pub fn init_s3_client(account_id: Option<String>, bucket_region: Region) -> Client {
256 0 : let credentials_provider = {
257 : // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
258 0 : let chain = CredentialsProviderChain::first_try(
259 0 : "env",
260 0 : EnvironmentVariableCredentialsProvider::new(),
261 0 : )
262 0 : // uses "AWS_PROFILE" / `aws sso login --profile <profile>`
263 0 : .or_else(
264 0 : "profile-sso",
265 0 : ProfileFileCredentialsProvider::builder().build(),
266 0 : );
267 0 :
268 0 : // Use SSO if we were given an account ID
269 0 : match account_id {
270 0 : Some(sso_account) => chain.or_else(
271 0 : "sso",
272 0 : SsoCredentialsProvider::builder()
273 0 : .account_id(sso_account)
274 0 : .role_name("PowerUserAccess")
275 0 : .start_url("https://neondb.awsapps.com/start")
276 0 : .region(bucket_region.clone())
277 0 : .build(),
278 0 : ),
279 0 : None => chain,
280 : }
281 0 : .or_else(
282 0 : // Finally try IMDS
283 0 : "imds",
284 0 : ImdsCredentialsProvider::builder().build(),
285 0 : )
286 0 : };
287 0 :
288 0 : let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
289 0 :
290 0 : let mut builder = Config::builder()
291 0 : .behavior_version(BehaviorVersion::v2023_11_09())
292 0 : .region(bucket_region)
293 0 : .retry_config(RetryConfig::adaptive().with_max_attempts(3))
294 0 : .sleep_impl(SharedAsyncSleep::from(sleep_impl))
295 0 : .credentials_provider(credentials_provider);
296 :
297 0 : if let Ok(endpoint) = env::var("AWS_ENDPOINT_URL") {
298 0 : builder = builder.endpoint_url(endpoint)
299 0 : }
300 :
301 0 : Client::from_conf(builder.build())
302 0 : }
303 :
304 0 : fn init_remote(
305 0 : bucket_config: BucketConfig,
306 0 : node_kind: NodeKind,
307 0 : ) -> anyhow::Result<(Arc<Client>, RootTarget)> {
308 0 : let bucket_region = Region::new(bucket_config.region);
309 0 : let delimiter = "/".to_string();
310 0 : let s3_client = Arc::new(init_s3_client(bucket_config.sso_account_id, bucket_region));
311 :
312 0 : let s3_root = match node_kind {
313 0 : NodeKind::Pageserver => RootTarget::Pageserver(S3Target {
314 0 : bucket_name: bucket_config.bucket,
315 0 : prefix_in_bucket: bucket_config
316 0 : .prefix_in_bucket
317 0 : .unwrap_or("pageserver/v1".to_string()),
318 0 : delimiter,
319 0 : }),
320 0 : NodeKind::Safekeeper => RootTarget::Safekeeper(S3Target {
321 0 : bucket_name: bucket_config.bucket,
322 0 : prefix_in_bucket: bucket_config
323 0 : .prefix_in_bucket
324 0 : .unwrap_or("safekeeper/v1".to_string()),
325 0 : delimiter,
326 0 : }),
327 : };
328 :
329 0 : Ok((s3_client, s3_root))
330 0 : }
331 :
332 0 : async fn list_objects_with_retries(
333 0 : s3_client: &Client,
334 0 : s3_target: &S3Target,
335 0 : continuation_token: Option<String>,
336 0 : ) -> anyhow::Result<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output> {
337 0 : for _ in 0..MAX_RETRIES {
338 0 : match s3_client
339 0 : .list_objects_v2()
340 0 : .bucket(&s3_target.bucket_name)
341 0 : .prefix(&s3_target.prefix_in_bucket)
342 0 : .delimiter(&s3_target.delimiter)
343 0 : .set_continuation_token(continuation_token.clone())
344 0 : .send()
345 0 : .await
346 : {
347 0 : Ok(response) => return Ok(response),
348 0 : Err(e) => {
349 0 : error!("list_objects_v2 query failed: {e}");
350 0 : tokio::time::sleep(Duration::from_secs(1)).await;
351 : }
352 : }
353 : }
354 :
355 0 : anyhow::bail!("Failed to list objects {MAX_RETRIES} times")
356 0 : }
357 :
358 0 : async fn download_object_with_retries(
359 0 : s3_client: &Client,
360 0 : bucket_name: &str,
361 0 : key: &str,
362 0 : ) -> anyhow::Result<Vec<u8>> {
363 0 : for _ in 0..MAX_RETRIES {
364 0 : let mut body_buf = Vec::new();
365 0 : let response_stream = match s3_client
366 0 : .get_object()
367 0 : .bucket(bucket_name)
368 0 : .key(key)
369 0 : .send()
370 0 : .await
371 : {
372 0 : Ok(response) => response,
373 0 : Err(e) => {
374 0 : error!("Failed to download object for key {key}: {e}");
375 0 : tokio::time::sleep(Duration::from_secs(1)).await;
376 0 : continue;
377 : }
378 : };
379 :
380 0 : match response_stream
381 0 : .body
382 0 : .into_async_read()
383 0 : .read_to_end(&mut body_buf)
384 0 : .await
385 : {
386 0 : Ok(bytes_read) => {
387 0 : tracing::info!("Downloaded {bytes_read} bytes for object object with key {key}");
388 0 : return Ok(body_buf);
389 : }
390 0 : Err(e) => {
391 0 : error!("Failed to stream object body for key {key}: {e}");
392 0 : tokio::time::sleep(Duration::from_secs(1)).await;
393 : }
394 : }
395 : }
396 :
397 0 : anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
398 0 : }
|