LCOV - code coverage report
Current view: top level - s3_scrubber/src - lib.rs (source / functions) Coverage Total Hit
Test: 691a4c28fe7169edd60b367c52d448a0a6605f1f.info Lines: 0.0 % 288 0
Test Date: 2024-05-10 13:18:37 Functions: 0.0 % 65 0

            Line data    Source code
       1              : #![deny(unsafe_code)]
       2              : #![deny(clippy::undocumented_unsafe_blocks)]
       3              : pub mod checks;
       4              : pub mod cloud_admin_api;
       5              : pub mod garbage;
       6              : pub mod metadata_stream;
       7              : pub mod scan_pageserver_metadata;
       8              : pub mod scan_safekeeper_metadata;
       9              : pub mod tenant_snapshot;
      10              : 
      11              : use std::env;
      12              : use std::fmt::Display;
      13              : use std::sync::Arc;
      14              : use std::time::Duration;
      15              : 
      16              : use anyhow::Context;
      17              : use aws_config::environment::EnvironmentVariableCredentialsProvider;
      18              : use aws_config::imds::credentials::ImdsCredentialsProvider;
      19              : use aws_config::meta::credentials::CredentialsProviderChain;
      20              : use aws_config::profile::ProfileFileCredentialsProvider;
      21              : use aws_config::retry::RetryConfig;
      22              : use aws_config::sso::SsoCredentialsProvider;
      23              : use aws_config::BehaviorVersion;
      24              : use aws_sdk_s3::config::{AsyncSleep, Region, SharedAsyncSleep};
      25              : use aws_sdk_s3::{Client, Config};
      26              : use aws_smithy_async::rt::sleep::TokioSleep;
      27              : 
      28              : use camino::{Utf8Path, Utf8PathBuf};
      29              : use clap::ValueEnum;
      30              : use pageserver::tenant::TENANTS_SEGMENT_NAME;
      31              : use pageserver_api::shard::TenantShardId;
      32              : use reqwest::Url;
      33              : use serde::{Deserialize, Serialize};
      34              : use tokio::io::AsyncReadExt;
      35              : use tracing::error;
      36              : use tracing_appender::non_blocking::WorkerGuard;
      37              : use tracing_subscriber::{fmt, prelude::*, EnvFilter};
      38              : use utils::fs_ext;
      39              : use utils::id::{TenantId, TimelineId};
      40              : 
      41              : const MAX_RETRIES: usize = 20;
      42              : const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";
      43              : 
      44              : #[derive(Debug, Clone)]
      45              : pub struct S3Target {
      46              :     pub bucket_name: String,
      47              :     /// This `prefix_in_bucket` is only equal to the PS/SK config of the same
      48              :     /// name for the RootTarget: other instances of S3Target will have prefix_in_bucket
      49              :     /// with extra parts.
      50              :     pub prefix_in_bucket: String,
      51              :     pub delimiter: String,
      52              : }
      53              : 
      54              : /// Convenience for referring to timelines within a particular shard: more ergonomic
      55              : /// than using a 2-tuple.
      56              : ///
      57              : /// This is the shard-aware equivalent of TenantTimelineId.  It's defined here rather
      58              : /// than somewhere more broadly exposed, because this kind of thing is rarely needed
      59              : /// in the pageserver, as all timeline objects existing in the scope of a particular
      60              : /// tenant: the scrubber is different in that it handles collections of data referring to many
      61              : /// TenantShardTimelineIds in on place.
      62            0 : #[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq)]
      63              : pub struct TenantShardTimelineId {
      64              :     tenant_shard_id: TenantShardId,
      65              :     timeline_id: TimelineId,
      66              : }
      67              : 
      68              : impl TenantShardTimelineId {
      69            0 :     fn new(tenant_shard_id: TenantShardId, timeline_id: TimelineId) -> Self {
      70            0 :         Self {
      71            0 :             tenant_shard_id,
      72            0 :             timeline_id,
      73            0 :         }
      74            0 :     }
      75              : }
      76              : 
      77              : impl Display for TenantShardTimelineId {
      78            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      79            0 :         write!(f, "{}/{}", self.tenant_shard_id, self.timeline_id)
      80            0 :     }
      81              : }
      82              : 
      83            0 : #[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)]
      84              : pub enum TraversingDepth {
      85              :     Tenant,
      86              :     Timeline,
      87              : }
      88              : 
      89              : impl Display for TraversingDepth {
      90            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      91            0 :         f.write_str(match self {
      92            0 :             Self::Tenant => "tenant",
      93            0 :             Self::Timeline => "timeline",
      94              :         })
      95            0 :     }
      96              : }
      97              : 
      98            0 : #[derive(ValueEnum, Clone, Copy, Eq, PartialEq, Debug, Serialize, Deserialize)]
      99              : pub enum NodeKind {
     100              :     Safekeeper,
     101              :     Pageserver,
     102              : }
     103              : 
     104              : impl NodeKind {
     105            0 :     fn as_str(&self) -> &'static str {
     106            0 :         match self {
     107            0 :             Self::Safekeeper => "safekeeper",
     108            0 :             Self::Pageserver => "pageserver",
     109              :         }
     110            0 :     }
     111              : }
     112              : 
     113              : impl Display for NodeKind {
     114            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     115            0 :         f.write_str(self.as_str())
     116            0 :     }
     117              : }
     118              : 
     119              : impl S3Target {
     120            0 :     pub fn with_sub_segment(&self, new_segment: &str) -> Self {
     121            0 :         let mut new_self = self.clone();
     122            0 :         if new_self.prefix_in_bucket.is_empty() {
     123            0 :             new_self.prefix_in_bucket = format!("/{}/", new_segment);
     124            0 :         } else {
     125            0 :             if new_self.prefix_in_bucket.ends_with('/') {
     126            0 :                 new_self.prefix_in_bucket.pop();
     127            0 :             }
     128            0 :             new_self.prefix_in_bucket =
     129            0 :                 [&new_self.prefix_in_bucket, new_segment, ""].join(&new_self.delimiter);
     130              :         }
     131            0 :         new_self
     132            0 :     }
     133              : }
     134              : 
     135              : #[derive(Clone)]
     136              : pub enum RootTarget {
     137              :     Pageserver(S3Target),
     138              :     Safekeeper(S3Target),
     139              : }
     140              : 
     141              : impl RootTarget {
     142            0 :     pub fn tenants_root(&self) -> S3Target {
     143            0 :         match self {
     144            0 :             Self::Pageserver(root) => root.with_sub_segment(TENANTS_SEGMENT_NAME),
     145            0 :             Self::Safekeeper(root) => root.clone(),
     146              :         }
     147            0 :     }
     148              : 
     149            0 :     pub fn tenant_root(&self, tenant_id: &TenantShardId) -> S3Target {
     150            0 :         match self {
     151            0 :             Self::Pageserver(_) => self.tenants_root().with_sub_segment(&tenant_id.to_string()),
     152            0 :             Self::Safekeeper(_) => self
     153            0 :                 .tenants_root()
     154            0 :                 .with_sub_segment(&tenant_id.tenant_id.to_string()),
     155              :         }
     156            0 :     }
     157              : 
     158            0 :     pub(crate) fn tenant_shards_prefix(&self, tenant_id: &TenantId) -> S3Target {
     159            0 :         // Only pageserver remote storage contains tenant-shards
     160            0 :         assert!(matches!(self, Self::Pageserver(_)));
     161            0 :         let Self::Pageserver(root) = self else {
     162            0 :             panic!();
     163              :         };
     164              : 
     165            0 :         S3Target {
     166            0 :             bucket_name: root.bucket_name.clone(),
     167            0 :             prefix_in_bucket: format!(
     168            0 :                 "{}/{TENANTS_SEGMENT_NAME}/{tenant_id}",
     169            0 :                 root.prefix_in_bucket
     170            0 :             ),
     171            0 :             delimiter: root.delimiter.clone(),
     172            0 :         }
     173            0 :     }
     174              : 
     175            0 :     pub fn timelines_root(&self, tenant_id: &TenantShardId) -> S3Target {
     176            0 :         match self {
     177            0 :             Self::Pageserver(_) => self.tenant_root(tenant_id).with_sub_segment("timelines"),
     178            0 :             Self::Safekeeper(_) => self.tenant_root(tenant_id),
     179              :         }
     180            0 :     }
     181              : 
     182            0 :     pub fn timeline_root(&self, id: &TenantShardTimelineId) -> S3Target {
     183            0 :         self.timelines_root(&id.tenant_shard_id)
     184            0 :             .with_sub_segment(&id.timeline_id.to_string())
     185            0 :     }
     186              : 
     187            0 :     pub fn bucket_name(&self) -> &str {
     188            0 :         match self {
     189            0 :             Self::Pageserver(root) => &root.bucket_name,
     190            0 :             Self::Safekeeper(root) => &root.bucket_name,
     191              :         }
     192            0 :     }
     193              : 
     194            0 :     pub fn delimiter(&self) -> &str {
     195            0 :         match self {
     196            0 :             Self::Pageserver(root) => &root.delimiter,
     197            0 :             Self::Safekeeper(root) => &root.delimiter,
     198              :         }
     199            0 :     }
     200              : }
     201              : 
     202            0 : #[derive(Debug, Clone, Serialize, Deserialize)]
     203              : pub struct BucketConfig {
     204              :     pub region: String,
     205              :     pub bucket: String,
     206              :     pub prefix_in_bucket: Option<String>,
     207              : 
     208              :     /// Use SSO if this is set, else rely on AWS_* environment vars
     209              :     pub sso_account_id: Option<String>,
     210              : }
     211              : 
     212              : impl Display for BucketConfig {
     213            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     214            0 :         write!(
     215            0 :             f,
     216            0 :             "{}/{}/{}",
     217            0 :             self.sso_account_id.as_deref().unwrap_or("<none>"),
     218            0 :             self.region,
     219            0 :             self.bucket
     220            0 :         )
     221            0 :     }
     222              : }
     223              : 
     224              : impl BucketConfig {
     225            0 :     pub fn from_env() -> anyhow::Result<Self> {
     226            0 :         let sso_account_id = env::var("SSO_ACCOUNT_ID").ok();
     227            0 :         let region = env::var("REGION").context("'REGION' param retrieval")?;
     228            0 :         let bucket = env::var("BUCKET").context("'BUCKET' param retrieval")?;
     229            0 :         let prefix_in_bucket = env::var("BUCKET_PREFIX").ok();
     230            0 : 
     231            0 :         Ok(Self {
     232            0 :             region,
     233            0 :             bucket,
     234            0 :             prefix_in_bucket,
     235            0 :             sso_account_id,
     236            0 :         })
     237            0 :     }
     238              : }
     239              : 
     240              : pub struct ConsoleConfig {
     241              :     pub token: String,
     242              :     pub base_url: Url,
     243              : }
     244              : 
     245              : impl ConsoleConfig {
     246            0 :     pub fn from_env() -> anyhow::Result<Self> {
     247            0 :         let base_url: Url = env::var("CLOUD_ADMIN_API_URL")
     248            0 :             .context("'CLOUD_ADMIN_API_URL' param retrieval")?
     249            0 :             .parse()
     250            0 :             .context("'CLOUD_ADMIN_API_URL' param parsing")?;
     251              : 
     252            0 :         let token = env::var(CLOUD_ADMIN_API_TOKEN_ENV_VAR)
     253            0 :             .context("'CLOUD_ADMIN_API_TOKEN' environment variable fetch")?;
     254              : 
     255            0 :         Ok(Self { base_url, token })
     256            0 :     }
     257              : }
     258              : 
     259            0 : pub fn init_logging(file_name: &str) -> WorkerGuard {
     260            0 :     let (file_writer, guard) =
     261            0 :         tracing_appender::non_blocking(tracing_appender::rolling::never("./logs/", file_name));
     262            0 : 
     263            0 :     let file_logs = fmt::Layer::new()
     264            0 :         .with_target(false)
     265            0 :         .with_ansi(false)
     266            0 :         .with_writer(file_writer);
     267            0 :     let stderr_logs = fmt::Layer::new()
     268            0 :         .with_target(false)
     269            0 :         .with_writer(std::io::stderr);
     270            0 :     tracing_subscriber::registry()
     271            0 :         .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
     272            0 :         .with(file_logs)
     273            0 :         .with(stderr_logs)
     274            0 :         .init();
     275            0 : 
     276            0 :     guard
     277            0 : }
     278              : 
     279            0 : pub fn init_s3_client(account_id: Option<String>, bucket_region: Region) -> Client {
     280            0 :     let credentials_provider = {
     281              :         // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
     282            0 :         let chain = CredentialsProviderChain::first_try(
     283            0 :             "env",
     284            0 :             EnvironmentVariableCredentialsProvider::new(),
     285            0 :         )
     286            0 :         // uses "AWS_PROFILE" / `aws sso login --profile <profile>`
     287            0 :         .or_else(
     288            0 :             "profile-sso",
     289            0 :             ProfileFileCredentialsProvider::builder().build(),
     290            0 :         );
     291            0 : 
     292            0 :         // Use SSO if we were given an account ID
     293            0 :         match account_id {
     294            0 :             Some(sso_account) => chain.or_else(
     295            0 :                 "sso",
     296            0 :                 SsoCredentialsProvider::builder()
     297            0 :                     .account_id(sso_account)
     298            0 :                     .role_name("PowerUserAccess")
     299            0 :                     .start_url("https://neondb.awsapps.com/start")
     300            0 :                     .region(bucket_region.clone())
     301            0 :                     .build(),
     302            0 :             ),
     303            0 :             None => chain,
     304              :         }
     305            0 :         .or_else(
     306            0 :             // Finally try IMDS
     307            0 :             "imds",
     308            0 :             ImdsCredentialsProvider::builder().build(),
     309            0 :         )
     310            0 :     };
     311            0 : 
     312            0 :     let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
     313            0 : 
     314            0 :     let mut builder = Config::builder()
     315            0 :         .behavior_version(
     316            0 :             #[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
     317            0 :             BehaviorVersion::v2023_11_09(),
     318            0 :         )
     319            0 :         .region(bucket_region)
     320            0 :         .retry_config(RetryConfig::adaptive().with_max_attempts(3))
     321            0 :         .sleep_impl(SharedAsyncSleep::from(sleep_impl))
     322            0 :         .credentials_provider(credentials_provider);
     323              : 
     324            0 :     if let Ok(endpoint) = env::var("AWS_ENDPOINT_URL") {
     325            0 :         builder = builder.endpoint_url(endpoint)
     326            0 :     }
     327              : 
     328            0 :     Client::from_conf(builder.build())
     329            0 : }
     330              : 
     331            0 : fn init_remote(
     332            0 :     bucket_config: BucketConfig,
     333            0 :     node_kind: NodeKind,
     334            0 : ) -> anyhow::Result<(Arc<Client>, RootTarget)> {
     335            0 :     let bucket_region = Region::new(bucket_config.region);
     336            0 :     let delimiter = "/".to_string();
     337            0 :     let s3_client = Arc::new(init_s3_client(bucket_config.sso_account_id, bucket_region));
     338              : 
     339            0 :     let s3_root = match node_kind {
     340            0 :         NodeKind::Pageserver => RootTarget::Pageserver(S3Target {
     341            0 :             bucket_name: bucket_config.bucket,
     342            0 :             prefix_in_bucket: bucket_config
     343            0 :                 .prefix_in_bucket
     344            0 :                 .unwrap_or("pageserver/v1".to_string()),
     345            0 :             delimiter,
     346            0 :         }),
     347            0 :         NodeKind::Safekeeper => RootTarget::Safekeeper(S3Target {
     348            0 :             bucket_name: bucket_config.bucket,
     349            0 :             prefix_in_bucket: bucket_config.prefix_in_bucket.unwrap_or("wal/".to_string()),
     350            0 :             delimiter,
     351            0 :         }),
     352              :     };
     353              : 
     354            0 :     Ok((s3_client, s3_root))
     355            0 : }
     356              : 
     357            0 : async fn list_objects_with_retries(
     358            0 :     s3_client: &Client,
     359            0 :     s3_target: &S3Target,
     360            0 :     continuation_token: Option<String>,
     361            0 : ) -> anyhow::Result<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output> {
     362            0 :     for _ in 0..MAX_RETRIES {
     363            0 :         match s3_client
     364            0 :             .list_objects_v2()
     365            0 :             .bucket(&s3_target.bucket_name)
     366            0 :             .prefix(&s3_target.prefix_in_bucket)
     367            0 :             .delimiter(&s3_target.delimiter)
     368            0 :             .set_continuation_token(continuation_token.clone())
     369            0 :             .send()
     370            0 :             .await
     371              :         {
     372            0 :             Ok(response) => return Ok(response),
     373            0 :             Err(e) => {
     374            0 :                 error!(
     375            0 :                     "list_objects_v2 query failed: {e}, bucket_name={}, prefix={}, delimiter={}",
     376              :                     s3_target.bucket_name, s3_target.prefix_in_bucket, s3_target.delimiter
     377              :                 );
     378            0 :                 tokio::time::sleep(Duration::from_secs(1)).await;
     379              :             }
     380              :         }
     381              :     }
     382              : 
     383            0 :     anyhow::bail!("Failed to list objects {MAX_RETRIES} times")
     384            0 : }
     385              : 
     386            0 : async fn download_object_with_retries(
     387            0 :     s3_client: &Client,
     388            0 :     bucket_name: &str,
     389            0 :     key: &str,
     390            0 : ) -> anyhow::Result<Vec<u8>> {
     391            0 :     for _ in 0..MAX_RETRIES {
     392            0 :         let mut body_buf = Vec::new();
     393            0 :         let response_stream = match s3_client
     394            0 :             .get_object()
     395            0 :             .bucket(bucket_name)
     396            0 :             .key(key)
     397            0 :             .send()
     398            0 :             .await
     399              :         {
     400            0 :             Ok(response) => response,
     401            0 :             Err(e) => {
     402            0 :                 error!("Failed to download object for key {key}: {e}");
     403            0 :                 tokio::time::sleep(Duration::from_secs(1)).await;
     404            0 :                 continue;
     405              :             }
     406              :         };
     407              : 
     408            0 :         match response_stream
     409            0 :             .body
     410            0 :             .into_async_read()
     411            0 :             .read_to_end(&mut body_buf)
     412            0 :             .await
     413              :         {
     414            0 :             Ok(bytes_read) => {
     415            0 :                 tracing::info!("Downloaded {bytes_read} bytes for object object with key {key}");
     416            0 :                 return Ok(body_buf);
     417              :             }
     418            0 :             Err(e) => {
     419            0 :                 error!("Failed to stream object body for key {key}: {e}");
     420            0 :                 tokio::time::sleep(Duration::from_secs(1)).await;
     421              :             }
     422              :         }
     423              :     }
     424              : 
     425            0 :     anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
     426            0 : }
     427              : 
     428            0 : async fn download_object_to_file(
     429            0 :     s3_client: &Client,
     430            0 :     bucket_name: &str,
     431            0 :     key: &str,
     432            0 :     version_id: Option<&str>,
     433            0 :     local_path: &Utf8Path,
     434            0 : ) -> anyhow::Result<()> {
     435            0 :     let tmp_path = Utf8PathBuf::from(format!("{local_path}.tmp"));
     436            0 :     for _ in 0..MAX_RETRIES {
     437            0 :         tokio::fs::remove_file(&tmp_path)
     438            0 :             .await
     439            0 :             .or_else(fs_ext::ignore_not_found)?;
     440              : 
     441            0 :         let mut file = tokio::fs::File::create(&tmp_path)
     442            0 :             .await
     443            0 :             .context("Opening output file")?;
     444              : 
     445            0 :         let request = s3_client.get_object().bucket(bucket_name).key(key);
     446              : 
     447            0 :         let request = match version_id {
     448            0 :             Some(version_id) => request.version_id(version_id),
     449            0 :             None => request,
     450              :         };
     451              : 
     452            0 :         let response_stream = match request.send().await {
     453            0 :             Ok(response) => response,
     454            0 :             Err(e) => {
     455            0 :                 error!(
     456            0 :                     "Failed to download object for key {key} version {}: {e:#}",
     457            0 :                     version_id.unwrap_or("")
     458              :                 );
     459            0 :                 tokio::time::sleep(Duration::from_secs(1)).await;
     460            0 :                 continue;
     461              :             }
     462              :         };
     463              : 
     464            0 :         let mut read_stream = response_stream.body.into_async_read();
     465            0 : 
     466            0 :         tokio::io::copy(&mut read_stream, &mut file).await?;
     467              : 
     468            0 :         tokio::fs::rename(&tmp_path, local_path).await?;
     469            0 :         return Ok(());
     470              :     }
     471              : 
     472            0 :     anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
     473            0 : }
        

Generated by: LCOV version 2.1-beta