LCOV - code coverage report
Current view: top level - storage_scrubber/src - lib.rs (source / functions) Coverage Total Hit
Test: 42f947419473a288706e86ecdf7c2863d760d5d7.info Lines: 0.0 % 356 0
Test Date: 2024-08-02 21:34:27 Functions: 0.0 % 81 0

            Line data    Source code
       1              : #![deny(unsafe_code)]
       2              : #![deny(clippy::undocumented_unsafe_blocks)]
       3              : pub mod checks;
       4              : pub mod cloud_admin_api;
       5              : pub mod find_large_objects;
       6              : pub mod garbage;
       7              : pub mod metadata_stream;
       8              : pub mod pageserver_physical_gc;
       9              : pub mod scan_pageserver_metadata;
      10              : pub mod scan_safekeeper_metadata;
      11              : pub mod tenant_snapshot;
      12              : 
      13              : use std::env;
      14              : use std::fmt::Display;
      15              : use std::sync::Arc;
      16              : use std::time::Duration;
      17              : 
      18              : use anyhow::{anyhow, Context};
      19              : use aws_config::retry::{RetryConfigBuilder, RetryMode};
      20              : use aws_sdk_s3::config::Region;
      21              : use aws_sdk_s3::error::DisplayErrorContext;
      22              : use aws_sdk_s3::Client;
      23              : 
      24              : use camino::{Utf8Path, Utf8PathBuf};
      25              : use clap::ValueEnum;
      26              : use futures::{Stream, StreamExt};
      27              : use pageserver::tenant::remote_timeline_client::{remote_tenant_path, remote_timeline_path};
      28              : use pageserver::tenant::TENANTS_SEGMENT_NAME;
      29              : use pageserver_api::shard::TenantShardId;
      30              : use remote_storage::{
      31              :     GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorageConfig, RemoteStorageKind,
      32              :     S3Config, DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
      33              : };
      34              : use reqwest::Url;
      35              : use serde::{Deserialize, Serialize};
      36              : use storage_controller_client::control_api;
      37              : use tokio::io::AsyncReadExt;
      38              : use tokio_util::sync::CancellationToken;
      39              : use tracing::error;
      40              : use tracing_appender::non_blocking::WorkerGuard;
      41              : use tracing_subscriber::{fmt, prelude::*, EnvFilter};
      42              : use utils::fs_ext;
      43              : use utils::id::{TenantId, TenantTimelineId, TimelineId};
      44              : 
      45              : const MAX_RETRIES: usize = 20;
      46              : const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";
      47              : 
      48              : #[derive(Debug, Clone)]
      49              : pub struct S3Target {
      50              :     pub bucket_name: String,
      51              :     /// This `prefix_in_bucket` is only equal to the PS/SK config of the same
      52              :     /// name for the RootTarget: other instances of S3Target will have prefix_in_bucket
      53              :     /// with extra parts.
      54              :     pub prefix_in_bucket: String,
      55              :     pub delimiter: String,
      56              : }
      57              : 
      58              : /// Convenience for referring to timelines within a particular shard: more ergonomic
      59              : /// than using a 2-tuple.
      60              : ///
      61              : /// This is the shard-aware equivalent of TenantTimelineId.  It's defined here rather
      62              : /// than somewhere more broadly exposed, because this kind of thing is rarely needed
      63              : /// in the pageserver, as all timeline objects existing in the scope of a particular
      64              : /// tenant: the scrubber is different in that it handles collections of data referring to many
      65              : /// TenantShardTimelineIds in on place.
      66            0 : #[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
      67              : pub struct TenantShardTimelineId {
      68              :     tenant_shard_id: TenantShardId,
      69              :     timeline_id: TimelineId,
      70              : }
      71              : 
      72              : impl TenantShardTimelineId {
      73            0 :     fn new(tenant_shard_id: TenantShardId, timeline_id: TimelineId) -> Self {
      74            0 :         Self {
      75            0 :             tenant_shard_id,
      76            0 :             timeline_id,
      77            0 :         }
      78            0 :     }
      79              : 
      80            0 :     fn as_tenant_timeline_id(&self) -> TenantTimelineId {
      81            0 :         TenantTimelineId::new(self.tenant_shard_id.tenant_id, self.timeline_id)
      82            0 :     }
      83              : }
      84              : 
      85              : impl Display for TenantShardTimelineId {
      86            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      87            0 :         write!(f, "{}/{}", self.tenant_shard_id, self.timeline_id)
      88            0 :     }
      89              : }
      90              : 
      91            0 : #[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)]
      92              : pub enum TraversingDepth {
      93              :     Tenant,
      94              :     Timeline,
      95              : }
      96              : 
      97              : impl Display for TraversingDepth {
      98            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      99            0 :         f.write_str(match self {
     100            0 :             Self::Tenant => "tenant",
     101            0 :             Self::Timeline => "timeline",
     102              :         })
     103            0 :     }
     104              : }
     105              : 
     106            0 : #[derive(ValueEnum, Clone, Copy, Eq, PartialEq, Debug, Serialize, Deserialize)]
     107              : pub enum NodeKind {
     108              :     Safekeeper,
     109              :     Pageserver,
     110              : }
     111              : 
     112              : impl NodeKind {
     113            0 :     fn as_str(&self) -> &'static str {
     114            0 :         match self {
     115            0 :             Self::Safekeeper => "safekeeper",
     116            0 :             Self::Pageserver => "pageserver",
     117              :         }
     118            0 :     }
     119              : }
     120              : 
     121              : impl Display for NodeKind {
     122            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     123            0 :         f.write_str(self.as_str())
     124            0 :     }
     125              : }
     126              : 
     127              : impl S3Target {
     128            0 :     pub fn with_sub_segment(&self, new_segment: &str) -> Self {
     129            0 :         let mut new_self = self.clone();
     130            0 :         if new_self.prefix_in_bucket.is_empty() {
     131            0 :             new_self.prefix_in_bucket = format!("/{}/", new_segment);
     132            0 :         } else {
     133            0 :             if new_self.prefix_in_bucket.ends_with('/') {
     134            0 :                 new_self.prefix_in_bucket.pop();
     135            0 :             }
     136            0 :             new_self.prefix_in_bucket =
     137            0 :                 [&new_self.prefix_in_bucket, new_segment, ""].join(&new_self.delimiter);
     138              :         }
     139            0 :         new_self
     140            0 :     }
     141              : }
     142              : 
     143              : #[derive(Clone)]
     144              : pub enum RootTarget {
     145              :     Pageserver(S3Target),
     146              :     Safekeeper(S3Target),
     147              : }
     148              : 
     149              : impl RootTarget {
     150            0 :     pub fn tenants_root(&self) -> S3Target {
     151            0 :         match self {
     152            0 :             Self::Pageserver(root) => root.with_sub_segment(TENANTS_SEGMENT_NAME),
     153            0 :             Self::Safekeeper(root) => root.clone(),
     154              :         }
     155            0 :     }
     156              : 
     157            0 :     pub fn tenant_root(&self, tenant_id: &TenantShardId) -> S3Target {
     158            0 :         match self {
     159            0 :             Self::Pageserver(_) => self.tenants_root().with_sub_segment(&tenant_id.to_string()),
     160            0 :             Self::Safekeeper(_) => self
     161            0 :                 .tenants_root()
     162            0 :                 .with_sub_segment(&tenant_id.tenant_id.to_string()),
     163              :         }
     164            0 :     }
     165              : 
     166            0 :     pub(crate) fn tenant_shards_prefix(&self, tenant_id: &TenantId) -> S3Target {
     167            0 :         // Only pageserver remote storage contains tenant-shards
     168            0 :         assert!(matches!(self, Self::Pageserver(_)));
     169            0 :         let Self::Pageserver(root) = self else {
     170            0 :             panic!();
     171              :         };
     172              : 
     173            0 :         S3Target {
     174            0 :             bucket_name: root.bucket_name.clone(),
     175            0 :             prefix_in_bucket: format!(
     176            0 :                 "{}/{TENANTS_SEGMENT_NAME}/{tenant_id}",
     177            0 :                 root.prefix_in_bucket
     178            0 :             ),
     179            0 :             delimiter: root.delimiter.clone(),
     180            0 :         }
     181            0 :     }
     182              : 
     183            0 :     pub fn timelines_root(&self, tenant_id: &TenantShardId) -> S3Target {
     184            0 :         match self {
     185            0 :             Self::Pageserver(_) => self.tenant_root(tenant_id).with_sub_segment("timelines"),
     186            0 :             Self::Safekeeper(_) => self.tenant_root(tenant_id),
     187              :         }
     188            0 :     }
     189              : 
     190            0 :     pub fn timeline_root(&self, id: &TenantShardTimelineId) -> S3Target {
     191            0 :         self.timelines_root(&id.tenant_shard_id)
     192            0 :             .with_sub_segment(&id.timeline_id.to_string())
     193            0 :     }
     194              : 
     195              :     /// Given RemotePath "tenants/foo/timelines/bar/layerxyz", prefix it to a literal
     196              :     /// key in the S3 bucket.
     197            0 :     pub fn absolute_key(&self, key: &RemotePath) -> String {
     198            0 :         let root = match self {
     199            0 :             Self::Pageserver(root) => root,
     200            0 :             Self::Safekeeper(root) => root,
     201              :         };
     202              : 
     203            0 :         let prefix = &root.prefix_in_bucket;
     204            0 :         if prefix.ends_with('/') {
     205            0 :             format!("{prefix}{key}")
     206              :         } else {
     207            0 :             format!("{prefix}/{key}")
     208              :         }
     209            0 :     }
     210              : 
     211            0 :     pub fn bucket_name(&self) -> &str {
     212            0 :         match self {
     213            0 :             Self::Pageserver(root) => &root.bucket_name,
     214            0 :             Self::Safekeeper(root) => &root.bucket_name,
     215              :         }
     216            0 :     }
     217              : 
     218            0 :     pub fn delimiter(&self) -> &str {
     219            0 :         match self {
     220            0 :             Self::Pageserver(root) => &root.delimiter,
     221            0 :             Self::Safekeeper(root) => &root.delimiter,
     222              :         }
     223            0 :     }
     224              : }
     225              : 
     226            0 : pub fn remote_timeline_path_id(id: &TenantShardTimelineId) -> RemotePath {
     227            0 :     remote_timeline_path(&id.tenant_shard_id, &id.timeline_id)
     228            0 : }
     229              : 
     230            0 : #[derive(Debug, Clone, Serialize, Deserialize)]
     231              : #[serde(deny_unknown_fields)]
     232              : pub struct BucketConfig {
     233              :     pub region: String,
     234              :     pub bucket: String,
     235              :     pub prefix_in_bucket: Option<String>,
     236              : }
     237              : 
     238              : impl BucketConfig {
     239            0 :     pub fn from_env() -> anyhow::Result<Self> {
     240            0 :         let region = env::var("REGION").context("'REGION' param retrieval")?;
     241            0 :         let bucket = env::var("BUCKET").context("'BUCKET' param retrieval")?;
     242            0 :         let prefix_in_bucket = env::var("BUCKET_PREFIX").ok();
     243            0 : 
     244            0 :         Ok(Self {
     245            0 :             region,
     246            0 :             bucket,
     247            0 :             prefix_in_bucket,
     248            0 :         })
     249            0 :     }
     250              : }
     251              : 
     252              : pub struct ControllerClientConfig {
     253              :     /// URL to storage controller.  e.g. http://127.0.0.1:1234 when using `neon_local`
     254              :     pub controller_api: Url,
     255              : 
     256              :     /// JWT token for authenticating with storage controller.  Requires scope 'scrubber' or 'admin'.
     257              :     pub controller_jwt: String,
     258              : }
     259              : 
     260              : impl ControllerClientConfig {
     261            0 :     pub fn build_client(self) -> control_api::Client {
     262            0 :         control_api::Client::new(self.controller_api, Some(self.controller_jwt))
     263            0 :     }
     264              : }
     265              : 
     266              : pub struct ConsoleConfig {
     267              :     pub token: String,
     268              :     pub base_url: Url,
     269              : }
     270              : 
     271              : impl ConsoleConfig {
     272            0 :     pub fn from_env() -> anyhow::Result<Self> {
     273            0 :         let base_url: Url = env::var("CLOUD_ADMIN_API_URL")
     274            0 :             .context("'CLOUD_ADMIN_API_URL' param retrieval")?
     275            0 :             .parse()
     276            0 :             .context("'CLOUD_ADMIN_API_URL' param parsing")?;
     277              : 
     278            0 :         let token = env::var(CLOUD_ADMIN_API_TOKEN_ENV_VAR)
     279            0 :             .context("'CLOUD_ADMIN_API_TOKEN' environment variable fetch")?;
     280              : 
     281            0 :         Ok(Self { base_url, token })
     282            0 :     }
     283              : }
     284              : 
     285            0 : pub fn init_logging(file_name: &str) -> Option<WorkerGuard> {
     286            0 :     let stderr_logs = fmt::Layer::new()
     287            0 :         .with_target(false)
     288            0 :         .with_writer(std::io::stderr);
     289              : 
     290            0 :     let disable_file_logging = match std::env::var("PAGESERVER_DISABLE_FILE_LOGGING") {
     291            0 :         Ok(s) => s == "1" || s.to_lowercase() == "true",
     292            0 :         Err(_) => false,
     293              :     };
     294              : 
     295            0 :     if disable_file_logging {
     296            0 :         tracing_subscriber::registry()
     297            0 :             .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
     298            0 :             .with(stderr_logs)
     299            0 :             .init();
     300            0 :         None
     301              :     } else {
     302            0 :         let (file_writer, guard) =
     303            0 :             tracing_appender::non_blocking(tracing_appender::rolling::never("./logs/", file_name));
     304            0 :         let file_logs = fmt::Layer::new()
     305            0 :             .with_target(false)
     306            0 :             .with_ansi(false)
     307            0 :             .with_writer(file_writer);
     308            0 :         tracing_subscriber::registry()
     309            0 :             .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
     310            0 :             .with(stderr_logs)
     311            0 :             .with(file_logs)
     312            0 :             .init();
     313            0 :         Some(guard)
     314              :     }
     315            0 : }
     316              : 
     317            0 : async fn init_s3_client(bucket_region: Region) -> Client {
     318            0 :     let mut retry_config_builder = RetryConfigBuilder::new();
     319            0 : 
     320            0 :     retry_config_builder
     321            0 :         .set_max_attempts(Some(3))
     322            0 :         .set_mode(Some(RetryMode::Adaptive));
     323              : 
     324            0 :     let config = aws_config::defaults(aws_config::BehaviorVersion::v2024_03_28())
     325            0 :         .region(bucket_region)
     326            0 :         .retry_config(retry_config_builder.build())
     327            0 :         .load()
     328            0 :         .await;
     329            0 :     Client::new(&config)
     330            0 : }
     331              : 
     332            0 : fn default_prefix_in_bucket(node_kind: NodeKind) -> &'static str {
     333            0 :     match node_kind {
     334            0 :         NodeKind::Pageserver => "pageserver/v1/",
     335            0 :         NodeKind::Safekeeper => "wal/",
     336              :     }
     337            0 : }
     338              : 
     339            0 : fn make_root_target(
     340            0 :     bucket_name: String,
     341            0 :     prefix_in_bucket: String,
     342            0 :     node_kind: NodeKind,
     343            0 : ) -> RootTarget {
     344            0 :     let s3_target = S3Target {
     345            0 :         bucket_name,
     346            0 :         prefix_in_bucket,
     347            0 :         delimiter: "/".to_string(),
     348            0 :     };
     349            0 :     match node_kind {
     350            0 :         NodeKind::Pageserver => RootTarget::Pageserver(s3_target),
     351            0 :         NodeKind::Safekeeper => RootTarget::Safekeeper(s3_target),
     352              :     }
     353            0 : }
     354              : 
     355            0 : async fn init_remote(
     356            0 :     bucket_config: BucketConfig,
     357            0 :     node_kind: NodeKind,
     358            0 : ) -> anyhow::Result<(Arc<Client>, RootTarget)> {
     359            0 :     let bucket_region = Region::new(bucket_config.region);
     360            0 :     let s3_client = Arc::new(init_s3_client(bucket_region).await);
     361            0 :     let default_prefix = default_prefix_in_bucket(node_kind).to_string();
     362            0 : 
     363            0 :     let s3_root = make_root_target(
     364            0 :         bucket_config.bucket,
     365            0 :         bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
     366            0 :         node_kind,
     367            0 :     );
     368            0 : 
     369            0 :     Ok((s3_client, s3_root))
     370            0 : }
     371              : 
     372            0 : async fn init_remote_generic(
     373            0 :     bucket_config: BucketConfig,
     374            0 :     node_kind: NodeKind,
     375            0 : ) -> anyhow::Result<(GenericRemoteStorage, RootTarget)> {
     376            0 :     let endpoint = env::var("AWS_ENDPOINT_URL").ok();
     377            0 :     let default_prefix = default_prefix_in_bucket(node_kind).to_string();
     378            0 :     let prefix_in_bucket = Some(bucket_config.prefix_in_bucket.unwrap_or(default_prefix));
     379            0 :     let storage = S3Config {
     380            0 :         bucket_name: bucket_config.bucket.clone(),
     381            0 :         bucket_region: bucket_config.region,
     382            0 :         prefix_in_bucket,
     383            0 :         endpoint,
     384            0 :         concurrency_limit: DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
     385            0 :             .try_into()
     386            0 :             .unwrap(),
     387            0 :         max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
     388            0 :         upload_storage_class: None,
     389            0 :     };
     390            0 :     let storage_config = RemoteStorageConfig {
     391            0 :         storage: RemoteStorageKind::AwsS3(storage),
     392            0 :         timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
     393            0 :     };
     394            0 : 
     395            0 :     // We already pass the prefix to the remote client above
     396            0 :     let prefix_in_root_target = String::new();
     397            0 :     let s3_root = make_root_target(bucket_config.bucket, prefix_in_root_target, node_kind);
     398              : 
     399            0 :     let client = GenericRemoteStorage::from_config(&storage_config).await?;
     400            0 :     Ok((client, s3_root))
     401            0 : }
     402              : 
     403            0 : async fn list_objects_with_retries(
     404            0 :     s3_client: &Client,
     405            0 :     s3_target: &S3Target,
     406            0 :     continuation_token: Option<String>,
     407            0 : ) -> anyhow::Result<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output> {
     408            0 :     for trial in 0..MAX_RETRIES {
     409            0 :         match s3_client
     410            0 :             .list_objects_v2()
     411            0 :             .bucket(&s3_target.bucket_name)
     412            0 :             .prefix(&s3_target.prefix_in_bucket)
     413            0 :             .delimiter(&s3_target.delimiter)
     414            0 :             .set_continuation_token(continuation_token.clone())
     415            0 :             .send()
     416            0 :             .await
     417              :         {
     418            0 :             Ok(response) => return Ok(response),
     419            0 :             Err(e) => {
     420            0 :                 if trial == MAX_RETRIES - 1 {
     421            0 :                     return Err(e)
     422            0 :                         .with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
     423            0 :                 }
     424            0 :                 error!(
     425            0 :                     "list_objects_v2 query failed: bucket_name={}, prefix={}, delimiter={}, error={}",
     426            0 :                     s3_target.bucket_name,
     427            0 :                     s3_target.prefix_in_bucket,
     428            0 :                     s3_target.delimiter,
     429            0 :                     DisplayErrorContext(e),
     430              :                 );
     431            0 :                 tokio::time::sleep(Duration::from_secs(1)).await;
     432              :             }
     433              :         }
     434              :     }
     435            0 :     Err(anyhow!("unreachable unless MAX_RETRIES==0"))
     436            0 : }
     437              : 
     438              : /// Listing possibly large amounts of keys in a streaming fashion.
     439            0 : fn stream_objects_with_retries<'a>(
     440            0 :     storage_client: &'a GenericRemoteStorage,
     441            0 :     listing_mode: ListingMode,
     442            0 :     s3_target: &'a S3Target,
     443            0 : ) -> impl Stream<Item = Result<Listing, anyhow::Error>> + 'a {
     444              :     async_stream::stream! {
     445              :         let mut trial = 0;
     446              :         let cancel = CancellationToken::new();
     447              :         let prefix_str = &s3_target
     448              :             .prefix_in_bucket
     449              :             .strip_prefix("/")
     450              :             .unwrap_or(&s3_target.prefix_in_bucket);
     451              :         let prefix = RemotePath::from_string(prefix_str)?;
     452              :         let mut list_stream =
     453              :             storage_client.list_streaming(Some(&prefix), listing_mode, None, &cancel);
     454              :         while let Some(res) = list_stream.next().await {
     455              :             if let Err(err) = res {
     456              :                 let yield_err = if err.is_permanent() {
     457              :                     true
     458              :                 } else {
     459              :                     let backoff_time = 1 << trial.max(5);
     460              :                     tokio::time::sleep(Duration::from_secs(backoff_time)).await;
     461              :                     trial += 1;
     462              :                     trial == MAX_RETRIES - 1
     463              :                 };
     464              :                 if yield_err {
     465              :                     yield Err(err)
     466            0 :                         .with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
     467              :                     break;
     468              :                 }
     469              :             } else {
     470              :                 trial = 0;
     471              :                 yield res.map_err(anyhow::Error::from);
     472              :             }
     473              :         }
     474              :     }
     475            0 : }
     476              : 
     477              : /// If you want to list a bounded amount of prefixes or keys. For larger numbers of keys/prefixes,
     478              : /// use [`stream_objects_with_retries`] instead.
     479            0 : async fn list_objects_with_retries_generic(
     480            0 :     remote_client: &GenericRemoteStorage,
     481            0 :     listing_mode: ListingMode,
     482            0 :     s3_target: &S3Target,
     483            0 : ) -> anyhow::Result<Listing> {
     484            0 :     let cancel = CancellationToken::new();
     485            0 :     let prefix_str = &s3_target
     486            0 :         .prefix_in_bucket
     487            0 :         .strip_prefix("/")
     488            0 :         .unwrap_or(&s3_target.prefix_in_bucket);
     489            0 :     let prefix = RemotePath::from_string(prefix_str)?;
     490            0 :     for trial in 0..MAX_RETRIES {
     491            0 :         match remote_client
     492            0 :             .list(Some(&prefix), listing_mode, None, &cancel)
     493            0 :             .await
     494              :         {
     495            0 :             Ok(response) => return Ok(response),
     496            0 :             Err(e) => {
     497            0 :                 if trial == MAX_RETRIES - 1 {
     498            0 :                     return Err(e)
     499            0 :                         .with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
     500            0 :                 }
     501            0 :                 error!(
     502            0 :                     "list_objects_v2 query failed: bucket_name={}, prefix={}, delimiter={}, error={}",
     503            0 :                     s3_target.bucket_name,
     504            0 :                     s3_target.prefix_in_bucket,
     505            0 :                     s3_target.delimiter,
     506            0 :                     DisplayErrorContext(e),
     507              :                 );
     508            0 :                 let backoff_time = 1 << trial.max(5);
     509            0 :                 tokio::time::sleep(Duration::from_secs(backoff_time)).await;
     510              :             }
     511              :         }
     512              :     }
     513            0 :     panic!("MAX_RETRIES is not allowed to be 0");
     514            0 : }
     515              : 
     516            0 : async fn download_object_with_retries(
     517            0 :     s3_client: &Client,
     518            0 :     bucket_name: &str,
     519            0 :     key: &str,
     520            0 : ) -> anyhow::Result<Vec<u8>> {
     521            0 :     for _ in 0..MAX_RETRIES {
     522            0 :         let mut body_buf = Vec::new();
     523            0 :         let response_stream = match s3_client
     524            0 :             .get_object()
     525            0 :             .bucket(bucket_name)
     526            0 :             .key(key)
     527            0 :             .send()
     528            0 :             .await
     529              :         {
     530            0 :             Ok(response) => response,
     531            0 :             Err(e) => {
     532            0 :                 error!("Failed to download object for key {key}: {e}");
     533            0 :                 tokio::time::sleep(Duration::from_secs(1)).await;
     534            0 :                 continue;
     535              :             }
     536              :         };
     537              : 
     538            0 :         match response_stream
     539            0 :             .body
     540            0 :             .into_async_read()
     541            0 :             .read_to_end(&mut body_buf)
     542            0 :             .await
     543              :         {
     544            0 :             Ok(bytes_read) => {
     545            0 :                 tracing::debug!("Downloaded {bytes_read} bytes for object {key}");
     546            0 :                 return Ok(body_buf);
     547              :             }
     548            0 :             Err(e) => {
     549            0 :                 error!("Failed to stream object body for key {key}: {e}");
     550            0 :                 tokio::time::sleep(Duration::from_secs(1)).await;
     551              :             }
     552              :         }
     553              :     }
     554              : 
     555            0 :     anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
     556            0 : }
     557              : 
     558            0 : async fn download_object_to_file(
     559            0 :     s3_client: &Client,
     560            0 :     bucket_name: &str,
     561            0 :     key: &str,
     562            0 :     version_id: Option<&str>,
     563            0 :     local_path: &Utf8Path,
     564            0 : ) -> anyhow::Result<()> {
     565            0 :     let tmp_path = Utf8PathBuf::from(format!("{local_path}.tmp"));
     566            0 :     for _ in 0..MAX_RETRIES {
     567            0 :         tokio::fs::remove_file(&tmp_path)
     568            0 :             .await
     569            0 :             .or_else(fs_ext::ignore_not_found)?;
     570              : 
     571            0 :         let mut file = tokio::fs::File::create(&tmp_path)
     572            0 :             .await
     573            0 :             .context("Opening output file")?;
     574              : 
     575            0 :         let request = s3_client.get_object().bucket(bucket_name).key(key);
     576              : 
     577            0 :         let request = match version_id {
     578            0 :             Some(version_id) => request.version_id(version_id),
     579            0 :             None => request,
     580              :         };
     581              : 
     582            0 :         let response_stream = match request.send().await {
     583            0 :             Ok(response) => response,
     584            0 :             Err(e) => {
     585            0 :                 error!(
     586            0 :                     "Failed to download object for key {key} version {}: {e:#}",
     587            0 :                     version_id.unwrap_or("")
     588              :                 );
     589            0 :                 tokio::time::sleep(Duration::from_secs(1)).await;
     590            0 :                 continue;
     591              :             }
     592              :         };
     593              : 
     594            0 :         let mut read_stream = response_stream.body.into_async_read();
     595            0 : 
     596            0 :         tokio::io::copy(&mut read_stream, &mut file).await?;
     597              : 
     598            0 :         tokio::fs::rename(&tmp_path, local_path).await?;
     599            0 :         return Ok(());
     600              :     }
     601              : 
     602            0 :     anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
     603            0 : }
        

Generated by: LCOV version 2.1-beta