LCOV - code coverage report
Current view: top level - libs/remote_storage/src - s3_bucket.rs (source / functions) Coverage Total Hit
Test: bb522999b2ee0ee028df22bb188d3a84170ba700.info Lines: 89.4 % 754 674
Test Date: 2024-07-21 16:16:09 Functions: 72.1 % 86 62

            Line data    Source code
       1              : //! AWS S3 storage wrapper around the AWS SDK for Rust (`aws_sdk_s3`).
       2              : //!
       3              : //! Respects `prefix_in_bucket` property from [`S3Config`],
       4              : //! allowing multiple api users to independently work with the same S3 bucket, if
       5              : //! their bucket prefixes are both specified and different.
       6              : 
       7              : use std::{
       8              :     borrow::Cow,
       9              :     collections::HashMap,
      10              :     num::NonZeroU32,
      11              :     pin::Pin,
      12              :     sync::Arc,
      13              :     task::{Context, Poll},
      14              :     time::{Duration, SystemTime},
      15              : };
      16              : 
      17              : use anyhow::{anyhow, Context as _};
      18              : use aws_config::{
      19              :     default_provider::credentials::DefaultCredentialsChain,
      20              :     retry::{RetryConfigBuilder, RetryMode},
      21              :     BehaviorVersion,
      22              : };
      23              : use aws_sdk_s3::{
      24              :     config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep},
      25              :     error::SdkError,
      26              :     operation::get_object::GetObjectError,
      27              :     types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass},
      28              :     Client,
      29              : };
      30              : use aws_smithy_async::rt::sleep::TokioSleep;
      31              : 
      32              : use aws_smithy_types::{body::SdkBody, DateTime};
      33              : use aws_smithy_types::{byte_stream::ByteStream, date_time::ConversionError};
      34              : use bytes::Bytes;
      35              : use futures::stream::Stream;
      36              : use hyper::Body;
      37              : use scopeguard::ScopeGuard;
      38              : use tokio_util::sync::CancellationToken;
      39              : use utils::backoff;
      40              : 
      41              : use super::StorageMetadata;
      42              : use crate::{
      43              :     config::S3Config,
      44              :     error::Cancelled,
      45              :     metrics::{start_counting_cancelled_wait, start_measuring_requests},
      46              :     support::PermitCarrying,
      47              :     ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage,
      48              :     TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
      49              : };
      50              : 
      51              : use crate::metrics::AttemptOutcome;
      52              : pub(super) use crate::metrics::RequestKind;
      53              : 
       54              : /// AWS S3 storage: an `aws_sdk_s3` [`Client`] bound to one bucket and an optional key prefix.
       55              : pub struct S3Bucket {
       56              :     client: Client,
       57              :     bucket_name: String,
       58              :     prefix_in_bucket: Option<String>, // stored by `new` with leading/trailing separators removed
       59              :     max_keys_per_list_response: Option<i32>, // combined with the caller's limit when listing
       60              :     upload_storage_class: Option<StorageClass>,
       61              :     concurrency_limiter: ConcurrencyLimiter, // source of the permits acquired in `permit`/`owned_permit`
       62              :     // Per-request timeout. Accessible for tests.
       63              :     pub timeout: Duration,
       64              : }
       65              : /// Parameters of a single `get_object` call; consumed by `S3Bucket::download_object`.
       66              : struct GetObjectRequest {
       67              :     bucket: String,
       68              :     key: String,
       69              :     range: Option<String>, // handed to `set_range` unchanged
       70              : }
      71              : impl S3Bucket {
       72              :     /// Creates the S3 storage from `remote_storage_config`; NOTE(review): every visible path returns `Ok`, runtime/thread failures panic via `unwrap`.
       73           38 :     pub async fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
       74           38 :         tracing::debug!(
       75            0 :             "Creating s3 remote storage for S3 bucket {}",
       76              :             remote_storage_config.bucket_name
       77              :         );
       78              : 
       79           38 :         let region = Region::new(remote_storage_config.bucket_region.clone());
       80           38 :         let region_opt = Some(region.clone());
       81              : 
       82              :         // https://docs.aws.amazon.com/sdkref/latest/guide/standardized-credentials.html
       83              :         // https://docs.rs/aws-config/latest/aws_config/default_provider/credentials/struct.DefaultCredentialsChain.html
       84              :         // Incomplete list of auth methods used by this:
       85              :         // * "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
       86              :         // * "AWS_PROFILE" / `aws sso login --profile <profile>`
       87              :         // * "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
       88              :         // * http (ECS/EKS) container credentials
       89              :         // * imds v2
       90           38 :         let credentials_provider = DefaultCredentialsChain::builder()
       91           38 :             .region(region)
       92           38 :             .build()
       93            0 :             .await;
       94              : 
       95              :         // AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
       96           38 :         let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
       97           38 : 
       98           38 :         let sdk_config_loader: aws_config::ConfigLoader = aws_config::defaults(
       99           38 :             #[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
      100           38 :             BehaviorVersion::v2023_11_09(),
      101           38 :         )
      102           38 :         .region(region_opt)
      103           38 :         .identity_cache(IdentityCache::lazy().build())
      104           38 :         .credentials_provider(credentials_provider)
      105           38 :         .sleep_impl(SharedAsyncSleep::from(sleep_impl));
      106           38 : 
      107           38 :         let sdk_config: aws_config::SdkConfig = std::thread::scope(|s| {
      108           38 :             s.spawn(|| {
      109           38 :                 // TODO: `new` is already `async`; consider awaiting `load()` directly instead of blocking this thread on `join()`.
      110           38 :                 tokio::runtime::Builder::new_current_thread()
      111           38 :                     .enable_all()
      112           38 :                     .build()
      113           38 :                     .unwrap()
      114           38 :                     .block_on(sdk_config_loader.load())
      115           38 :             })
      116           38 :             .join()
      117           38 :             .unwrap()
      118           38 :         });
      119           38 : 
      120           38 :         let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&sdk_config);
      121              : 
      122              :         // Technically, the `remote_storage_config.endpoint` field only applies to S3 interactions.
      123              :         // (In case we ever re-use the `sdk_config` for more than just the S3 client in the future)
      124           38 :         if let Some(custom_endpoint) = remote_storage_config.endpoint.clone() {
      125            0 :             s3_config_builder = s3_config_builder
      126            0 :                 .endpoint_url(custom_endpoint)
      127            0 :                 .force_path_style(true);
      128           38 :         }
      129              : 
      130              :         // We do our own retries (see [`backoff::retry`]).  However, for the AWS SDK to enable rate limiting in response to throttling
      131              :         // responses (e.g. 429 on too many ListObjectsv2 requests), we must provide a retry config.  We set it to use at most one
      132              :         // attempt, and enable 'Adaptive' mode, which causes rate limiting to be enabled.
      133           38 :         let mut retry_config = RetryConfigBuilder::new();
      134           38 :         retry_config
      135           38 :             .set_max_attempts(Some(1))
      136           38 :             .set_mode(Some(RetryMode::Adaptive));
      137           38 :         s3_config_builder = s3_config_builder.retry_config(retry_config.build());
      138           38 : 
      139           38 :         let s3_config = s3_config_builder.build();
      140           38 :         let client = aws_sdk_s3::Client::from_conf(s3_config);
      141           38 : 
      142           38 :         let prefix_in_bucket = remote_storage_config
      143           38 :             .prefix_in_bucket
      144           38 :             .as_deref()
      145           38 :             .map(|prefix| {
      146           34 :                 let mut prefix = prefix;
      147           38 :                 while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) { // drop leading separators
      148            4 :                     prefix = &prefix[1..]
      149              :                 }
      150              : 
      151           34 :                 let mut prefix = prefix.to_string();
      152           60 :                 while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) { // drop trailing separators
      153           26 :                     prefix.pop();
      154           26 :                 }
      155           34 :                 prefix
      156           38 :             });
      157           38 : 
      158           38 :         Ok(Self {
      159           38 :             client,
      160           38 :             bucket_name: remote_storage_config.bucket_name.clone(),
      161           38 :             max_keys_per_list_response: remote_storage_config.max_keys_per_list_response,
      162           38 :             prefix_in_bucket,
      163           38 :             concurrency_limiter: ConcurrencyLimiter::new(
      164           38 :                 remote_storage_config.concurrency_limit.get(),
      165           38 :             ),
      166           38 :             upload_storage_class: remote_storage_config.upload_storage_class.clone(),
      167           38 :             timeout,
      168           38 :         })
      169           38 :     }
      170              :     /// Strips `prefix_in_bucket` from `key` and splits the rest on the separator into a [`RemotePath`]; panics if `key` lacks the prefix.
      171          126 :     fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
      172          126 :         let relative_path =
      173          126 :             match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) { // no prefix configured => strip ""
      174          126 :                 Some(stripped) => stripped,
      175              :                 // we rely on AWS to return properly prefixed paths
      176              :                 // for requests with a certain prefix
      177            0 :                 None => panic!(
      178            0 :                     "Key {} does not start with bucket prefix {:?}",
      179            0 :                     key, self.prefix_in_bucket
      180            0 :                 ),
      181              :             };
      182          126 :         RemotePath(
      183          126 :             relative_path
      184          126 :                 .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
      185          126 :                 .collect(),
      186          126 :         )
      187          126 :     }
      188              :     /// Builds the S3 key for `path`: `prefix_in_bucket` + "/" + path when a prefix is set, otherwise the path string alone.
      189          309 :     pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
      190          309 :         assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR); // the path string is used in the key verbatim
      191          309 :         let path_string = path.get_path().as_str();
      192          309 :         match &self.prefix_in_bucket {
      193          297 :             Some(prefix) => prefix.clone() + "/" + path_string,
      194           12 :             None => path_string.to_string(),
      195              :         }
      196          309 :     }
      197              :     /// Waits for a concurrency-limiter permit for `kind`; returns `Err(Cancelled)` if `cancel` fires first, else records the wait in `wait_seconds`.
      198          241 :     async fn permit(
      199          241 :         &self,
      200          241 :         kind: RequestKind,
      201          241 :         cancel: &CancellationToken,
      202          241 :     ) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
      203          241 :         let started_at = start_counting_cancelled_wait(kind); // guard; presumably accounts for a cancelled wait if dropped — confirm in `metrics`
      204          241 :         let acquire = self.concurrency_limiter.acquire(kind);
      205              : 
      206          241 :         let permit = tokio::select! {
      207              :             permit = acquire => permit.expect("semaphore is never closed"),
      208              :             _ = cancel.cancelled() => return Err(Cancelled),
      209              :         };
      210              : 
      211          241 :         let started_at = ScopeGuard::into_inner(started_at); // take the value out so the guard's drop action does not run
      212          241 :         crate::metrics::BUCKET_METRICS
      213          241 :             .wait_seconds
      214          241 :             .observe_elapsed(kind, started_at);
      215          241 : 
      216          241 :         Ok(permit)
      217          241 :     }
      218              :     /// Same flow as `permit`, but uses `acquire_owned` and returns an `OwnedSemaphorePermit` that does not borrow `self`.
      219           24 :     async fn owned_permit(
      220           24 :         &self,
      221           24 :         kind: RequestKind,
      222           24 :         cancel: &CancellationToken,
      223           24 :     ) -> Result<tokio::sync::OwnedSemaphorePermit, Cancelled> {
      224           24 :         let started_at = start_counting_cancelled_wait(kind); // guard; presumably accounts for a cancelled wait if dropped — confirm in `metrics`
      225           24 :         let acquire = self.concurrency_limiter.acquire_owned(kind);
      226              : 
      227           24 :         let permit = tokio::select! {
      228              :             permit = acquire => permit.expect("semaphore is never closed"),
      229              :             _ = cancel.cancelled() => return Err(Cancelled),
      230              :         };
      231              : 
      232           24 :         let started_at = ScopeGuard::into_inner(started_at); // take the value out so the guard's drop action does not run
      233           24 :         crate::metrics::BUCKET_METRICS
      234           24 :             .wait_seconds
      235           24 :             .observe_elapsed(kind, started_at);
      236           24 :         Ok(permit)
      237           24 :     }
      238              :     /// Sends `request` as a `get_object` call under a `Get` permit and returns a [`Download`] whose body stream is bounded by the remaining timeout and `cancel`.
      239           24 :     async fn download_object(
      240           24 :         &self,
      241           24 :         request: GetObjectRequest,
      242           24 :         cancel: &CancellationToken,
      243           24 :     ) -> Result<Download, DownloadError> {
      244           24 :         let kind = RequestKind::Get;
      245              : 
      246           24 :         let permit = self.owned_permit(kind, cancel).await?; // owned, so it can be moved into the returned body stream
      247              : 
      248           24 :         let started_at = start_measuring_requests(kind);
      249           24 : 
      250           24 :         let get_object = self
      251           24 :             .client
      252           24 :             .get_object()
      253           24 :             .bucket(request.bucket)
      254           24 :             .key(request.key)
      255           24 :             .set_range(request.range)
      256           24 :             .send();
      257              : 
      258           24 :         let get_object = tokio::select! {
      259              :             res = get_object => res,
      260              :             _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
      261              :             _ = cancel.cancelled() => return Err(DownloadError::Cancelled),
      262              :         };
      263              : 
      264           24 :         let started_at = ScopeGuard::into_inner(started_at); // take the value out so the guard's drop action does not run
      265              : 
      266           24 :         let object_output = match get_object {
      267           24 :             Ok(object_output) => object_output,
      268            0 :             Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
      269              :                 // Count this in the AttemptOutcome::Ok bucket, because 404 is not
      270              :                 // an error: we expect to sometimes fetch an object and find it missing,
      271              :                 // e.g. when probing for timeline indices.
      272            0 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
      273            0 :                     kind,
      274            0 :                     AttemptOutcome::Ok,
      275            0 :                     started_at,
      276            0 :                 );
      277            0 :                 return Err(DownloadError::NotFound);
      278              :             }
      279            0 :             Err(e) => {
      280            0 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
      281            0 :                     kind,
      282            0 :                     AttemptOutcome::Err,
      283            0 :                     started_at,
      284            0 :                 );
      285            0 : 
      286            0 :                 return Err(DownloadError::Other(
      287            0 :                     anyhow::Error::new(e).context("download s3 object"),
      288            0 :                 ));
      289              :             }
      290              :         };
      291              : 
      292              :         // even if we would have no timeout left, continue anyways. the caller can decide to ignore
      293              :         // the errors considering timeouts and cancellation.
      294           24 :         let remaining = self.timeout.saturating_sub(started_at.elapsed());
      295           24 : 
      296           24 :         let metadata = object_output.metadata().cloned().map(StorageMetadata);
      297           24 :         let etag = object_output
      298           24 :             .e_tag
      299           24 :             .ok_or(DownloadError::Other(anyhow::anyhow!("Missing ETag header")))? // NOTE(review): `ok_or` builds the error even on success; `ok_or_else` would defer it
      300           24 :             .into();
      301           24 :         let last_modified = object_output
      302           24 :             .last_modified
      303           24 :             .ok_or(DownloadError::Other(anyhow::anyhow!( // NOTE(review): same eager error construction as for the ETag
      304           24 :                 "Missing LastModified header"
      305           24 :             )))?
      306           24 :             .try_into()
      307           24 :             .map_err(|e: ConversionError| DownloadError::Other(e.into()))?;
      308              : 
      309           24 :         let body = object_output.body;
      310           24 :         let body = ByteStreamAsStream::from(body);
      311           24 :         let body = PermitCarrying::new(permit, body); // presumably holds the permit until the stream is dropped — confirm in `support`
      312           24 :         let body = TimedDownload::new(started_at, body); // reports `req_seconds` for this Get when dropped
      313           24 : 
      314           24 :         let cancel_or_timeout = crate::support::cancel_or_timeout(remaining, cancel.clone());
      315           24 :         let body = crate::support::DownloadStream::new(cancel_or_timeout, body);
      316           24 : 
      317           24 :         Ok(Download {
      318           24 :             metadata,
      319           24 :             etag,
      320           24 :             last_modified,
      321           24 :             download_stream: Box::pin(body),
      322           24 :         })
      323           24 :     }
      324              :     /// Deletes `delete_objects` in chunks of `MAX_KEYS_PER_DELETE`; fails on timeout, cancellation, a request error, or a response that carries an `errors` list.
      325          106 :     async fn delete_oids(
      326          106 :         &self,
      327          106 :         _permit: &tokio::sync::SemaphorePermit<'_>, // unused in the body; presumably proves the caller already holds a permit
      328          106 :         delete_objects: &[ObjectIdentifier],
      329          106 :         cancel: &CancellationToken,
      330          106 :     ) -> anyhow::Result<()> {
      331          106 :         let kind = RequestKind::Delete;
      332          106 :         let mut cancel = std::pin::pin!(cancel.cancelled()); // pinned once so the same future is polled for every chunk
      333              : 
      334          106 :         for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
      335          106 :             let started_at = start_measuring_requests(kind);
      336              : 
      337          106 :             let req = self
      338          106 :                 .client
      339          106 :                 .delete_objects()
      340          106 :                 .bucket(self.bucket_name.clone())
      341          106 :                 .delete(
      342          106 :                     Delete::builder()
      343          106 :                         .set_objects(Some(chunk.to_vec()))
      344          106 :                         .build()
      345          106 :                         .context("build request")?,
      346              :                 )
      347          106 :                 .send();
      348              : 
      349          106 :             let resp = tokio::select! {
      350              :                 resp = req => resp,
      351              :                 _ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()),
      352              :                 _ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
      353              :             };
      354              : 
      355          106 :             let started_at = ScopeGuard::into_inner(started_at); // take the value out so the guard's drop action does not run
      356          106 :             crate::metrics::BUCKET_METRICS
      357          106 :                 .req_seconds
      358          106 :                 .observe_elapsed(kind, &resp, started_at);
      359              : 
      360          106 :             let resp = resp.context("request deletion")?;
      361          106 :             crate::metrics::BUCKET_METRICS
      362          106 :                 .deleted_objects_total
      363          106 :                 .inc_by(chunk.len() as u64); // NOTE(review): runs before the `errors` check below, so keys that failed are counted too
      364              : 
      365          106 :             if let Some(errors) = resp.errors {
      366              :                 // Log a bounded number of the errors within the response:
      367              :                 // these requests can carry 1000 keys so logging each one
      368              :                 // would be too verbose, especially as errors may lead us
      369              :                 // to retry repeatedly.
      370              :                 const LOG_UP_TO_N_ERRORS: usize = 10;
      371            0 :                 for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
      372            0 :                     tracing::warn!(
      373            0 :                         "DeleteObjects key {} failed: {}: {}",
      374            0 :                         e.key.as_ref().map(Cow::from).unwrap_or("".into()),
      375            0 :                         e.code.as_ref().map(Cow::from).unwrap_or("".into()),
      376            0 :                         e.message.as_ref().map(Cow::from).unwrap_or("".into())
      377              :                     );
      378              :                 }
      379              : 
      380            0 :                 return Err(anyhow::anyhow!(
      381            0 :                     "Failed to delete {}/{} objects",
      382            0 :                     errors.len(),
      383            0 :                     chunk.len(),
      384            0 :                 ));
      385          106 :             }
      386              :         }
      387          106 :         Ok(())
      388          106 :     }
     389              : }
      390              : // Pin-projected wrapper around a `ByteStream`; its `Stream` impl yields `std::io::Result<Bytes>` items.
      391              : pin_project_lite::pin_project! {
      392              :     struct ByteStreamAsStream {
      393              :         #[pin]
      394              :         inner: aws_smithy_types::byte_stream::ByteStream
      395              :     }
      396              : }
      397              : // Wraps the given `ByteStream` as-is; no data is read here.
      398              : impl From<aws_smithy_types::byte_stream::ByteStream> for ByteStreamAsStream {
      399           24 :     fn from(inner: aws_smithy_types::byte_stream::ByteStream) -> Self {
      400           24 :         ByteStreamAsStream { inner }
      401           24 :     }
      402              : }
      403              : // Forwards polling to the wrapped `ByteStream`, converting its error type via `Into`.
      404              : impl Stream for ByteStreamAsStream {
      405              :     type Item = std::io::Result<Bytes>;
      406              : 
      407           50 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
      408           50 :         // this does the std::io::ErrorKind::Other conversion
      409           50 :         self.project().inner.poll_next(cx).map_err(|x| x.into())
      410           50 :     }
      411              : 
      412              :     // size_hint is deliberately not forwarded: inner.size_hint reports the remaining size in
      413              :     // bytes, whereas Stream::size_hint is about the number of remaining items.
      414              : }
     415              : 
      416              : pin_project_lite::pin_project! {
      417              :     /// Stream wrapper that tracks the download outcome and reports the elapsed time to `req_seconds` when dropped.
      418              :     struct TimedDownload<S> {
      419              :         started_at: std::time::Instant,
      420              :         outcome: AttemptOutcome,
      421              :         #[pin]
      422              :         inner: S
      423              :     }
      424              : 
      425              :     impl<S> PinnedDrop for TimedDownload<S> {
      426              :         fn drop(mut this: Pin<&mut Self>) {
      427              :             crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
      428              :         }
      429              :     }
      430              : }
     431              : 
      432              : impl<S> TimedDownload<S> {
      433           24 :     fn new(started_at: std::time::Instant, inner: S) -> Self {
      434           24 :         TimedDownload {
      435           24 :             started_at,
      436           24 :             outcome: AttemptOutcome::Cancelled, // what gets reported if the stream is dropped before an error or end-of-stream
      437           24 :             inner,
      438           24 :         }
      439           24 :     }
      440              : }
     441              : 
      442              : impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for TimedDownload<S> {
      443              :     type Item = <S as Stream>::Item;
      444              : 
      445           50 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
      446           50 :         use std::task::ready;
      447           50 : 
      448           50 :         let this = self.project();
      449              : 
      450           50 :         let res = ready!(this.inner.poll_next(cx)); // returns `Poll::Pending` early while the inner stream is not ready
      451           22 :         match &res {
      452           22 :             Some(Ok(_)) => {} // more data: outcome stays as it was
      453            0 :             Some(Err(_)) => *this.outcome = AttemptOutcome::Err,
      454           18 :             None => *this.outcome = AttemptOutcome::Ok, // stream ended without an error item on this poll
      455              :         }
      456              : 
      457           40 :         Poll::Ready(res)
      458           50 :     }
      459              : 
      460            0 :     fn size_hint(&self) -> (usize, Option<usize>) {
      461            0 :         self.inner.size_hint() // forwarded unchanged from the wrapped stream
      462            0 :     }
      463              : }
     464              : 
     465              : impl RemoteStorage for S3Bucket {
     466           24 :     async fn list(
     467           24 :         &self,
     468           24 :         prefix: Option<&RemotePath>,
     469           24 :         mode: ListingMode,
     470           24 :         max_keys: Option<NonZeroU32>,
     471           24 :         cancel: &CancellationToken,
     472           24 :     ) -> Result<Listing, DownloadError> {
     473           24 :         let kind = RequestKind::List;
     474           24 :         // s3 sdk wants i32
     475           24 :         let mut max_keys = max_keys.map(|mk| mk.get() as i32);
     476           24 :         let mut result = Listing::default();
     477           24 : 
     478           24 :         // get the passed prefix or if it is not set use prefix_in_bucket value
     479           24 :         let list_prefix = prefix
     480           24 :             .map(|p| self.relative_path_to_s3_object(p))
     481           24 :             .or_else(|| {
     482           20 :                 self.prefix_in_bucket.clone().map(|mut s| {
     483           20 :                     s.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
     484           20 :                     s
     485           20 :                 })
     486           24 :             });
     487              : 
     488           24 :         let _permit = self.permit(kind, cancel).await?;
     489              : 
     490           24 :         let mut continuation_token = None;
     491              : 
     492              :         loop {
     493           32 :             let started_at = start_measuring_requests(kind);
     494           32 : 
     495           32 :             // min of two Options, returning Some if one is value and another is
     496           32 :             // None (None is smaller than anything, so plain min doesn't work).
     497           32 :             let request_max_keys = self
     498           32 :                 .max_keys_per_list_response
     499           32 :                 .into_iter()
     500           32 :                 .chain(max_keys.into_iter())
     501           32 :                 .min();
     502           32 :             let mut request = self
     503           32 :                 .client
     504           32 :                 .list_objects_v2()
     505           32 :                 .bucket(self.bucket_name.clone())
     506           32 :                 .set_prefix(list_prefix.clone())
     507           32 :                 .set_continuation_token(continuation_token)
     508           32 :                 .set_max_keys(request_max_keys);
     509           32 : 
     510           32 :             if let ListingMode::WithDelimiter = mode {
     511           10 :                 request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
     512           22 :             }
     513              : 
     514           32 :             let request = request.send();
     515              : 
     516           32 :             let response = tokio::select! {
     517              :                 res = request => res,
     518              :                 _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
     519              :                 _ = cancel.cancelled() => return Err(DownloadError::Cancelled),
     520              :             };
     521              : 
     522           32 :             let response = response
     523           32 :                 .context("Failed to list S3 prefixes")
     524           32 :                 .map_err(DownloadError::Other);
     525           32 : 
     526           32 :             let started_at = ScopeGuard::into_inner(started_at);
     527           32 : 
     528           32 :             crate::metrics::BUCKET_METRICS
     529           32 :                 .req_seconds
     530           32 :                 .observe_elapsed(kind, &response, started_at);
     531              : 
     532           32 :             let response = response?;
     533              : 
     534           32 :             let keys = response.contents();
     535           32 :             let empty = Vec::new();
     536           32 :             let prefixes = response.common_prefixes.as_ref().unwrap_or(&empty);
     537           32 : 
     538           32 :             tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
     539              : 
     540          110 :             for object in keys {
     541           80 :                 let object_path = object.key().expect("response does not contain a key");
     542           80 :                 let remote_path = self.s3_object_to_relative_path(object_path);
     543           80 :                 result.keys.push(remote_path);
     544           80 :                 if let Some(mut mk) = max_keys {
     545            4 :                     assert!(mk > 0);
     546            4 :                     mk -= 1;
     547            4 :                     if mk == 0 {
     548            2 :                         return Ok(result); // limit reached
     549            2 :                     }
     550            2 :                     max_keys = Some(mk);
     551           76 :                 }
     552              :             }
     553              : 
     554              :             // S3 gives us prefixes like "foo/", we return them like "foo"
     555           46 :             result.prefixes.extend(prefixes.iter().filter_map(|o| {
     556           46 :                 Some(
     557           46 :                     self.s3_object_to_relative_path(
     558           46 :                         o.prefix()?
     559           46 :                             .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR),
     560              :                     ),
     561              :                 )
     562           46 :             }));
     563              : 
     564           30 :             continuation_token = match response.next_continuation_token {
     565            8 :                 Some(new_token) => Some(new_token),
     566           22 :                 None => break,
     567           22 :             };
     568           22 :         }
     569           22 : 
     570           22 :         Ok(result)
     571           24 :     }
     572              : 
     573          107 :     async fn upload( // Uploads the `from` stream to the key derived from `to` with a single PutObject request.
     574          107 :         &self,
     575          107 :         from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     576          107 :         from_size_bytes: usize, // sent as the request's Content-Length
     577          107 :         to: &RemotePath,
     578          107 :         metadata: Option<StorageMetadata>, // when Some, its inner value is set as the object's metadata
     579          107 :         cancel: &CancellationToken, // raced against the request; cancellation there yields TimeoutOrCancel::Cancel
     580          107 :     ) -> anyhow::Result<()> {
     581          107 :         let kind = RequestKind::Put;
     582          107 :         let _permit = self.permit(kind, cancel).await?;
     583              :         // Measurement starts only after the permit has been acquired.
     584          107 :         let started_at = start_measuring_requests(kind);
     585          107 :         // Adapt the caller's stream into the body type the SDK request expects.
     586          107 :         let body = Body::wrap_stream(from);
     587          107 :         let bytes_stream = ByteStream::new(SdkBody::from_body_0_4(body));
     588              :         // Build the request; the `?` on content_length propagates a failed usize conversion.
     589          107 :         let upload = self
     590          107 :             .client
     591          107 :             .put_object()
     592          107 :             .bucket(self.bucket_name.clone())
     593          107 :             .key(self.relative_path_to_s3_object(to))
     594          107 :             .set_metadata(metadata.map(|m| m.0))
     595          107 :             .set_storage_class(self.upload_storage_class.clone())
     596          107 :             .content_length(from_size_bytes.try_into()?)
     597          107 :             .body(bytes_stream)
     598          107 :             .send();
     599          107 :         // Bound the request by self.timeout; an elapsed timeout shows up as the outer Err below.
     600          107 :         let upload = tokio::time::timeout(self.timeout, upload);
     601              :         // Race the time-limited request against cancellation.
     602          107 :         let res = tokio::select! {
     603              :             res = upload => res,
     604              :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     605              :         };
     606              :         // Record the elapsed time only when the request itself finished (successfully or with an SDK error).
     607          107 :         if let Ok(inner) = &res {
     608          107 :             // timeouts are not reported as errors in the metrics: on that path the guard is left in place
     609          107 :             let started_at = ScopeGuard::into_inner(started_at); // NOTE(review): presumably the guard accounts a cancellation when dropped instead - confirm in start_measuring_requests
     610          107 :             crate::metrics::BUCKET_METRICS
     611          107 :                 .req_seconds
     612          107 :                 .observe_elapsed(kind, inner, started_at);
     613          107 :         }
     614              :         // Outer Result: did the timeout elapse? Inner Result: outcome of the PutObject call.
     615          107 :         match res {
     616          106 :             Ok(Ok(_put)) => Ok(()),
     617            1 :             Ok(Err(sdk)) => Err(sdk.into()),
     618            0 :             Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
     619              :         }
     620          107 :     }
     621              : 
     622            2 :     async fn copy( // Copies the object at `from` to `to` inside this bucket with a CopyObject request.
     623            2 :         &self,
     624            2 :         from: &RemotePath,
     625            2 :         to: &RemotePath,
     626            2 :         cancel: &CancellationToken,
     627            2 :     ) -> anyhow::Result<()> {
     628            2 :         let kind = RequestKind::Copy;
     629            2 :         let _permit = self.permit(kind, cancel).await?;
     630              :         // The timeout future is created up front and raced against the request in the select! below.
     631            2 :         let timeout = tokio::time::sleep(self.timeout);
     632            2 : 
     633            2 :         let started_at = start_measuring_requests(kind);
     634            2 : 
     635            2 :         // the copy source has the form "<bucket_name>/<key>", so the bucket name goes in front of the key
     636            2 :         let copy_source = format!(
     637            2 :             "{}/{}",
     638            2 :             self.bucket_name,
     639            2 :             self.relative_path_to_s3_object(from)
     640            2 :         );
     641            2 :         // Destination: same bucket, key derived from `to`, with the configured upload storage class.
     642            2 :         let op = self
     643            2 :             .client
     644            2 :             .copy_object()
     645            2 :             .bucket(self.bucket_name.clone())
     646            2 :             .key(self.relative_path_to_s3_object(to))
     647            2 :             .set_storage_class(self.upload_storage_class.clone())
     648            2 :             .copy_source(copy_source)
     649            2 :             .send();
     650              :         // Whichever completes first wins: the request, the timeout, or cancellation.
     651            2 :         let res = tokio::select! {
     652              :             res = op => res,
     653              :             _ = timeout => return Err(TimeoutOrCancel::Timeout.into()),
     654              :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     655              :         };
     656              :         // Reached only if the request itself finished; its result is recorded with the elapsed time.
     657            2 :         let started_at = ScopeGuard::into_inner(started_at);
     658            2 :         crate::metrics::BUCKET_METRICS
     659            2 :             .req_seconds
     660            2 :             .observe_elapsed(kind, &res, started_at);
     661            2 :         // Propagate an SDK error, if any; the response payload itself is not used.
     662            2 :         res?;
     663              : 
     664            2 :         Ok(())
     665            2 :     }
     666              : 
     667           14 :     async fn download( // Downloads the object at `from` by delegating to download_object without a byte range.
     668           14 :         &self,
     669           14 :         from: &RemotePath,
     670           14 :         cancel: &CancellationToken,
     671           14 :     ) -> Result<Download, DownloadError> {
     672           14 :         // if a bucket prefix is configured, the requested key is `prefix/from`;
     673           14 :         // without a prefix, the requested key is `from` itself
     674           14 :         self.download_object(
     675           14 :             GetObjectRequest {
     676           14 :                 bucket: self.bucket_name.clone(),
     677           14 :                 key: self.relative_path_to_s3_object(from),
     678           14 :                 range: None, // no byte range requested
     679           14 :             },
     680           14 :             cancel,
     681           14 :         )
     682           14 :         .await
     683           14 :     }
     684              : 
     685           10 :     async fn download_byte_range( // Downloads bytes [start_inclusive, end_exclusive) of `from`, or everything from start_inclusive on when no end is given.
     686           10 :         &self,
     687           10 :         from: &RemotePath,
     688           10 :         start_inclusive: u64,
     689           10 :         end_exclusive: Option<u64>,
     690           10 :         cancel: &CancellationToken,
     691           10 :     ) -> Result<Download, DownloadError> {
     692           10 :         // S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
     693           10 :         // and needs both ends to be inclusive, so the exclusive end is converted by subtracting one
     694           10 :         let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1)); // NOTE(review): Some(0) saturates to an inclusive end of 0 instead of describing an empty range
     695           10 :         let range = Some(match end_inclusive {
     696            6 :             Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"),
     697            4 :             None => format!("bytes={start_inclusive}-"), // open-ended: from start_inclusive to the end of the object
     698              :         });
     699              :         // The range string is handed to download_object together with the bucket and key.
     700           10 :         self.download_object(
     701           10 :             GetObjectRequest {
     702           10 :                 bucket: self.bucket_name.clone(),
     703           10 :                 key: self.relative_path_to_s3_object(from),
     704           10 :                 range,
     705           10 :             },
     706           10 :             cancel,
     707           10 :         )
     708           10 :         .await
     709           10 :     }
     710              : 
     711          102 :     async fn delete_objects<'a>( // Deletes all `paths` by converting them to S3 object identifiers and passing them to delete_oids.
     712          102 :         &self,
     713          102 :         paths: &'a [RemotePath],
     714          102 :         cancel: &CancellationToken,
     715          102 :     ) -> anyhow::Result<()> {
     716          102 :         let kind = RequestKind::Delete;
     717          102 :         let permit = self.permit(kind, cancel).await?; // acquired before any identifier is built; lent to delete_oids below
     718          102 :         let mut delete_objects = Vec::with_capacity(paths.len());
     719          212 :         for path in paths {
     720          110 :             let obj_id = ObjectIdentifier::builder()
     721          110 :                 .set_key(Some(self.relative_path_to_s3_object(path)))
     722          110 :                 .build()
     723          110 :                 .context("convert path to oid")?; // the first failing conversion aborts the whole call
     724          110 :             delete_objects.push(obj_id);
     725              :         }
     726              :         // The actual delete request(s) are issued by delete_oids (defined outside this excerpt).
     727          350 :         self.delete_oids(&permit, &delete_objects, cancel).await
     728          102 :     }
     729              : 
     730           90 :     async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> { // Deletes a single object via delete_objects.
     731           90 :         let paths = std::array::from_ref(path); // borrows `path` as a one-element array, no clone needed
     732          294 :         self.delete_objects(paths, cancel).await
     733           90 :     }
     734              : 
     735            6 :     async fn time_travel_recover( // Rolls the keys under `prefix` back to their state as of `timestamp`, using the bucket's object versions.
     736            6 :         &self,
     737            6 :         prefix: Option<&RemotePath>, // when None, prefix_in_bucket is used instead
     738            6 :         timestamp: SystemTime, // point in time whose state should be restored
     739            6 :         done_if_after: SystemTime, // keys whose newest entry is later than this are left untouched
     740            6 :         cancel: &CancellationToken,
     741            6 :     ) -> Result<(), TimeTravelError> {
     742            6 :         let kind = RequestKind::TimeTravel;
     743            6 :         let permit = self.permit(kind, cancel).await?;
     744              :         // Convert to the SDK's DateTime so both can be compared against `last_modified` values below.
     745            6 :         let timestamp = DateTime::from(timestamp);
     746            6 :         let done_if_after = DateTime::from(done_if_after);
     747            6 : 
     748            6 :         tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
     749              : 
     750              :         // get the passed prefix or if it is not set use prefix_in_bucket value
     751            6 :         let prefix = prefix
     752            6 :             .map(|p| self.relative_path_to_s3_object(p))
     753            6 :             .or_else(|| self.prefix_in_bucket.clone());
     754            6 :         // Arguments for the backoff::retry calls below (listing and copying); only cancellation counts as permanent.
     755            6 :         let warn_threshold = 3;
     756            6 :         let max_retries = 10;
     757            6 :         let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
     758              :         // Pagination markers for ListObjectVersions, plus the entries accumulated over all pages.
     759            6 :         let mut key_marker = None;
     760            6 :         let mut version_id_marker = None;
     761            6 :         let mut versions_and_deletes = Vec::new();
     762              :         // Phase 1: collect every version and delete marker under the prefix.
     763              :         loop {
     764            6 :             let response = backoff::retry(
     765            7 :                 || async {
     766            7 :                     let op = self
     767            7 :                         .client
     768            7 :                         .list_object_versions()
     769            7 :                         .bucket(self.bucket_name.clone())
     770            7 :                         .set_prefix(prefix.clone())
     771            7 :                         .set_key_marker(key_marker.clone())
     772            7 :                         .set_version_id_marker(version_id_marker.clone())
     773            7 :                         .send();
     774            7 : 
     775            7 :                     tokio::select! {
     776            7 :                         res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
     777            7 :                         _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
     778            7 :                     }
     779            7 :                 },
     780            6 :                 is_permanent,
     781            6 :                 warn_threshold,
     782            6 :                 max_retries,
     783            6 :                 "listing object versions for time_travel_recover",
     784            6 :                 cancel,
     785            6 :             )
     786           49 :             .await
     787            6 :             .ok_or_else(|| TimeTravelError::Cancelled) // a None from backoff::retry is reported as cancellation
     788            6 :             .and_then(|x| x)?; // flatten the nested Result before propagating
     789              : 
     790            6 :             tracing::trace!(
     791            0 :                 "  Got List response version_id_marker={:?}, key_marker={:?}",
     792              :                 response.version_id_marker,
     793              :                 response.key_marker
     794              :             );
     795            6 :             let versions = response
     796            6 :                 .versions
     797            6 :                 .unwrap_or_default()
     798            6 :                 .into_iter()
     799            6 :                 .map(VerOrDelete::from_version);
     800            6 :             let deletes = response
     801            6 :                 .delete_markers
     802            6 :                 .unwrap_or_default()
     803            6 :                 .into_iter()
     804            6 :                 .map(VerOrDelete::from_delete_marker);
     805            6 :             itertools::process_results(versions.chain(deletes), |n_vds| {
     806            6 :                 versions_and_deletes.extend(n_vds)
     807            6 :             })
     808            6 :             .map_err(TimeTravelError::Other)?;
     809           12 :             fn none_if_empty(v: Option<String>) -> Option<String> { // an empty marker string is treated like an absent one
     810           12 :                 v.filter(|v| !v.is_empty())
     811           12 :             }
     812            6 :             version_id_marker = none_if_empty(response.next_version_id_marker);
     813            6 :             key_marker = none_if_empty(response.next_key_marker);
     814            6 :             if version_id_marker.is_none() { // no next version-id marker: treat this as the last page
     815              :                 // The final response is not supposed to be truncated
     816            6 :                 if response.is_truncated.unwrap_or_default() {
     817            0 :                     return Err(TimeTravelError::Other(anyhow::anyhow!(
     818            0 :                         "Received truncated ListObjectVersions response for prefix={prefix:?}"
     819            0 :                     )));
     820            6 :                 }
     821            6 :                 break;
     822            0 :             }
     823            0 :             // Limit the number of versions deletions, mostly so that we don't
     824            0 :             // keep requesting forever if the list is too long, as we'd put the
     825            0 :             // list in RAM.
     826            0 :             // Building a list of 100k entries that reaches the limit roughly takes
     827            0 :             // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
     828            0 :             const COMPLEXITY_LIMIT: usize = 100_000;
     829            0 :             if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
     830            0 :                 return Err(TimeTravelError::TooManyVersions);
     831            0 :             }
     832              :         }
     833              : 
     834            6 :         tracing::info!(
     835            0 :             "Built list for time travel with {} versions and deletions",
     836            0 :             versions_and_deletes.len()
     837              :         );
     838              : 
     839              :         // Work on the list of references instead of the objects directly,
     840              :         // otherwise we get lifetime errors in the sort_by_key call below.
     841            6 :         let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
     842            6 :         // Sort by key, then by modification time, so each key's entries end up in chronological order.
     843          124 :         versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified));
     844            6 :         // Phase 2: group the sorted entries per key; each per-key Vec keeps the chronological order.
     845            6 :         let mut vds_for_key = HashMap::<_, Vec<_>>::new();
     846              : 
     847           42 :         for vd in &versions_and_deletes {
     848              :             let VerOrDelete {
     849           36 :                 version_id, key, ..
     850           36 :             } = &vd;
     851           36 :             if version_id == "null" {
     852            0 :                 return Err(TimeTravelError::Other(anyhow!("Received ListVersions response for key={key} with version_id='null', \
     853            0 :                     indicating either disabled versioning, or legacy objects with null version id values")));
     854           36 :             }
     855           36 :             tracing::trace!(
     856            0 :                 "Parsing version key={key} version_id={version_id} kind={:?}",
     857              :                 vd.kind
     858              :             );
     859              : 
     860           36 :             vds_for_key.entry(key).or_default().push(vd);
     861              :         }
     862           24 :         for (key, versions) in vds_for_key { // Phase 3: decide per key; HashMap iteration order is unspecified
     863           18 :             let last_vd = versions.last().unwrap(); // never empty: a Vec is only created when an entry is pushed into it
     864           18 :             if last_vd.last_modified > done_if_after {
     865            0 :                 tracing::trace!("Key {key} has version later than done_if_after, skipping");
     866            0 :                 continue;
     867           18 :             }
     868              :             // index of the first entry modified at or after `timestamp` (an exact match yields the matching index); the entry before it, if any, is the state to restore.
     869           18 :             let version_to_restore_to =
     870           28 :                 match versions.binary_search_by_key(&timestamp, |tpl| tpl.last_modified) {
     871            0 :                     Ok(v) => v,
     872           18 :                     Err(e) => e,
     873              :                 };
     874           18 :             if version_to_restore_to == versions.len() {
     875            6 :                 tracing::trace!("Key {key} has no changes since timestamp, skipping");
     876            6 :                 continue;
     877           12 :             }
     878           12 :             let mut do_delete = false;
     879           12 :             if version_to_restore_to == 0 {
     880              :                 // All versions more recent, so the key didn't exist at the specified time point.
     881            6 :                 tracing::trace!(
     882            0 :                     "All {} versions more recent for {key}, deleting",
     883            0 :                     versions.len()
     884              :                 );
     885            6 :                 do_delete = true;
     886              :             } else {
     887            6 :                 match &versions[version_to_restore_to - 1] {
     888              :                     VerOrDelete {
     889              :                         kind: VerOrDeleteKind::Version,
     890            6 :                         version_id,
     891            6 :                         ..
     892            6 :                     } => {
     893            6 :                         tracing::trace!("Copying old version {version_id} for {key}...");
     894              :                         // Restore the state to the last version by copying that version onto the same key
     895            6 :                         let source_id =
     896            6 :                             format!("{}/{key}?versionId={version_id}", self.bucket_name);
     897            6 : 
     898            6 :                         backoff::retry(
     899            6 :                             || async {
     900            6 :                                 let op = self
     901            6 :                                     .client
     902            6 :                                     .copy_object()
     903            6 :                                     .bucket(self.bucket_name.clone())
     904            6 :                                     .key(key)
     905            6 :                                     .set_storage_class(self.upload_storage_class.clone())
     906            6 :                                     .copy_source(&source_id)
     907            6 :                                     .send();
     908            6 : 
     909            6 :                                 tokio::select! {
     910            6 :                                     res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
     911            6 :                                     _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
     912            6 :                                 }
     913            6 :                             },
     914            6 :                             is_permanent,
     915            6 :                             warn_threshold,
     916            6 :                             max_retries,
     917            6 :                             "copying object version for time_travel_recover",
     918            6 :                             cancel,
     919            6 :                         )
     920           12 :                         .await
     921            6 :                         .ok_or_else(|| TimeTravelError::Cancelled)
     922            6 :                         .and_then(|x| x)?;
     923            6 :                         tracing::info!(%version_id, %key, "Copied old version in S3");
     924              :                     }
     925              :                     VerOrDelete {
     926              :                         kind: VerOrDeleteKind::DeleteMarker,
     927              :                         ..
     928            0 :                     } => {
     929            0 :                         do_delete = true; // the entry in effect before `timestamp` is a delete marker, so the key must end up deleted
     930            0 :                     }
     931              :                 }
     932              :             };
     933           12 :             if do_delete {
     934            6 :                 if matches!(last_vd.kind, VerOrDeleteKind::DeleteMarker) {
     935              :                     // Key has since been deleted (but there was some history), no need to do anything
     936            2 :                     tracing::trace!("Key {key} already deleted, skipping.");
     937              :                 } else {
     938            4 :                     tracing::trace!("Deleting {key}...");
     939              : 
     940            4 :                     let oid = ObjectIdentifier::builder()
     941            4 :                         .key(key.to_owned())
     942            4 :                         .build()
     943            4 :                         .map_err(|e| TimeTravelError::Other(e.into()))?;
     944              : 
     945            4 :                     self.delete_oids(&permit, &[oid], cancel)
     946            8 :                         .await
     947            4 :                         .map_err(|e| {
     948            0 :                             // delete_oids will use TimeoutOrCancel
     949            0 :                             if TimeoutOrCancel::caused_by_cancel(&e) {
     950            0 :                                 TimeTravelError::Cancelled
     951              :                             } else {
     952            0 :                                 TimeTravelError::Other(e)
     953              :                             }
     954            4 :                         })?;
     955              :                 }
     956            6 :             }
     957              :         }
     958            6 :         Ok(())
     959            6 :     }
     960              : }
     961              : 
     962              : // Save RAM and only store the needed data instead of the entire ObjectVersion/DeleteMarkerEntry
     963              : struct VerOrDelete {
     964              :     kind: VerOrDeleteKind, // whether this entry came from an ObjectVersion or a DeleteMarkerEntry
     965              :     last_modified: DateTime, // the entry's last_modified value; used for sorting and for comparing against timestamps
     966              :     version_id: String, // the entry's version_id
     967              :     key: String, // the entry's object key
     968              : }
     969              : 
     970              : #[derive(Debug)]
     971              : enum VerOrDeleteKind { // Distinguishes the two kinds of entries stored in a VerOrDelete.
     972              :     Version, // set by VerOrDelete::from_version
     973              :     DeleteMarker, // set by VerOrDelete::from_delete_marker
     974              : }
     975              : 
     976              : impl VerOrDelete {
     977           36 :     fn with_kind( // Shared constructor: succeeds only if last_modified, version_id and key are all present.
     978           36 :         kind: VerOrDeleteKind,
     979           36 :         last_modified: Option<DateTime>,
     980           36 :         version_id: Option<String>,
     981           36 :         key: Option<String>,
     982           36 :     ) -> anyhow::Result<Self> {
     983           36 :         let lvk = (last_modified, version_id, key); // kept as a tuple so the error message below can still print all three
     984           36 :         let (Some(last_modified), Some(version_id), Some(key)) = lvk else {
     985            0 :             anyhow::bail!(
     986            0 :                 "One (or more) of last_modified, key, and id is None. \
     987            0 :             Is versioning enabled in the bucket? last_modified={:?}, version_id={:?}, key={:?}",
     988            0 :                 lvk.0,
     989            0 :                 lvk.1,
     990            0 :                 lvk.2,
     991            0 :             );
     992              :         };
     993           36 :         Ok(Self {
     994           36 :             kind,
     995           36 :             last_modified,
     996           36 :             version_id,
     997           36 :             key,
     998           36 :         })
     999           36 :     }
    1000           28 :     fn from_version(v: ObjectVersion) -> anyhow::Result<Self> { // Builds a `Version` entry from an SDK ObjectVersion; errors if any needed field is None.
    1001           28 :         Self::with_kind(
    1002           28 :             VerOrDeleteKind::Version,
    1003           28 :             v.last_modified,
    1004           28 :             v.version_id,
    1005           28 :             v.key,
    1006           28 :         )
    1007           28 :     }
    1008            8 :     fn from_delete_marker(v: DeleteMarkerEntry) -> anyhow::Result<Self> { // Builds a `DeleteMarker` entry from an SDK DeleteMarkerEntry; errors if any needed field is None.
    1009            8 :         Self::with_kind(
    1010            8 :             VerOrDeleteKind::DeleteMarker,
    1011            8 :             v.last_modified,
    1012            8 :             v.version_id,
    1013            8 :             v.key,
    1014            8 :         )
    1015            8 :     }
    1016              : }
    1017              : 
    1018              : #[cfg(test)]
    1019              : mod tests {
    1020              :     use camino::Utf8Path;
    1021              :     use std::num::NonZeroUsize;
    1022              : 
    1023              :     use crate::{RemotePath, S3Bucket, S3Config};
    1024              :     // Checks relative_path_to_s3_object for every combination of the prefixes and paths listed below.
    1025              :     #[tokio::test]
    1026            4 :     async fn relative_path() {
    1027            4 :         let all_paths = ["", "some/path", "some/path/"];
    1028            4 :         let all_paths: Vec<RemotePath> = all_paths
    1029            4 :             .iter()
    1030           12 :             .map(|x| RemotePath::new(Utf8Path::new(x)).expect("bad path"))
    1031            4 :             .collect();
    1032            4 :         let prefixes = [
    1033            4 :             None,
    1034            4 :             Some(""),
    1035            4 :             Some("test/prefix"),
    1036            4 :             Some("test/prefix/"),
    1037            4 :             Some("/test/prefix/"),
    1038            4 :         ];
    1039            4 :         let expected_outputs = [ // expected_outputs[i][j] is the expected key for prefixes[i] combined with all_paths[j]
    1040            4 :             vec!["", "some/path", "some/path/"], // prefix None: the path is used as is
    1041            4 :             vec!["/", "/some/path", "/some/path/"], // prefix Some(""): a leading separator is expected
    1042            4 :             vec![ // prefix Some("test/prefix")
    1043            4 :                 "test/prefix/",
    1044            4 :                 "test/prefix/some/path",
    1045            4 :                 "test/prefix/some/path/",
    1046            4 :             ],
    1047            4 :             vec![ // prefix Some("test/prefix/"): same expectation as without the trailing separator
    1048            4 :                 "test/prefix/",
    1049            4 :                 "test/prefix/some/path",
    1050            4 :                 "test/prefix/some/path/",
    1051            4 :             ],
    1052            4 :             vec![ // prefix Some("/test/prefix/"): same expectation as without the surrounding separators
    1053            4 :                 "test/prefix/",
    1054            4 :                 "test/prefix/some/path",
    1055            4 :                 "test/prefix/some/path/",
    1056            4 :             ],
    1057            4 :         ];
    1058            4 : 
    1059           20 :         for (prefix_idx, prefix) in prefixes.iter().enumerate() {
    1060           20 :             let config = S3Config {
    1061           20 :                 bucket_name: "bucket".to_owned(),
    1062           20 :                 bucket_region: "region".to_owned(),
    1063           20 :                 prefix_in_bucket: prefix.map(str::to_string),
    1064           20 :                 endpoint: None,
    1065           20 :                 concurrency_limit: NonZeroUsize::new(100).unwrap(),
    1066           20 :                 max_keys_per_list_response: Some(5),
    1067           20 :                 upload_storage_class: None,
    1068           20 :             };
    1069           20 :             let storage = S3Bucket::new(&config, std::time::Duration::ZERO) // a zero timeout is passed; this test only calls the path-mapping helper afterwards
    1070            4 :                 .await
    1071           20 :                 .expect("remote storage init");
    1072           60 :             for (test_path_idx, test_path) in all_paths.iter().enumerate() {
    1073           60 :                 let result = storage.relative_path_to_s3_object(test_path);
    1074           60 :                 let expected = expected_outputs[prefix_idx][test_path_idx];
    1075           60 :                 assert_eq!(result, expected);
    1076            4 :             }
    1077            4 :         }
    1078            4 :     }
    1079              : }
        

Generated by: LCOV version 2.1-beta