#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod checks;
pub mod cloud_admin_api;
pub mod find_large_objects;
pub mod garbage;
pub mod metadata_stream;
pub mod pageserver_physical_gc;
pub mod scan_pageserver_metadata;
pub mod scan_safekeeper_metadata;
pub mod tenant_snapshot;

use std::env;
use std::fmt::Display;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use aws_config::environment::EnvironmentVariableCredentialsProvider;
use aws_config::imds::credentials::ImdsCredentialsProvider;
use aws_config::meta::credentials::CredentialsProviderChain;
use aws_config::profile::ProfileFileCredentialsProvider;
use aws_config::retry::RetryConfig;
use aws_config::sso::SsoCredentialsProvider;
use aws_config::BehaviorVersion;
use aws_sdk_s3::config::{AsyncSleep, Region, SharedAsyncSleep};
use aws_sdk_s3::{Client, Config};
use aws_smithy_async::rt::sleep::TokioSleep;

use camino::{Utf8Path, Utf8PathBuf};
use clap::ValueEnum;
use pageserver::tenant::TENANTS_SEGMENT_NAME;
use pageserver_api::shard::TenantShardId;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncReadExt;
use tracing::error;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use utils::fs_ext;
use utils::id::{TenantId, TimelineId};

const MAX_RETRIES: usize = 20;
const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";

#[derive(Debug, Clone)]
pub struct S3Target {
    pub bucket_name: String,
    /// This `prefix_in_bucket` is only equal to the pageserver/safekeeper config value of the
    /// same name for the `RootTarget`: other `S3Target` instances carry a `prefix_in_bucket`
    /// with extra path segments appended.
    pub prefix_in_bucket: String,
    pub delimiter: String,
}

/// Convenience for referring to timelines within a particular shard: more ergonomic
/// than using a 2-tuple.
///
/// This is the shard-aware equivalent of TenantTimelineId. It's defined here rather
/// than somewhere more broadly exposed, because this kind of thing is rarely needed
/// in the pageserver, as all timeline objects exist in the scope of a particular
/// tenant: the scrubber is different in that it handles collections of data referring
/// to many TenantShardTimelineIds in one place.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct TenantShardTimelineId {
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
}

impl TenantShardTimelineId {
    fn new(tenant_shard_id: TenantShardId, timeline_id: TimelineId) -> Self {
        Self {
            tenant_shard_id,
            timeline_id,
        }
    }
}

impl Display for TenantShardTimelineId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}/{}", self.tenant_shard_id, self.timeline_id)
    }
}

#[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)]
pub enum TraversingDepth {
    Tenant,
    Timeline,
}

impl Display for TraversingDepth {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::Tenant => "tenant",
            Self::Timeline => "timeline",
        })
    }
}

#[derive(ValueEnum, Clone, Copy, Eq, PartialEq, Debug, Serialize, Deserialize)]
pub enum NodeKind {
    Safekeeper,
    Pageserver,
}

impl NodeKind {
    fn as_str(&self) -> &'static str {
        match self {
            Self::Safekeeper => "safekeeper",
            Self::Pageserver => "pageserver",
        }
    }
}

impl Display for NodeKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}

impl S3Target {
    pub fn with_sub_segment(&self, new_segment: &str) -> Self {
        let mut new_self = self.clone();
        if new_self.prefix_in_bucket.is_empty() {
            new_self.prefix_in_bucket = format!("/{}/", new_segment);
        } else {
            if new_self.prefix_in_bucket.ends_with('/') {
                new_self.prefix_in_bucket.pop();
            }
            new_self.prefix_in_bucket =
                [&new_self.prefix_in_bucket, new_segment, ""].join(&new_self.delimiter);
        }
        new_self
    }
}
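// Illustrative sketch, not part of the original module: a small test of how
// `with_sub_segment` composes prefixes. The bucket name, prefixes and segment below are
// hypothetical values chosen only for demonstration.
#[cfg(test)]
mod s3_target_tests {
    use super::*;

    fn example_target(prefix: &str) -> S3Target {
        S3Target {
            bucket_name: "example-bucket".to_string(),
            prefix_in_bucket: prefix.to_string(),
            delimiter: "/".to_string(),
        }
    }

    #[test]
    fn with_sub_segment_composes_prefixes() {
        // A non-empty prefix gets the new segment and a trailing delimiter appended.
        assert_eq!(
            example_target("pageserver/v1")
                .with_sub_segment("tenants")
                .prefix_in_bucket,
            "pageserver/v1/tenants/"
        );
        // Any existing trailing delimiter is dropped before joining.
        assert_eq!(
            example_target("pageserver/v1/")
                .with_sub_segment("tenants")
                .prefix_in_bucket,
            "pageserver/v1/tenants/"
        );
        // An empty prefix turns into "/{segment}/".
        assert_eq!(
            example_target("").with_sub_segment("tenants").prefix_in_bucket,
            "/tenants/"
        );
    }
}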

#[derive(Clone)]
pub enum RootTarget {
    Pageserver(S3Target),
    Safekeeper(S3Target),
}

impl RootTarget {
    pub fn tenants_root(&self) -> S3Target {
        match self {
            Self::Pageserver(root) => root.with_sub_segment(TENANTS_SEGMENT_NAME),
            Self::Safekeeper(root) => root.clone(),
        }
    }

    pub fn tenant_root(&self, tenant_id: &TenantShardId) -> S3Target {
        match self {
            Self::Pageserver(_) => self.tenants_root().with_sub_segment(&tenant_id.to_string()),
            Self::Safekeeper(_) => self
                .tenants_root()
                .with_sub_segment(&tenant_id.tenant_id.to_string()),
        }
    }

    pub(crate) fn tenant_shards_prefix(&self, tenant_id: &TenantId) -> S3Target {
        // Only pageserver remote storage contains tenant-shards
        assert!(matches!(self, Self::Pageserver(_)));
        let Self::Pageserver(root) = self else {
            panic!();
        };

        S3Target {
            bucket_name: root.bucket_name.clone(),
            prefix_in_bucket: format!(
                "{}/{TENANTS_SEGMENT_NAME}/{tenant_id}",
                root.prefix_in_bucket
            ),
            delimiter: root.delimiter.clone(),
        }
    }

    pub fn timelines_root(&self, tenant_id: &TenantShardId) -> S3Target {
        match self {
            Self::Pageserver(_) => self.tenant_root(tenant_id).with_sub_segment("timelines"),
            Self::Safekeeper(_) => self.tenant_root(tenant_id),
        }
    }

    pub fn timeline_root(&self, id: &TenantShardTimelineId) -> S3Target {
        self.timelines_root(&id.tenant_shard_id)
            .with_sub_segment(&id.timeline_id.to_string())
    }

    pub fn bucket_name(&self) -> &str {
        match self {
            Self::Pageserver(root) => &root.bucket_name,
            Self::Safekeeper(root) => &root.bucket_name,
        }
    }

    pub fn delimiter(&self) -> &str {
        match self {
            Self::Pageserver(root) => &root.delimiter,
            Self::Safekeeper(root) => &root.delimiter,
        }
    }
}
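// Illustrative sketch, not part of the original module: how `RootTarget` derives the
// tenants listing prefix for a pageserver root. The bucket name and prefix are hypothetical.
#[cfg(test)]
mod root_target_tests {
    use pageserver::tenant::TENANTS_SEGMENT_NAME;

    use super::*;

    #[test]
    fn pageserver_tenants_root_prefix() {
        let root = RootTarget::Pageserver(S3Target {
            bucket_name: "example-bucket".to_string(),
            prefix_in_bucket: "pageserver/v1".to_string(),
            delimiter: "/".to_string(),
        });
        // The tenants listing prefix is the root prefix plus the tenants segment.
        assert_eq!(
            root.tenants_root().prefix_in_bucket,
            format!("pageserver/v1/{TENANTS_SEGMENT_NAME}/")
        );
        assert_eq!(root.bucket_name(), "example-bucket");
        assert_eq!(root.delimiter(), "/");
    }
}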

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct BucketConfig {
    pub region: String,
    pub bucket: String,
    pub prefix_in_bucket: Option<String>,
}

impl BucketConfig {
    pub fn from_env() -> anyhow::Result<Self> {
        let region = env::var("REGION").context("'REGION' param retrieval")?;
        let bucket = env::var("BUCKET").context("'BUCKET' param retrieval")?;
        let prefix_in_bucket = env::var("BUCKET_PREFIX").ok();

        Ok(Self {
            region,
            bucket,
            prefix_in_bucket,
        })
    }
}
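// Hedged usage sketch: `from_env` reads only the REGION and BUCKET environment variables
// plus the optional BUCKET_PREFIX; the values below are placeholders, not real deployments.
//
//   $ export REGION=eu-west-1 BUCKET=example-bucket BUCKET_PREFIX=pageserver/v1
//
//   let bucket_config = BucketConfig::from_env()?;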

pub struct ConsoleConfig {
    pub token: String,
    pub base_url: Url,
}

impl ConsoleConfig {
    pub fn from_env() -> anyhow::Result<Self> {
        let base_url: Url = env::var("CLOUD_ADMIN_API_URL")
            .context("'CLOUD_ADMIN_API_URL' param retrieval")?
            .parse()
            .context("'CLOUD_ADMIN_API_URL' param parsing")?;

        let token = env::var(CLOUD_ADMIN_API_TOKEN_ENV_VAR)
            .context("'CLOUD_ADMIN_API_TOKEN' environment variable fetch")?;

        Ok(Self { base_url, token })
    }
}

/// Sets up logging to both `./logs/<file_name>` and stderr, and returns the guard that keeps
/// the non-blocking file writer flushing in the background.
pub fn init_logging(file_name: &str) -> WorkerGuard {
    let (file_writer, guard) =
        tracing_appender::non_blocking(tracing_appender::rolling::never("./logs/", file_name));

    let file_logs = fmt::Layer::new()
        .with_target(false)
        .with_ansi(false)
        .with_writer(file_writer);
    let stderr_logs = fmt::Layer::new()
        .with_target(false)
        .with_writer(std::io::stderr);
    tracing_subscriber::registry()
        .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
        .with(file_logs)
        .with(stderr_logs)
        .init();

    guard
}
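// Usage sketch ("scrubber.log" is a hypothetical file name): keep the returned guard alive
// for the whole program, otherwise lines buffered by the non-blocking writer may be dropped.
//
//   let _logging_guard = init_logging("scrubber.log");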

pub fn init_s3_client(bucket_region: Region) -> Client {
    let credentials_provider = {
        // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
        let chain = CredentialsProviderChain::first_try(
            "env",
            EnvironmentVariableCredentialsProvider::new(),
        )
        // uses "AWS_PROFILE" / `aws sso login --profile <profile>`
        .or_else(
            "profile-sso",
            ProfileFileCredentialsProvider::builder().build(),
        );

        // Use SSO if we were given an account ID
        match std::env::var("SSO_ACCOUNT_ID").ok() {
            Some(sso_account) => chain.or_else(
                "sso",
                SsoCredentialsProvider::builder()
                    .account_id(sso_account)
                    .role_name("PowerUserAccess")
                    .start_url("https://neondb.awsapps.com/start")
                    .region(bucket_region.clone())
                    .build(),
            ),
            None => chain,
        }
        .or_else(
            // Finally try IMDS
            "imds",
            ImdsCredentialsProvider::builder().build(),
        )
    };

    let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());

    let mut builder = Config::builder()
        .behavior_version(
            #[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
            BehaviorVersion::v2023_11_09(),
        )
        .region(bucket_region)
        .retry_config(RetryConfig::adaptive().with_max_attempts(3))
        .sleep_impl(SharedAsyncSleep::from(sleep_impl))
        .credentials_provider(credentials_provider);

    if let Ok(endpoint) = env::var("AWS_ENDPOINT_URL") {
        builder = builder.endpoint_url(endpoint)
    }

    Client::from_conf(builder.build())
}
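// Usage sketch (endpoint and region values are illustrative): AWS_ENDPOINT_URL can point the
// client at an S3-compatible service such as MinIO instead of AWS proper.
//
//   $ export AWS_ENDPOINT_URL=http://127.0.0.1:9000
//
//   let client = init_s3_client(Region::new("eu-west-1"));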

/// Builds an S3 client plus the `RootTarget` under which the given node kind keeps its data.
fn init_remote(
    bucket_config: BucketConfig,
    node_kind: NodeKind,
) -> anyhow::Result<(Arc<Client>, RootTarget)> {
    let bucket_region = Region::new(bucket_config.region);
    let delimiter = "/".to_string();
    let s3_client = Arc::new(init_s3_client(bucket_region));

    let s3_root = match node_kind {
        NodeKind::Pageserver => RootTarget::Pageserver(S3Target {
            bucket_name: bucket_config.bucket,
            prefix_in_bucket: bucket_config
                .prefix_in_bucket
                .unwrap_or("pageserver/v1".to_string()),
            delimiter,
        }),
        NodeKind::Safekeeper => RootTarget::Safekeeper(S3Target {
            bucket_name: bucket_config.bucket,
            prefix_in_bucket: bucket_config.prefix_in_bucket.unwrap_or("wal/".to_string()),
            delimiter,
        }),
    };

    Ok((s3_client, s3_root))
}
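// Call sketch: when `prefix_in_bucket` is not configured, pageserver roots default to
// "pageserver/v1" and safekeeper roots to "wal/" (see the match above).
//
//   let (s3_client, root_target) = init_remote(bucket_config, NodeKind::Pageserver)?;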

async fn list_objects_with_retries(
    s3_client: &Client,
    s3_target: &S3Target,
    continuation_token: Option<String>,
) -> anyhow::Result<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output> {
    for _ in 0..MAX_RETRIES {
        match s3_client
            .list_objects_v2()
            .bucket(&s3_target.bucket_name)
            .prefix(&s3_target.prefix_in_bucket)
            .delimiter(&s3_target.delimiter)
            .set_continuation_token(continuation_token.clone())
            .send()
            .await
        {
            Ok(response) => return Ok(response),
            Err(e) => {
                error!(
                    "list_objects_v2 query failed: {e}, bucket_name={}, prefix={}, delimiter={}",
                    s3_target.bucket_name, s3_target.prefix_in_bucket, s3_target.delimiter
                );
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    }

    anyhow::bail!("Failed to list objects {MAX_RETRIES} times")
}
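// Pagination sketch (assumes `s3_client` and `target` are already initialized): callers drive
// the continuation-token loop themselves, e.g.
//
//   let mut continuation_token = None;
//   loop {
//       let page = list_objects_with_retries(&s3_client, &target, continuation_token).await?;
//       // ... inspect page.contents() and page.common_prefixes() ...
//       match page.next_continuation_token() {
//           Some(token) => continuation_token = Some(token.to_string()),
//           None => break,
//       }
//   }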

async fn download_object_with_retries(
    s3_client: &Client,
    bucket_name: &str,
    key: &str,
) -> anyhow::Result<Vec<u8>> {
    for _ in 0..MAX_RETRIES {
        let mut body_buf = Vec::new();
        let response_stream = match s3_client
            .get_object()
            .bucket(bucket_name)
            .key(key)
            .send()
            .await
        {
            Ok(response) => response,
            Err(e) => {
                error!("Failed to download object for key {key}: {e}");
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }
        };

        match response_stream
            .body
            .into_async_read()
            .read_to_end(&mut body_buf)
            .await
        {
            Ok(bytes_read) => {
                tracing::debug!("Downloaded {bytes_read} bytes for object {key}");
                return Ok(body_buf);
            }
            Err(e) => {
                error!("Failed to stream object body for key {key}: {e}");
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    }

    anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
}
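// Call sketch (bucket and key are hypothetical): the helper buffers the whole object in
// memory, so it is intended for small objects such as index or manifest files.
//
//   let body: Vec<u8> =
//       download_object_with_retries(&s3_client, "example-bucket", "tenants/.../index_part.json")
//           .await?;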

async fn download_object_to_file(
    s3_client: &Client,
    bucket_name: &str,
    key: &str,
    version_id: Option<&str>,
    local_path: &Utf8Path,
) -> anyhow::Result<()> {
    let tmp_path = Utf8PathBuf::from(format!("{local_path}.tmp"));
    for _ in 0..MAX_RETRIES {
        tokio::fs::remove_file(&tmp_path)
            .await
            .or_else(fs_ext::ignore_not_found)?;

        let mut file = tokio::fs::File::create(&tmp_path)
            .await
            .context("Opening output file")?;

        let request = s3_client.get_object().bucket(bucket_name).key(key);

        let request = match version_id {
            Some(version_id) => request.version_id(version_id),
            None => request,
        };

        let response_stream = match request.send().await {
            Ok(response) => response,
            Err(e) => {
                error!(
                    "Failed to download object for key {key} version {}: {e:#}",
                    version_id.unwrap_or("")
                );
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }
        };

        let mut read_stream = response_stream.body.into_async_read();

        tokio::io::copy(&mut read_stream, &mut file).await?;

        tokio::fs::rename(&tmp_path, local_path).await?;
        return Ok(());
    }

    anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
}
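// Call sketch (paths are hypothetical): the download is staged in "<local_path>.tmp" and only
// renamed into place after the body has been fully written.
//
//   download_object_to_file(
//       &s3_client,
//       "example-bucket",
//       "tenants/example-object",
//       None,
//       Utf8Path::new("./snapshot/example-object"),
//   )
//   .await?;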