LCOV - code coverage report
Current view: top level - storage_scrubber/src - lib.rs (source / functions) Coverage Total Hit
Test: aca806cab4756d7eb6a304846130f4a73a5d5393.info Lines: 0.0 % 343 0
Test Date: 2025-04-24 20:31:15 Functions: 0.0 % 56 0

            Line data    Source code
       1              : #![deny(unsafe_code)]
       2              : #![deny(clippy::undocumented_unsafe_blocks)]
       3              : pub mod checks;
       4              : pub mod cloud_admin_api;
       5              : pub mod find_large_objects;
       6              : pub mod garbage;
       7              : pub mod metadata_stream;
       8              : pub mod pageserver_physical_gc;
       9              : pub mod scan_pageserver_metadata;
      10              : pub mod scan_safekeeper_metadata;
      11              : pub mod tenant_snapshot;
      12              : 
      13              : use std::env;
      14              : use std::fmt::Display;
      15              : use std::time::{Duration, SystemTime};
      16              : 
      17              : use anyhow::Context;
      18              : use camino::{Utf8Path, Utf8PathBuf};
      19              : use clap::ValueEnum;
      20              : use futures::{Stream, StreamExt};
      21              : use pageserver::tenant::TENANTS_SEGMENT_NAME;
      22              : use pageserver::tenant::remote_timeline_client::{remote_tenant_path, remote_timeline_path};
      23              : use pageserver_api::shard::TenantShardId;
      24              : use remote_storage::{
      25              :     DownloadOpts, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorageConfig,
      26              :     RemoteStorageKind, VersionId,
      27              : };
      28              : use reqwest::Url;
      29              : use serde::{Deserialize, Serialize};
      30              : use storage_controller_client::control_api;
      31              : use tokio::io::AsyncReadExt;
      32              : use tokio_util::sync::CancellationToken;
      33              : use tracing::{error, warn};
      34              : use tracing_appender::non_blocking::WorkerGuard;
      35              : use tracing_subscriber::prelude::*;
      36              : use tracing_subscriber::{EnvFilter, fmt};
      37              : use utils::fs_ext;
      38              : use utils::id::{TenantId, TenantTimelineId, TimelineId};
      39              : 
/// Retry limit used by the list/download helpers in this file before they give up.
const MAX_RETRIES: usize = 20;
/// Environment variable from which [`ConsoleConfig::from_env`] reads the cloud admin API token.
const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";

/// A location in remote storage: a key prefix plus the delimiter used when
/// [`S3Target::with_sub_segment`] appends path segments to it.
#[derive(Debug, Clone)]
pub struct S3Target {
    /// Human-readable description of the backing storage (e.g. as produced by
    /// `BucketConfig::desc_str`).
    pub desc_str: String,
    /// This `prefix_in_bucket` is only equal to the PS/SK config of the same
    /// name for the RootTarget: other instances of S3Target will have prefix_in_bucket
    /// with extra parts.
    ///
    /// NOTE(review): `init_remote` builds its RootTarget with an *empty* prefix (the configured
    /// prefix is handed to the storage client instead), so the sentence above looks stale.
    /// Derived prefixes may start with `/`, which the listing helpers strip before listing.
    pub prefix_in_bucket: String,
    /// Separator placed between path segments by `with_sub_segment`.
    pub delimiter: String,
}
      52              : 
      53              : /// Convenience for referring to timelines within a particular shard: more ergonomic
      54              : /// than using a 2-tuple.
      55              : ///
      56              : /// This is the shard-aware equivalent of TenantTimelineId.  It's defined here rather
      57              : /// than somewhere more broadly exposed, because this kind of thing is rarely needed
      58              : /// in the pageserver, as all timeline objects existing in the scope of a particular
      59              : /// tenant: the scrubber is different in that it handles collections of data referring to many
      60              : /// TenantShardTimelineIds in on place.
      61            0 : #[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
      62              : pub struct TenantShardTimelineId {
      63              :     tenant_shard_id: TenantShardId,
      64              :     timeline_id: TimelineId,
      65              : }
      66              : 
      67              : impl TenantShardTimelineId {
      68            0 :     fn new(tenant_shard_id: TenantShardId, timeline_id: TimelineId) -> Self {
      69            0 :         Self {
      70            0 :             tenant_shard_id,
      71            0 :             timeline_id,
      72            0 :         }
      73            0 :     }
      74              : 
      75            0 :     fn as_tenant_timeline_id(&self) -> TenantTimelineId {
      76            0 :         TenantTimelineId::new(self.tenant_shard_id.tenant_id, self.timeline_id)
      77            0 :     }
      78              : }
      79              : 
      80              : impl Display for TenantShardTimelineId {
      81            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      82            0 :         write!(f, "{}/{}", self.tenant_shard_id, self.timeline_id)
      83            0 :     }
      84              : }
      85              : 
      86              : #[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)]
      87              : pub enum TraversingDepth {
      88              :     Tenant,
      89              :     Timeline,
      90              : }
      91              : 
      92              : impl Display for TraversingDepth {
      93            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      94            0 :         f.write_str(match self {
      95            0 :             Self::Tenant => "tenant",
      96            0 :             Self::Timeline => "timeline",
      97              :         })
      98            0 :     }
      99              : }
     100              : 
     101            0 : #[derive(ValueEnum, Clone, Copy, Eq, PartialEq, Debug, Serialize, Deserialize)]
     102              : pub enum NodeKind {
     103              :     Safekeeper,
     104              :     Pageserver,
     105              : }
     106              : 
     107              : impl NodeKind {
     108            0 :     fn as_str(&self) -> &'static str {
     109            0 :         match self {
     110            0 :             Self::Safekeeper => "safekeeper",
     111            0 :             Self::Pageserver => "pageserver",
     112              :         }
     113            0 :     }
     114              : }
     115              : 
     116              : impl Display for NodeKind {
     117            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     118            0 :         f.write_str(self.as_str())
     119            0 :     }
     120              : }
     121              : 
     122              : impl S3Target {
     123            0 :     pub fn with_sub_segment(&self, new_segment: &str) -> Self {
     124            0 :         let mut new_self = self.clone();
     125            0 :         if new_self.prefix_in_bucket.is_empty() {
     126            0 :             new_self.prefix_in_bucket = format!("/{}/", new_segment);
     127            0 :         } else {
     128            0 :             if new_self.prefix_in_bucket.ends_with('/') {
     129            0 :                 new_self.prefix_in_bucket.pop();
     130            0 :             }
     131            0 :             new_self.prefix_in_bucket =
     132            0 :                 [&new_self.prefix_in_bucket, new_segment, ""].join(&new_self.delimiter);
     133              :         }
     134            0 :         new_self
     135            0 :     }
     136              : }
     137              : 
     138              : #[derive(Clone)]
     139              : pub enum RootTarget {
     140              :     Pageserver(S3Target),
     141              :     Safekeeper(S3Target),
     142              : }
     143              : 
     144              : impl RootTarget {
     145            0 :     pub fn tenants_root(&self) -> S3Target {
     146            0 :         match self {
     147            0 :             Self::Pageserver(root) => root.with_sub_segment(TENANTS_SEGMENT_NAME),
     148            0 :             Self::Safekeeper(root) => root.clone(),
     149              :         }
     150            0 :     }
     151              : 
     152            0 :     pub fn tenant_root(&self, tenant_id: &TenantShardId) -> S3Target {
     153            0 :         match self {
     154            0 :             Self::Pageserver(_) => self.tenants_root().with_sub_segment(&tenant_id.to_string()),
     155            0 :             Self::Safekeeper(_) => self
     156            0 :                 .tenants_root()
     157            0 :                 .with_sub_segment(&tenant_id.tenant_id.to_string()),
     158              :         }
     159            0 :     }
     160              : 
     161            0 :     pub(crate) fn tenant_shards_prefix(&self, tenant_id: &TenantId) -> S3Target {
     162            0 :         // Only pageserver remote storage contains tenant-shards
     163            0 :         assert!(matches!(self, Self::Pageserver(_)));
     164            0 :         let Self::Pageserver(root) = self else {
     165            0 :             panic!();
     166              :         };
     167              : 
     168            0 :         S3Target {
     169            0 :             desc_str: root.desc_str.clone(),
     170            0 :             prefix_in_bucket: format!(
     171            0 :                 "{}/{TENANTS_SEGMENT_NAME}/{tenant_id}",
     172            0 :                 root.prefix_in_bucket
     173            0 :             ),
     174            0 :             delimiter: root.delimiter.clone(),
     175            0 :         }
     176            0 :     }
     177              : 
     178            0 :     pub fn timelines_root(&self, tenant_id: &TenantShardId) -> S3Target {
     179            0 :         match self {
     180            0 :             Self::Pageserver(_) => self.tenant_root(tenant_id).with_sub_segment("timelines"),
     181            0 :             Self::Safekeeper(_) => self.tenant_root(tenant_id),
     182              :         }
     183            0 :     }
     184              : 
     185            0 :     pub fn timeline_root(&self, id: &TenantShardTimelineId) -> S3Target {
     186            0 :         self.timelines_root(&id.tenant_shard_id)
     187            0 :             .with_sub_segment(&id.timeline_id.to_string())
     188            0 :     }
     189              : 
     190              :     /// Given RemotePath "tenants/foo/timelines/bar/layerxyz", prefix it to a literal
     191              :     /// key in the S3 bucket.
     192            0 :     pub fn absolute_key(&self, key: &RemotePath) -> String {
     193            0 :         let root = match self {
     194            0 :             Self::Pageserver(root) => root,
     195            0 :             Self::Safekeeper(root) => root,
     196              :         };
     197              : 
     198            0 :         let prefix = &root.prefix_in_bucket;
     199            0 :         if prefix.ends_with('/') {
     200            0 :             format!("{prefix}{key}")
     201              :         } else {
     202            0 :             format!("{prefix}/{key}")
     203              :         }
     204            0 :     }
     205              : 
     206            0 :     pub fn desc_str(&self) -> &str {
     207            0 :         match self {
     208            0 :             Self::Pageserver(root) => &root.desc_str,
     209            0 :             Self::Safekeeper(root) => &root.desc_str,
     210              :         }
     211            0 :     }
     212              : 
     213            0 :     pub fn delimiter(&self) -> &str {
     214            0 :         match self {
     215            0 :             Self::Pageserver(root) => &root.delimiter,
     216            0 :             Self::Safekeeper(root) => &root.delimiter,
     217              :         }
     218            0 :     }
     219              : }
     220              : 
     221            0 : pub fn remote_timeline_path_id(id: &TenantShardTimelineId) -> RemotePath {
     222            0 :     remote_timeline_path(&id.tenant_shard_id, &id.timeline_id)
     223            0 : }
     224              : 
     225            0 : #[derive(Debug, Clone, Serialize, Deserialize)]
     226              : #[serde(deny_unknown_fields)]
     227              : pub struct BucketConfig(RemoteStorageConfig);
     228              : 
     229              : impl BucketConfig {
     230            0 :     pub fn from_env() -> anyhow::Result<Self> {
     231            0 :         if let Ok(legacy) = Self::from_env_legacy() {
     232            0 :             return Ok(legacy);
     233            0 :         }
     234            0 :         let config_toml =
     235            0 :             env::var("REMOTE_STORAGE_CONFIG").context("'REMOTE_STORAGE_CONFIG' retrieval")?;
     236            0 :         let remote_config = RemoteStorageConfig::from_toml_str(&config_toml)?;
     237            0 :         Ok(BucketConfig(remote_config))
     238            0 :     }
     239              : 
     240            0 :     fn from_env_legacy() -> anyhow::Result<Self> {
     241            0 :         let bucket_region = env::var("REGION").context("'REGION' param retrieval")?;
     242            0 :         let bucket_name = env::var("BUCKET").context("'BUCKET' param retrieval")?;
     243            0 :         let prefix_in_bucket = env::var("BUCKET_PREFIX").ok();
     244            0 :         let endpoint = env::var("AWS_ENDPOINT_URL").ok();
     245            0 :         // Create a json object which we then deserialize so that we don't
     246            0 :         // have to repeat all of the S3Config fields.
     247            0 :         let s3_config_json = serde_json::json!({
     248            0 :             "bucket_name": bucket_name,
     249            0 :             "bucket_region": bucket_region,
     250            0 :             "prefix_in_bucket": prefix_in_bucket,
     251            0 :             "endpoint": endpoint,
     252            0 :         });
     253            0 :         let config: RemoteStorageConfig = serde_json::from_value(s3_config_json)?;
     254            0 :         Ok(BucketConfig(config))
     255            0 :     }
     256            0 :     pub fn desc_str(&self) -> String {
     257            0 :         match &self.0.storage {
     258            0 :             RemoteStorageKind::LocalFs { local_path } => {
     259            0 :                 format!("local path {local_path}")
     260              :             }
     261            0 :             RemoteStorageKind::AwsS3(config) => format!(
     262            0 :                 "bucket {}, region {}",
     263            0 :                 config.bucket_name, config.bucket_region
     264            0 :             ),
     265            0 :             RemoteStorageKind::AzureContainer(config) => format!(
     266            0 :                 "container {}, storage account {:?}, region {}",
     267            0 :                 config.container_name, config.storage_account, config.container_region
     268            0 :             ),
     269              :         }
     270            0 :     }
     271            0 :     pub fn bucket_name(&self) -> Option<&str> {
     272            0 :         self.0.storage.bucket_name()
     273            0 :     }
     274              : }
     275              : 
/// Legacy bucket description: region, bucket name and optional key prefix.
///
/// NOTE(review): not referenced in this file. `BucketConfig::from_env_legacy` reads the
/// corresponding `REGION`/`BUCKET`/`BUCKET_PREFIX` env vars directly, so confirm whether this
/// type is still used elsewhere.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct BucketConfigLegacy {
    pub region: String,
    pub bucket: String,
    pub prefix_in_bucket: Option<String>,
}
     283              : 
     284              : pub struct ControllerClientConfig {
     285              :     /// URL to storage controller.  e.g. http://127.0.0.1:1234 when using `neon_local`
     286              :     pub controller_api: Url,
     287              : 
     288              :     /// JWT token for authenticating with storage controller.  Requires scope 'scrubber' or 'admin'.
     289              :     pub controller_jwt: String,
     290              : }
     291              : 
     292              : impl ControllerClientConfig {
     293            0 :     pub fn build_client(self, http_client: reqwest::Client) -> control_api::Client {
     294            0 :         control_api::Client::new(http_client, self.controller_api, Some(self.controller_jwt))
     295            0 :     }
     296              : }
     297              : 
     298              : pub struct ConsoleConfig {
     299              :     pub token: String,
     300              :     pub base_url: Url,
     301              : }
     302              : 
     303              : impl ConsoleConfig {
     304            0 :     pub fn from_env() -> anyhow::Result<Self> {
     305            0 :         let base_url: Url = env::var("CLOUD_ADMIN_API_URL")
     306            0 :             .context("'CLOUD_ADMIN_API_URL' param retrieval")?
     307            0 :             .parse()
     308            0 :             .context("'CLOUD_ADMIN_API_URL' param parsing")?;
     309              : 
     310            0 :         let token = env::var(CLOUD_ADMIN_API_TOKEN_ENV_VAR)
     311            0 :             .context("'CLOUD_ADMIN_API_TOKEN' environment variable fetch")?;
     312              : 
     313            0 :         Ok(Self { base_url, token })
     314            0 :     }
     315              : }
     316              : 
     317            0 : pub fn init_logging(file_name: &str) -> Option<WorkerGuard> {
     318            0 :     let stderr_logs = fmt::Layer::new()
     319            0 :         .with_target(false)
     320            0 :         .with_writer(std::io::stderr);
     321              : 
     322            0 :     let disable_file_logging = match std::env::var("PAGESERVER_DISABLE_FILE_LOGGING") {
     323            0 :         Ok(s) => s == "1" || s.to_lowercase() == "true",
     324            0 :         Err(_) => false,
     325              :     };
     326              : 
     327            0 :     if disable_file_logging {
     328            0 :         tracing_subscriber::registry()
     329            0 :             .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
     330            0 :             .with(stderr_logs)
     331            0 :             .init();
     332            0 :         None
     333              :     } else {
     334            0 :         let (file_writer, guard) =
     335            0 :             tracing_appender::non_blocking(tracing_appender::rolling::never("./logs/", file_name));
     336            0 :         let file_logs = fmt::Layer::new()
     337            0 :             .with_target(false)
     338            0 :             .with_ansi(false)
     339            0 :             .with_writer(file_writer);
     340            0 :         tracing_subscriber::registry()
     341            0 :             .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
     342            0 :             .with(stderr_logs)
     343            0 :             .with(file_logs)
     344            0 :             .init();
     345            0 :         Some(guard)
     346              :     }
     347            0 : }
     348              : 
     349            0 : fn default_prefix_in_bucket(node_kind: NodeKind) -> &'static str {
     350            0 :     match node_kind {
     351            0 :         NodeKind::Pageserver => "pageserver/v1/",
     352            0 :         NodeKind::Safekeeper => "wal/",
     353              :     }
     354            0 : }
     355              : 
     356            0 : fn make_root_target(desc_str: String, prefix_in_bucket: String, node_kind: NodeKind) -> RootTarget {
     357            0 :     let s3_target = S3Target {
     358            0 :         desc_str,
     359            0 :         prefix_in_bucket,
     360            0 :         delimiter: "/".to_string(),
     361            0 :     };
     362            0 :     match node_kind {
     363            0 :         NodeKind::Pageserver => RootTarget::Pageserver(s3_target),
     364            0 :         NodeKind::Safekeeper => RootTarget::Safekeeper(s3_target),
     365              :     }
     366            0 : }
     367              : 
     368            0 : async fn init_remote(
     369            0 :     mut storage_config: BucketConfig,
     370            0 :     node_kind: NodeKind,
     371            0 : ) -> anyhow::Result<(GenericRemoteStorage, RootTarget)> {
     372            0 :     let desc_str = storage_config.desc_str();
     373            0 : 
     374            0 :     let default_prefix = default_prefix_in_bucket(node_kind).to_string();
     375            0 : 
     376            0 :     match &mut storage_config.0.storage {
     377            0 :         RemoteStorageKind::AwsS3(config) => {
     378            0 :             config.prefix_in_bucket.get_or_insert(default_prefix);
     379            0 :         }
     380            0 :         RemoteStorageKind::AzureContainer(config) => {
     381            0 :             config.prefix_in_container.get_or_insert(default_prefix);
     382            0 :         }
     383            0 :         RemoteStorageKind::LocalFs { .. } => (),
     384              :     }
     385              : 
     386              :     // We already pass the prefix to the remote client above
     387            0 :     let prefix_in_root_target = String::new();
     388            0 :     let root_target = make_root_target(desc_str, prefix_in_root_target, node_kind);
     389              : 
     390            0 :     let client = GenericRemoteStorage::from_config(&storage_config.0).await?;
     391            0 :     Ok((client, root_target))
     392            0 : }
     393              : 
     394              : /// Listing possibly large amounts of keys in a streaming fashion.
     395            0 : fn stream_objects_with_retries<'a>(
     396            0 :     storage_client: &'a GenericRemoteStorage,
     397            0 :     listing_mode: ListingMode,
     398            0 :     s3_target: &'a S3Target,
     399            0 : ) -> impl Stream<Item = Result<Listing, anyhow::Error>> + 'a {
     400            0 :     async_stream::stream! {
     401            0 :         let mut trial = 0;
     402            0 :         let cancel = CancellationToken::new();
     403            0 :         let prefix_str = &s3_target
     404            0 :             .prefix_in_bucket
     405            0 :             .strip_prefix("/")
     406            0 :             .unwrap_or(&s3_target.prefix_in_bucket);
     407            0 :         let prefix = RemotePath::from_string(prefix_str)?;
     408            0 :         let mut list_stream =
     409            0 :             storage_client.list_streaming(Some(&prefix), listing_mode, None, &cancel);
     410            0 :         while let Some(res) = list_stream.next().await {
     411            0 :             match res {
     412            0 :                 Err(err) => {
     413            0 :                     let yield_err = if err.is_permanent() {
     414            0 :                         true
     415            0 :                     } else {
     416            0 :                         let backoff_time = 1 << trial.min(5);
     417            0 :                         tokio::time::sleep(Duration::from_secs(backoff_time)).await;
     418            0 :                         trial += 1;
     419            0 :                         trial == MAX_RETRIES - 1
     420            0 :                     };
     421            0 :                     if yield_err {
     422            0 :                         yield Err(err)
     423            0 :                             .with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
     424            0 :                         break;
     425            0 :                     }
     426            0 :                 }
     427            0 :                 Ok(res) => {
     428            0 :                     trial = 0;
     429            0 :                     yield Ok(res);
     430            0 :                 }
     431            0 :             }
     432            0 :         }
     433            0 :     }
     434            0 : }
     435              : 
     436              : /// If you want to list a bounded amount of prefixes or keys. For larger numbers of keys/prefixes,
     437              : /// use [`stream_objects_with_retries`] instead.
     438            0 : async fn list_objects_with_retries(
     439            0 :     remote_client: &GenericRemoteStorage,
     440            0 :     listing_mode: ListingMode,
     441            0 :     s3_target: &S3Target,
     442            0 : ) -> anyhow::Result<Listing> {
     443            0 :     let cancel = CancellationToken::new();
     444            0 :     let prefix_str = &s3_target
     445            0 :         .prefix_in_bucket
     446            0 :         .strip_prefix("/")
     447            0 :         .unwrap_or(&s3_target.prefix_in_bucket);
     448            0 :     let prefix = RemotePath::from_string(prefix_str)?;
     449            0 :     for trial in 0..MAX_RETRIES {
     450            0 :         match remote_client
     451            0 :             .list(Some(&prefix), listing_mode, None, &cancel)
     452            0 :             .await
     453              :         {
     454            0 :             Ok(response) => return Ok(response),
     455            0 :             Err(e) => {
     456            0 :                 if trial == MAX_RETRIES - 1 {
     457            0 :                     return Err(e)
     458            0 :                         .with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
     459            0 :                 }
     460            0 :                 warn!(
     461            0 :                     "list_objects_v2 query failed: bucket_name={}, prefix={}, delimiter={}, error={}",
     462            0 :                     remote_client.bucket_name().unwrap_or_default(),
     463              :                     s3_target.prefix_in_bucket,
     464              :                     s3_target.delimiter,
     465              :                     e,
     466              :                 );
     467            0 :                 let backoff_time = 1 << trial.min(5);
     468            0 :                 tokio::time::sleep(Duration::from_secs(backoff_time)).await;
     469              :             }
     470              :         }
     471              :     }
     472            0 :     panic!("MAX_RETRIES is not allowed to be 0");
     473            0 : }
     474              : 
     475              : /// Returns content, last modified time
     476            0 : async fn download_object_with_retries(
     477            0 :     remote_client: &GenericRemoteStorage,
     478            0 :     key: &RemotePath,
     479            0 : ) -> anyhow::Result<(Vec<u8>, SystemTime)> {
     480            0 :     let cancel = CancellationToken::new();
     481            0 :     for trial in 0..MAX_RETRIES {
     482            0 :         let mut buf = Vec::new();
     483            0 :         let download = match remote_client
     484            0 :             .download(key, &DownloadOpts::default(), &cancel)
     485            0 :             .await
     486              :         {
     487            0 :             Ok(response) => response,
     488            0 :             Err(e) => {
     489            0 :                 error!("Failed to download object for key {key}: {e}");
     490            0 :                 let backoff_time = 1 << trial.min(5);
     491            0 :                 tokio::time::sleep(Duration::from_secs(backoff_time)).await;
     492            0 :                 continue;
     493              :             }
     494              :         };
     495              : 
     496            0 :         match tokio_util::io::StreamReader::new(download.download_stream)
     497            0 :             .read_to_end(&mut buf)
     498            0 :             .await
     499              :         {
     500            0 :             Ok(bytes_read) => {
     501            0 :                 tracing::debug!("Downloaded {bytes_read} bytes for object {key}");
     502            0 :                 return Ok((buf, download.last_modified));
     503              :             }
     504            0 :             Err(e) => {
     505            0 :                 error!("Failed to stream object body for key {key}: {e}");
     506            0 :                 let backoff_time = 1 << trial.min(5);
     507            0 :                 tokio::time::sleep(Duration::from_secs(backoff_time)).await;
     508              :             }
     509              :         }
     510              :     }
     511              : 
     512            0 :     anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
     513            0 : }
     514              : 
     515            0 : async fn download_object_to_file(
     516            0 :     remote_storage: &GenericRemoteStorage,
     517            0 :     key: &RemotePath,
     518            0 :     version_id: Option<VersionId>,
     519            0 :     local_path: &Utf8Path,
     520            0 : ) -> anyhow::Result<()> {
     521            0 :     let opts = DownloadOpts {
     522            0 :         version_id: version_id.clone(),
     523            0 :         ..Default::default()
     524            0 :     };
     525            0 :     let tmp_path = Utf8PathBuf::from(format!("{local_path}.tmp"));
     526            0 :     let cancel = CancellationToken::new();
     527            0 :     for _ in 0..MAX_RETRIES {
     528            0 :         tokio::fs::remove_file(&tmp_path)
     529            0 :             .await
     530            0 :             .or_else(fs_ext::ignore_not_found)?;
     531              : 
     532            0 :         let mut file = tokio::fs::File::create(&tmp_path)
     533            0 :             .await
     534            0 :             .context("Opening output file")?;
     535              : 
     536            0 :         let res = remote_storage.download(key, &opts, &cancel).await;
     537              : 
     538            0 :         let download = match res {
     539            0 :             Ok(response) => response,
     540            0 :             Err(e) => {
     541            0 :                 error!(
     542            0 :                     "Failed to download object for key {key} version {:?}: {e:#}",
     543            0 :                     &version_id.as_ref().unwrap_or(&VersionId(String::new()))
     544              :                 );
     545            0 :                 tokio::time::sleep(Duration::from_secs(1)).await;
     546            0 :                 continue;
     547              :             }
     548              :         };
     549              : 
     550              :         //response_stream.download_stream
     551              : 
     552            0 :         let mut body = tokio_util::io::StreamReader::new(download.download_stream);
     553            0 :         tokio::io::copy(&mut body, &mut file).await?;
     554              : 
     555            0 :         tokio::fs::rename(&tmp_path, local_path).await?;
     556            0 :         return Ok(());
     557              :     }
     558              : 
     559            0 :     anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
     560            0 : }
        

Generated by: LCOV version 2.1-beta