LCOV - differential code coverage report
Current view: top level - s3_scrubber/src - lib.rs (source / functions) Coverage Total Hit UBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 0.0 % 237 0 237
Current Date: 2024-01-09 02:06:09 Functions: 0.0 % 98 0 98
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : #![deny(unsafe_code)]
       2                 : #![deny(clippy::undocumented_unsafe_blocks)]
       3                 : pub mod checks;
       4                 : pub mod cloud_admin_api;
       5                 : pub mod garbage;
       6                 : pub mod metadata_stream;
       7                 : pub mod scan_metadata;
       8                 : 
       9                 : use std::env;
      10                 : use std::fmt::Display;
      11                 : use std::sync::Arc;
      12                 : use std::time::Duration;
      13                 : 
      14                 : use anyhow::Context;
      15                 : use aws_config::environment::EnvironmentVariableCredentialsProvider;
      16                 : use aws_config::imds::credentials::ImdsCredentialsProvider;
      17                 : use aws_config::meta::credentials::CredentialsProviderChain;
      18                 : use aws_config::profile::ProfileFileCredentialsProvider;
      19                 : use aws_config::retry::RetryConfig;
      20                 : use aws_config::sso::SsoCredentialsProvider;
      21                 : use aws_config::BehaviorVersion;
      22                 : use aws_sdk_s3::config::{AsyncSleep, Region, SharedAsyncSleep};
      23                 : use aws_sdk_s3::{Client, Config};
      24                 : use aws_smithy_async::rt::sleep::TokioSleep;
      25                 : 
      26                 : use clap::ValueEnum;
      27                 : use pageserver::tenant::TENANTS_SEGMENT_NAME;
      28                 : use pageserver_api::shard::TenantShardId;
      29                 : use reqwest::Url;
      30                 : use serde::{Deserialize, Serialize};
      31                 : use std::io::IsTerminal;
      32                 : use tokio::io::AsyncReadExt;
      33                 : use tracing::error;
      34                 : use tracing_appender::non_blocking::WorkerGuard;
      35                 : use tracing_subscriber::{fmt, prelude::*, EnvFilter};
      36                 : use utils::id::TimelineId;
      37                 : 
/// Upper bound on how many times the retrying S3 helpers in this file attempt a request
/// before giving up.
const MAX_RETRIES: usize = 20;
/// Name of the environment variable from which the cloud admin API token is read.
const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";
      40                 : 
/// A location inside an S3 bucket: the bucket, a key prefix within it, and the
/// delimiter placed between key segments.
#[derive(Debug, Clone)]
pub struct S3Target {
    /// Name of the bucket this target points into.
    pub bucket_name: String,
    /// This `prefix_in_bucket` is only equal to the PS/SK config of the same
    /// name for the RootTarget: other instances of S3Target will have prefix_in_bucket
    /// with extra parts.
    pub prefix_in_bucket: String,
    /// Separator inserted between prefix segments when the prefix is extended, and
    /// passed as the delimiter when listing objects under this target.
    pub delimiter: String,
}
      50                 : 
      51                 : /// Convenience for referring to timelines within a particular shard: more ergonomic
      52                 : /// than using a 2-tuple.
      53                 : ///
      54                 : /// This is the shard-aware equivalent of TenantTimelineId.  It's defined here rather
      55                 : /// than somewhere more broadly exposed, because this kind of thing is rarely needed
      56                 : /// in the pageserver, as all timeline objects existing in the scope of a particular
      57                 : /// tenant: the scrubber is different in that it handles collections of data referring to many
      58                 : /// TenantShardTimelineIds in on place.
      59               0 : #[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq)]
      60                 : pub struct TenantShardTimelineId {
      61                 :     tenant_shard_id: TenantShardId,
      62                 :     timeline_id: TimelineId,
      63                 : }
      64                 : 
      65                 : impl TenantShardTimelineId {
      66               0 :     fn new(tenant_shard_id: TenantShardId, timeline_id: TimelineId) -> Self {
      67               0 :         Self {
      68               0 :             tenant_shard_id,
      69               0 :             timeline_id,
      70               0 :         }
      71               0 :     }
      72                 : }
      73                 : 
      74                 : impl Display for TenantShardTimelineId {
      75               0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      76               0 :         write!(f, "{}/{}", self.tenant_shard_id, self.timeline_id)
      77               0 :     }
      78                 : }
      79                 : 
      80               0 : #[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)]
      81                 : pub enum TraversingDepth {
      82                 :     Tenant,
      83                 :     Timeline,
      84                 : }
      85                 : 
      86                 : impl Display for TraversingDepth {
      87               0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      88               0 :         f.write_str(match self {
      89               0 :             Self::Tenant => "tenant",
      90               0 :             Self::Timeline => "timeline",
      91                 :         })
      92               0 :     }
      93                 : }
      94                 : 
      95               0 : #[derive(ValueEnum, Clone, Copy, Eq, PartialEq, Debug, Serialize, Deserialize)]
      96                 : pub enum NodeKind {
      97                 :     Safekeeper,
      98                 :     Pageserver,
      99                 : }
     100                 : 
     101                 : impl NodeKind {
     102               0 :     fn as_str(&self) -> &'static str {
     103               0 :         match self {
     104               0 :             Self::Safekeeper => "safekeeper",
     105               0 :             Self::Pageserver => "pageserver",
     106                 :         }
     107               0 :     }
     108                 : }
     109                 : 
     110                 : impl Display for NodeKind {
     111               0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     112               0 :         f.write_str(self.as_str())
     113               0 :     }
     114                 : }
     115                 : 
     116                 : impl S3Target {
     117               0 :     pub fn with_sub_segment(&self, new_segment: &str) -> Self {
     118               0 :         let mut new_self = self.clone();
     119               0 :         if new_self.prefix_in_bucket.is_empty() {
     120               0 :             new_self.prefix_in_bucket = format!("/{}/", new_segment);
     121               0 :         } else {
     122               0 :             if new_self.prefix_in_bucket.ends_with('/') {
     123               0 :                 new_self.prefix_in_bucket.pop();
     124               0 :             }
     125               0 :             new_self.prefix_in_bucket =
     126               0 :                 [&new_self.prefix_in_bucket, new_segment, ""].join(&new_self.delimiter);
     127                 :         }
     128               0 :         new_self
     129               0 :     }
     130                 : }
     131                 : 
     132               0 : #[derive(Clone)]
     133                 : pub enum RootTarget {
     134                 :     Pageserver(S3Target),
     135                 :     Safekeeper(S3Target),
     136                 : }
     137                 : 
     138                 : impl RootTarget {
     139               0 :     pub fn tenants_root(&self) -> S3Target {
     140               0 :         match self {
     141               0 :             Self::Pageserver(root) => root.with_sub_segment(TENANTS_SEGMENT_NAME),
     142               0 :             Self::Safekeeper(root) => root.with_sub_segment("wal"),
     143                 :         }
     144               0 :     }
     145                 : 
     146               0 :     pub fn tenant_root(&self, tenant_id: &TenantShardId) -> S3Target {
     147               0 :         self.tenants_root().with_sub_segment(&tenant_id.to_string())
     148               0 :     }
     149                 : 
     150               0 :     pub fn timelines_root(&self, tenant_id: &TenantShardId) -> S3Target {
     151               0 :         match self {
     152               0 :             Self::Pageserver(_) => self.tenant_root(tenant_id).with_sub_segment("timelines"),
     153               0 :             Self::Safekeeper(_) => self.tenant_root(tenant_id),
     154                 :         }
     155               0 :     }
     156                 : 
     157               0 :     pub fn timeline_root(&self, id: &TenantShardTimelineId) -> S3Target {
     158               0 :         self.timelines_root(&id.tenant_shard_id)
     159               0 :             .with_sub_segment(&id.timeline_id.to_string())
     160               0 :     }
     161                 : 
     162               0 :     pub fn bucket_name(&self) -> &str {
     163               0 :         match self {
     164               0 :             Self::Pageserver(root) => &root.bucket_name,
     165               0 :             Self::Safekeeper(root) => &root.bucket_name,
     166                 :         }
     167               0 :     }
     168                 : 
     169               0 :     pub fn delimiter(&self) -> &str {
     170               0 :         match self {
     171               0 :             Self::Pageserver(root) => &root.delimiter,
     172               0 :             Self::Safekeeper(root) => &root.delimiter,
     173                 :         }
     174               0 :     }
     175                 : }
     176                 : 
     177               0 : #[derive(Debug, Clone, Serialize, Deserialize)]
     178                 : pub struct BucketConfig {
     179                 :     pub region: String,
     180                 :     pub bucket: String,
     181                 :     pub prefix_in_bucket: Option<String>,
     182                 : 
     183                 :     /// Use SSO if this is set, else rely on AWS_* environment vars
     184                 :     pub sso_account_id: Option<String>,
     185                 : }
     186                 : 
     187                 : impl Display for BucketConfig {
     188               0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     189               0 :         write!(
     190               0 :             f,
     191               0 :             "{}/{}/{}",
     192               0 :             self.sso_account_id.as_deref().unwrap_or("<none>"),
     193               0 :             self.region,
     194               0 :             self.bucket
     195               0 :         )
     196               0 :     }
     197                 : }
     198                 : 
     199                 : impl BucketConfig {
     200               0 :     pub fn from_env() -> anyhow::Result<Self> {
     201               0 :         let sso_account_id = env::var("SSO_ACCOUNT_ID").ok();
     202               0 :         let region = env::var("REGION").context("'REGION' param retrieval")?;
     203               0 :         let bucket = env::var("BUCKET").context("'BUCKET' param retrieval")?;
     204               0 :         let prefix_in_bucket = env::var("BUCKET_PREFIX").ok();
     205               0 : 
     206               0 :         Ok(Self {
     207               0 :             region,
     208               0 :             bucket,
     209               0 :             prefix_in_bucket,
     210               0 :             sso_account_id,
     211               0 :         })
     212               0 :     }
     213                 : }
     214                 : 
     215                 : pub struct ConsoleConfig {
     216                 :     pub token: String,
     217                 :     pub base_url: Url,
     218                 : }
     219                 : 
     220                 : impl ConsoleConfig {
     221               0 :     pub fn from_env() -> anyhow::Result<Self> {
     222               0 :         let base_url: Url = env::var("CLOUD_ADMIN_API_URL")
     223               0 :             .context("'CLOUD_ADMIN_API_URL' param retrieval")?
     224               0 :             .parse()
     225               0 :             .context("'CLOUD_ADMIN_API_URL' param parsing")?;
     226                 : 
     227               0 :         let token = env::var(CLOUD_ADMIN_API_TOKEN_ENV_VAR)
     228               0 :             .context("'CLOUD_ADMIN_API_TOKEN' environment variable fetch")?;
     229                 : 
     230               0 :         Ok(Self { base_url, token })
     231               0 :     }
     232                 : }
     233                 : 
     234               0 : pub fn init_logging(file_name: &str) -> WorkerGuard {
     235               0 :     let (file_writer, guard) =
     236               0 :         tracing_appender::non_blocking(tracing_appender::rolling::never("./logs/", file_name));
     237               0 : 
     238               0 :     let file_logs = fmt::Layer::new()
     239               0 :         .with_target(false)
     240               0 :         .with_ansi(false)
     241               0 :         .with_writer(file_writer);
     242               0 :     let stderr_logs = fmt::Layer::new()
     243               0 :         .with_ansi(std::io::stderr().is_terminal())
     244               0 :         .with_target(false)
     245               0 :         .with_writer(std::io::stderr);
     246               0 :     tracing_subscriber::registry()
     247               0 :         .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
     248               0 :         .with(file_logs)
     249               0 :         .with(stderr_logs)
     250               0 :         .init();
     251               0 : 
     252               0 :     guard
     253               0 : }
     254                 : 
     255               0 : pub fn init_s3_client(account_id: Option<String>, bucket_region: Region) -> Client {
     256               0 :     let credentials_provider = {
     257                 :         // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
     258               0 :         let chain = CredentialsProviderChain::first_try(
     259               0 :             "env",
     260               0 :             EnvironmentVariableCredentialsProvider::new(),
     261               0 :         )
     262               0 :         // uses "AWS_PROFILE" / `aws sso login --profile <profile>`
     263               0 :         .or_else(
     264               0 :             "profile-sso",
     265               0 :             ProfileFileCredentialsProvider::builder().build(),
     266               0 :         );
     267               0 : 
     268               0 :         // Use SSO if we were given an account ID
     269               0 :         match account_id {
     270               0 :             Some(sso_account) => chain.or_else(
     271               0 :                 "sso",
     272               0 :                 SsoCredentialsProvider::builder()
     273               0 :                     .account_id(sso_account)
     274               0 :                     .role_name("PowerUserAccess")
     275               0 :                     .start_url("https://neondb.awsapps.com/start")
     276               0 :                     .region(bucket_region.clone())
     277               0 :                     .build(),
     278               0 :             ),
     279               0 :             None => chain,
     280                 :         }
     281               0 :         .or_else(
     282               0 :             // Finally try IMDS
     283               0 :             "imds",
     284               0 :             ImdsCredentialsProvider::builder().build(),
     285               0 :         )
     286               0 :     };
     287               0 : 
     288               0 :     let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
     289               0 : 
     290               0 :     let mut builder = Config::builder()
     291               0 :         .behavior_version(BehaviorVersion::v2023_11_09())
     292               0 :         .region(bucket_region)
     293               0 :         .retry_config(RetryConfig::adaptive().with_max_attempts(3))
     294               0 :         .sleep_impl(SharedAsyncSleep::from(sleep_impl))
     295               0 :         .credentials_provider(credentials_provider);
     296                 : 
     297               0 :     if let Ok(endpoint) = env::var("AWS_ENDPOINT_URL") {
     298               0 :         builder = builder.endpoint_url(endpoint)
     299               0 :     }
     300                 : 
     301               0 :     Client::from_conf(builder.build())
     302               0 : }
     303                 : 
     304               0 : fn init_remote(
     305               0 :     bucket_config: BucketConfig,
     306               0 :     node_kind: NodeKind,
     307               0 : ) -> anyhow::Result<(Arc<Client>, RootTarget)> {
     308               0 :     let bucket_region = Region::new(bucket_config.region);
     309               0 :     let delimiter = "/".to_string();
     310               0 :     let s3_client = Arc::new(init_s3_client(bucket_config.sso_account_id, bucket_region));
     311                 : 
     312               0 :     let s3_root = match node_kind {
     313               0 :         NodeKind::Pageserver => RootTarget::Pageserver(S3Target {
     314               0 :             bucket_name: bucket_config.bucket,
     315               0 :             prefix_in_bucket: bucket_config
     316               0 :                 .prefix_in_bucket
     317               0 :                 .unwrap_or("pageserver/v1".to_string()),
     318               0 :             delimiter,
     319               0 :         }),
     320               0 :         NodeKind::Safekeeper => RootTarget::Safekeeper(S3Target {
     321               0 :             bucket_name: bucket_config.bucket,
     322               0 :             prefix_in_bucket: bucket_config
     323               0 :                 .prefix_in_bucket
     324               0 :                 .unwrap_or("safekeeper/v1".to_string()),
     325               0 :             delimiter,
     326               0 :         }),
     327                 :     };
     328                 : 
     329               0 :     Ok((s3_client, s3_root))
     330               0 : }
     331                 : 
     332               0 : async fn list_objects_with_retries(
     333               0 :     s3_client: &Client,
     334               0 :     s3_target: &S3Target,
     335               0 :     continuation_token: Option<String>,
     336               0 : ) -> anyhow::Result<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output> {
     337               0 :     for _ in 0..MAX_RETRIES {
     338               0 :         match s3_client
     339               0 :             .list_objects_v2()
     340               0 :             .bucket(&s3_target.bucket_name)
     341               0 :             .prefix(&s3_target.prefix_in_bucket)
     342               0 :             .delimiter(&s3_target.delimiter)
     343               0 :             .set_continuation_token(continuation_token.clone())
     344               0 :             .send()
     345               0 :             .await
     346                 :         {
     347               0 :             Ok(response) => return Ok(response),
     348               0 :             Err(e) => {
     349               0 :                 error!("list_objects_v2 query failed: {e}");
     350               0 :                 tokio::time::sleep(Duration::from_secs(1)).await;
     351                 :             }
     352                 :         }
     353                 :     }
     354                 : 
     355               0 :     anyhow::bail!("Failed to list objects {MAX_RETRIES} times")
     356               0 : }
     357                 : 
     358               0 : async fn download_object_with_retries(
     359               0 :     s3_client: &Client,
     360               0 :     bucket_name: &str,
     361               0 :     key: &str,
     362               0 : ) -> anyhow::Result<Vec<u8>> {
     363               0 :     for _ in 0..MAX_RETRIES {
     364               0 :         let mut body_buf = Vec::new();
     365               0 :         let response_stream = match s3_client
     366               0 :             .get_object()
     367               0 :             .bucket(bucket_name)
     368               0 :             .key(key)
     369               0 :             .send()
     370               0 :             .await
     371                 :         {
     372               0 :             Ok(response) => response,
     373               0 :             Err(e) => {
     374               0 :                 error!("Failed to download object for key {key}: {e}");
     375               0 :                 tokio::time::sleep(Duration::from_secs(1)).await;
     376               0 :                 continue;
     377                 :             }
     378                 :         };
     379                 : 
     380               0 :         match response_stream
     381               0 :             .body
     382               0 :             .into_async_read()
     383               0 :             .read_to_end(&mut body_buf)
     384               0 :             .await
     385                 :         {
     386               0 :             Ok(bytes_read) => {
     387               0 :                 tracing::info!("Downloaded {bytes_read} bytes for object object with key {key}");
     388               0 :                 return Ok(body_buf);
     389                 :             }
     390               0 :             Err(e) => {
     391               0 :                 error!("Failed to stream object body for key {key}: {e}");
     392               0 :                 tokio::time::sleep(Duration::from_secs(1)).await;
     393                 :             }
     394                 :         }
     395                 :     }
     396                 : 
     397               0 :     anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
     398               0 : }
        

Generated by: LCOV version 2.1-beta