LCOV - code coverage report
Current view: top level - libs/remote_storage/src - s3_bucket.rs (source / functions) Coverage Total Hit
Test: e402c46de0a007db6b48dddbde450ddbb92e6ceb.info Lines: 89.7 % 774 694
Test Date: 2024-06-25 10:31:23 Functions: 71.4 % 84 60

            Line data    Source code
       1              : //! AWS S3 storage wrapper around the AWS SDK for Rust (`aws_sdk_s3`).
       2              : //!
       3              : //! Respects `prefix_in_bucket` property from [`S3Config`],
       4              : //! allowing multiple api users to independently work with the same S3 bucket, if
       5              : //! their bucket prefixes are both specified and different.
       6              : 
       7              : use std::{
       8              :     borrow::Cow,
       9              :     collections::HashMap,
      10              :     num::NonZeroU32,
      11              :     pin::Pin,
      12              :     sync::Arc,
      13              :     task::{Context, Poll},
      14              :     time::{Duration, SystemTime},
      15              : };
      16              : 
      17              : use anyhow::{anyhow, Context as _};
      18              : use aws_config::{
      19              :     environment::credentials::EnvironmentVariableCredentialsProvider,
      20              :     imds::credentials::ImdsCredentialsProvider,
      21              :     meta::credentials::CredentialsProviderChain,
      22              :     profile::ProfileFileCredentialsProvider,
      23              :     provider_config::ProviderConfig,
      24              :     retry::{RetryConfigBuilder, RetryMode},
      25              :     web_identity_token::WebIdentityTokenCredentialsProvider,
      26              :     BehaviorVersion,
      27              : };
      28              : use aws_credential_types::provider::SharedCredentialsProvider;
      29              : use aws_sdk_s3::{
      30              :     config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep},
      31              :     error::SdkError,
      32              :     operation::get_object::GetObjectError,
      33              :     types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass},
      34              :     Client,
      35              : };
      36              : use aws_smithy_async::rt::sleep::TokioSleep;
      37              : 
      38              : use aws_smithy_types::{body::SdkBody, DateTime};
      39              : use aws_smithy_types::{byte_stream::ByteStream, date_time::ConversionError};
      40              : use bytes::Bytes;
      41              : use futures::stream::Stream;
      42              : use hyper::Body;
      43              : use scopeguard::ScopeGuard;
      44              : use tokio_util::sync::CancellationToken;
      45              : use utils::backoff;
      46              : 
      47              : use super::StorageMetadata;
      48              : use crate::{
      49              :     config::S3Config,
      50              :     error::Cancelled,
      51              :     metrics::{start_counting_cancelled_wait, start_measuring_requests},
      52              :     support::PermitCarrying,
      53              :     ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage,
      54              :     TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
      55              : };
      56              : 
      57              : use crate::metrics::AttemptOutcome;
      58              : pub(super) use crate::metrics::RequestKind;
      59              : 
      60              : /// AWS S3 storage.
      61              : pub struct S3Bucket {
      62              :     client: Client,
      63              :     bucket_name: String,
      64              :     prefix_in_bucket: Option<String>,
      65              :     max_keys_per_list_response: Option<i32>,
      66              :     upload_storage_class: Option<StorageClass>,
      67              :     concurrency_limiter: ConcurrencyLimiter,
      68              :     // Per-request timeout. Accessible for tests.
      69              :     pub timeout: Duration,
      70              : }
      71              : 
      72              : struct GetObjectRequest {
      73              :     bucket: String,
      74              :     key: String,
      75              :     range: Option<String>,
      76              : }
      77              : impl S3Bucket {
      78              :     /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
      79           38 :     pub fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
      80           38 :         tracing::debug!(
      81            0 :             "Creating s3 remote storage for S3 bucket {}",
      82              :             remote_storage_config.bucket_name
      83              :         );
      84              : 
      85           38 :         let region = Some(Region::new(remote_storage_config.bucket_region.clone()));
      86           38 : 
      87           38 :         let provider_conf = ProviderConfig::without_region().with_region(region.clone());
      88           38 : 
      89           38 :         let credentials_provider = {
      90           38 :             // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
      91           38 :             CredentialsProviderChain::first_try(
      92           38 :                 "env",
      93           38 :                 EnvironmentVariableCredentialsProvider::new(),
      94           38 :             )
      95           38 :             // uses "AWS_PROFILE" / `aws sso login --profile <profile>`
      96           38 :             .or_else(
      97           38 :                 "profile-sso",
      98           38 :                 ProfileFileCredentialsProvider::builder()
      99           38 :                     .configure(&provider_conf)
     100           38 :                     .build(),
     101           38 :             )
     102           38 :             // uses "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
     103           38 :             // needed to access remote extensions bucket
     104           38 :             .or_else(
     105           38 :                 "token",
     106           38 :                 WebIdentityTokenCredentialsProvider::builder()
     107           38 :                     .configure(&provider_conf)
     108           38 :                     .build(),
     109           38 :             )
     110           38 :             // uses imds v2
     111           38 :             .or_else("imds", ImdsCredentialsProvider::builder().build())
     112           38 :         };
     113           38 : 
     114           38 :         // AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
     115           38 :         let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
     116           38 : 
     117           38 :         let sdk_config_loader: aws_config::ConfigLoader = aws_config::defaults(
     118           38 :             #[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
     119           38 :             BehaviorVersion::v2023_11_09(),
     120           38 :         )
     121           38 :         .region(region)
     122           38 :         .identity_cache(IdentityCache::lazy().build())
     123           38 :         .credentials_provider(SharedCredentialsProvider::new(credentials_provider))
     124           38 :         .sleep_impl(SharedAsyncSleep::from(sleep_impl));
     125           38 : 
     126           38 :         let sdk_config: aws_config::SdkConfig = std::thread::scope(|s| {
     127           38 :             s.spawn(|| {
     128           38 :                 // TODO: make this function async.
     129           38 :                 tokio::runtime::Builder::new_current_thread()
     130           38 :                     .enable_all()
     131           38 :                     .build()
     132           38 :                     .unwrap()
     133           38 :                     .block_on(sdk_config_loader.load())
     134           38 :             })
     135           38 :             .join()
     136           38 :             .unwrap()
     137           38 :         });
     138           38 : 
     139           38 :         let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&sdk_config);
     140              : 
     141              :         // Technically, the `remote_storage_config.endpoint` field only applies to S3 interactions.
     142              :         // (In case we ever re-use the `sdk_config` for more than just the S3 client in the future)
     143           38 :         if let Some(custom_endpoint) = remote_storage_config.endpoint.clone() {
     144            0 :             s3_config_builder = s3_config_builder
     145            0 :                 .endpoint_url(custom_endpoint)
     146            0 :                 .force_path_style(true);
     147           38 :         }
     148              : 
     149              :         // We do our own retries (see [`backoff::retry`]).  However, for the AWS SDK to enable rate limiting in response to throttling
     150              :         // responses (e.g. 429 on too many ListObjectsV2 requests), we must provide a retry config.  We set it to use at most one
     151              :         // attempt, and enable 'Adaptive' mode, which causes rate limiting to be enabled.
     152           38 :         let mut retry_config = RetryConfigBuilder::new();
     153           38 :         retry_config
     154           38 :             .set_max_attempts(Some(1))
     155           38 :             .set_mode(Some(RetryMode::Adaptive));
     156           38 :         s3_config_builder = s3_config_builder.retry_config(retry_config.build());
     157           38 : 
     158           38 :         let s3_config = s3_config_builder.build();
     159           38 :         let client = aws_sdk_s3::Client::from_conf(s3_config);
     160           38 : 
     161           38 :         let prefix_in_bucket = remote_storage_config
     162           38 :             .prefix_in_bucket
     163           38 :             .as_deref()
     164           38 :             .map(|prefix| {
     165           34 :                 let mut prefix = prefix;
     166           38 :                 while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     167            4 :                     prefix = &prefix[1..]
     168              :                 }
     169              : 
     170           34 :                 let mut prefix = prefix.to_string();
     171           60 :                 while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     172           26 :                     prefix.pop();
     173           26 :                 }
     174           34 :                 prefix
     175           38 :             });
     176           38 : 
     177           38 :         Ok(Self {
     178           38 :             client,
     179           38 :             bucket_name: remote_storage_config.bucket_name.clone(),
     180           38 :             max_keys_per_list_response: remote_storage_config.max_keys_per_list_response,
     181           38 :             prefix_in_bucket,
     182           38 :             concurrency_limiter: ConcurrencyLimiter::new(
     183           38 :                 remote_storage_config.concurrency_limit.get(),
     184           38 :             ),
     185           38 :             upload_storage_class: remote_storage_config.upload_storage_class.clone(),
     186           38 :             timeout,
     187           38 :         })
     188           38 :     }
     189              : 
     190          126 :     fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
     191          126 :         let relative_path =
     192          126 :             match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
     193          126 :                 Some(stripped) => stripped,
     194              :                 // we rely on AWS to return properly prefixed paths
     195              :                 // for requests with a certain prefix
     196            0 :                 None => panic!(
     197            0 :                     "Key {} does not start with bucket prefix {:?}",
     198            0 :                     key, self.prefix_in_bucket
     199            0 :                 ),
     200              :             };
     201          126 :         RemotePath(
     202          126 :             relative_path
     203          126 :                 .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
     204          126 :                 .collect(),
     205          126 :         )
     206          126 :     }
     207              : 
     208          308 :     pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
     209          308 :         assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
     210          308 :         let path_string = path.get_path().as_str();
     211          308 :         match &self.prefix_in_bucket {
     212          296 :             Some(prefix) => prefix.clone() + "/" + path_string,
     213           12 :             None => path_string.to_string(),
     214              :         }
     215          308 :     }
     216              : 
     217          240 :     async fn permit(
     218          240 :         &self,
     219          240 :         kind: RequestKind,
     220          240 :         cancel: &CancellationToken,
     221          240 :     ) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
     222          240 :         let started_at = start_counting_cancelled_wait(kind);
     223          240 :         let acquire = self.concurrency_limiter.acquire(kind);
     224              : 
     225          240 :         let permit = tokio::select! {
     226              :             permit = acquire => permit.expect("semaphore is never closed"),
     227              :             _ = cancel.cancelled() => return Err(Cancelled),
     228              :         };
     229              : 
     230          240 :         let started_at = ScopeGuard::into_inner(started_at);
     231          240 :         crate::metrics::BUCKET_METRICS
     232          240 :             .wait_seconds
     233          240 :             .observe_elapsed(kind, started_at);
     234          240 : 
     235          240 :         Ok(permit)
     236          240 :     }
     237              : 
     238           24 :     async fn owned_permit(
     239           24 :         &self,
     240           24 :         kind: RequestKind,
     241           24 :         cancel: &CancellationToken,
     242           24 :     ) -> Result<tokio::sync::OwnedSemaphorePermit, Cancelled> {
     243           24 :         let started_at = start_counting_cancelled_wait(kind);
     244           24 :         let acquire = self.concurrency_limiter.acquire_owned(kind);
     245              : 
     246           24 :         let permit = tokio::select! {
     247              :             permit = acquire => permit.expect("semaphore is never closed"),
     248              :             _ = cancel.cancelled() => return Err(Cancelled),
     249              :         };
     250              : 
     251           24 :         let started_at = ScopeGuard::into_inner(started_at);
     252           24 :         crate::metrics::BUCKET_METRICS
     253           24 :             .wait_seconds
     254           24 :             .observe_elapsed(kind, started_at);
     255           24 :         Ok(permit)
     256           24 :     }
     257              : 
     258           24 :     async fn download_object(
     259           24 :         &self,
     260           24 :         request: GetObjectRequest,
     261           24 :         cancel: &CancellationToken,
     262           24 :     ) -> Result<Download, DownloadError> {
     263           24 :         let kind = RequestKind::Get;
     264              : 
     265           24 :         let permit = self.owned_permit(kind, cancel).await?;
     266              : 
     267           24 :         let started_at = start_measuring_requests(kind);
     268           24 : 
     269           24 :         let get_object = self
     270           24 :             .client
     271           24 :             .get_object()
     272           24 :             .bucket(request.bucket)
     273           24 :             .key(request.key)
     274           24 :             .set_range(request.range)
     275           24 :             .send();
     276              : 
     277           24 :         let get_object = tokio::select! {
     278              :             res = get_object => res,
     279              :             _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
     280              :             _ = cancel.cancelled() => return Err(DownloadError::Cancelled),
     281              :         };
     282              : 
     283           24 :         let started_at = ScopeGuard::into_inner(started_at);
     284              : 
     285           24 :         let object_output = match get_object {
     286           24 :             Ok(object_output) => object_output,
     287            0 :             Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
     288              :                 // Count this in the AttemptOutcome::Ok bucket, because 404 is not
     289              :                 // an error: we expect to sometimes fetch an object and find it missing,
     290              :                 // e.g. when probing for timeline indices.
     291            0 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     292            0 :                     kind,
     293            0 :                     AttemptOutcome::Ok,
     294            0 :                     started_at,
     295            0 :                 );
     296            0 :                 return Err(DownloadError::NotFound);
     297              :             }
     298            0 :             Err(e) => {
     299            0 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     300            0 :                     kind,
     301            0 :                     AttemptOutcome::Err,
     302            0 :                     started_at,
     303            0 :                 );
     304            0 : 
     305            0 :                 return Err(DownloadError::Other(
     306            0 :                     anyhow::Error::new(e).context("download s3 object"),
     307            0 :                 ));
     308              :             }
     309              :         };
     310              : 
     311              :         // Even if there is no timeout budget left, continue anyway: the caller can decide to
     312              :         // ignore errors concerning timeouts and cancellation.
     313           24 :         let remaining = self.timeout.saturating_sub(started_at.elapsed());
     314           24 : 
     315           24 :         let metadata = object_output.metadata().cloned().map(StorageMetadata);
     316           24 :         let etag = object_output
     317           24 :             .e_tag
     318           24 :             .ok_or(DownloadError::Other(anyhow::anyhow!("Missing ETag header")))?
     319           24 :             .into();
     320           24 :         let last_modified = object_output
     321           24 :             .last_modified
     322           24 :             .ok_or(DownloadError::Other(anyhow::anyhow!(
     323           24 :                 "Missing LastModified header"
     324           24 :             )))?
     325           24 :             .try_into()
     326           24 :             .map_err(|e: ConversionError| DownloadError::Other(e.into()))?;
     327              : 
     328           24 :         let body = object_output.body;
     329           24 :         let body = ByteStreamAsStream::from(body);
     330           24 :         let body = PermitCarrying::new(permit, body);
     331           24 :         let body = TimedDownload::new(started_at, body);
     332           24 : 
     333           24 :         let cancel_or_timeout = crate::support::cancel_or_timeout(remaining, cancel.clone());
     334           24 :         let body = crate::support::DownloadStream::new(cancel_or_timeout, body);
     335           24 : 
     336           24 :         Ok(Download {
     337           24 :             metadata,
     338           24 :             etag,
     339           24 :             last_modified,
     340           24 :             download_stream: Box::pin(body),
     341           24 :         })
     342           24 :     }
     343              : 
     344          106 :     async fn delete_oids(
     345          106 :         &self,
     346          106 :         _permit: &tokio::sync::SemaphorePermit<'_>,
     347          106 :         delete_objects: &[ObjectIdentifier],
     348          106 :         cancel: &CancellationToken,
     349          106 :     ) -> anyhow::Result<()> {
     350          106 :         let kind = RequestKind::Delete;
     351          106 :         let mut cancel = std::pin::pin!(cancel.cancelled());
     352              : 
     353          106 :         for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
     354          106 :             let started_at = start_measuring_requests(kind);
     355              : 
     356          106 :             let req = self
     357          106 :                 .client
     358          106 :                 .delete_objects()
     359          106 :                 .bucket(self.bucket_name.clone())
     360          106 :                 .delete(
     361          106 :                     Delete::builder()
     362          106 :                         .set_objects(Some(chunk.to_vec()))
     363          106 :                         .build()
     364          106 :                         .context("build request")?,
     365              :                 )
     366          106 :                 .send();
     367              : 
     368          106 :             let resp = tokio::select! {
     369              :                 resp = req => resp,
     370              :                 _ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()),
     371              :                 _ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
     372              :             };
     373              : 
     374          106 :             let started_at = ScopeGuard::into_inner(started_at);
     375          106 :             crate::metrics::BUCKET_METRICS
     376          106 :                 .req_seconds
     377          106 :                 .observe_elapsed(kind, &resp, started_at);
     378              : 
     379          106 :             let resp = resp.context("request deletion")?;
     380          106 :             crate::metrics::BUCKET_METRICS
     381          106 :                 .deleted_objects_total
     382          106 :                 .inc_by(chunk.len() as u64);
     383              : 
     384          106 :             if let Some(errors) = resp.errors {
     385              :                 // Log a bounded number of the errors within the response:
     386              :                 // these requests can carry 1000 keys so logging each one
     387              :                 // would be too verbose, especially as errors may lead us
     388              :                 // to retry repeatedly.
     389              :                 const LOG_UP_TO_N_ERRORS: usize = 10;
     390            0 :                 for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
     391            0 :                     tracing::warn!(
     392            0 :                         "DeleteObjects key {} failed: {}: {}",
     393            0 :                         e.key.as_ref().map(Cow::from).unwrap_or("".into()),
     394            0 :                         e.code.as_ref().map(Cow::from).unwrap_or("".into()),
     395            0 :                         e.message.as_ref().map(Cow::from).unwrap_or("".into())
     396              :                     );
     397              :                 }
     398              : 
     399            0 :                 return Err(anyhow::anyhow!(
     400            0 :                     "Failed to delete {}/{} objects",
     401            0 :                     errors.len(),
     402            0 :                     chunk.len(),
     403            0 :                 ));
     404          106 :             }
     405              :         }
     406          106 :         Ok(())
     407          106 :     }
     408              : }
     409              : 
     410              : pin_project_lite::pin_project! {
     411              :     struct ByteStreamAsStream {
     412              :         #[pin]
     413              :         inner: aws_smithy_types::byte_stream::ByteStream
     414              :     }
     415              : }
     416              : 
     417              : impl From<aws_smithy_types::byte_stream::ByteStream> for ByteStreamAsStream {
     418           24 :     fn from(inner: aws_smithy_types::byte_stream::ByteStream) -> Self {
     419           24 :         ByteStreamAsStream { inner }
     420           24 :     }
     421              : }
     422              : 
     423              : impl Stream for ByteStreamAsStream {
     424              :     type Item = std::io::Result<Bytes>;
     425              : 
     426           41 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     427           41 :         // this does the std::io::ErrorKind::Other conversion
     428           41 :         self.project().inner.poll_next(cx).map_err(|x| x.into())
     429           41 :     }
     430              : 
     431              :     // cannot implement size_hint because inner.size_hint reports the remaining size in bytes,
     432              :     // whereas Stream::size_hint is expected to describe the number of remaining items
     433              : }
     434              : 
     435              : pin_project_lite::pin_project! {
     436              :     /// Times and tracks the outcome of the request.
     437              :     struct TimedDownload<S> {
     438              :         started_at: std::time::Instant,
     439              :         outcome: AttemptOutcome,
     440              :         #[pin]
     441              :         inner: S
     442              :     }
     443              : 
     444              :     impl<S> PinnedDrop for TimedDownload<S> {
     445              :         fn drop(mut this: Pin<&mut Self>) {
     446              :             crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
     447              :         }
     448              :     }
     449              : }
     450              : 
     451              : impl<S> TimedDownload<S> {
     452           24 :     fn new(started_at: std::time::Instant, inner: S) -> Self {
     453           24 :         TimedDownload {
     454           24 :             started_at,
     455           24 :             outcome: AttemptOutcome::Cancelled,
     456           24 :             inner,
     457           24 :         }
     458           24 :     }
     459              : }
     460              : 
     461              : impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for TimedDownload<S> {
     462              :     type Item = <S as Stream>::Item;
     463              : 
     464           41 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     465           41 :         use std::task::ready;
     466           41 : 
     467           41 :         let this = self.project();
     468              : 
     469           41 :         let res = ready!(this.inner.poll_next(cx));
     470           22 :         match &res {
     471           22 :             Some(Ok(_)) => {}
     472            0 :             Some(Err(_)) => *this.outcome = AttemptOutcome::Err,
     473           18 :             None => *this.outcome = AttemptOutcome::Ok,
     474              :         }
     475              : 
     476           40 :         Poll::Ready(res)
     477           41 :     }
     478              : 
     479            0 :     fn size_hint(&self) -> (usize, Option<usize>) {
     480            0 :         self.inner.size_hint()
     481            0 :     }
     482              : }
     483              : 
     484              : impl RemoteStorage for S3Bucket {
     485           24 :     async fn list(
     486           24 :         &self,
     487           24 :         prefix: Option<&RemotePath>,
     488           24 :         mode: ListingMode,
     489           24 :         max_keys: Option<NonZeroU32>,
     490           24 :         cancel: &CancellationToken,
     491           24 :     ) -> Result<Listing, DownloadError> {
     492           24 :         let kind = RequestKind::List;
     493           24 :         // s3 sdk wants i32
     494           24 :         let mut max_keys = max_keys.map(|mk| mk.get() as i32);
     495           24 :         let mut result = Listing::default();
     496           24 : 
     497           24 :         // get the passed prefix or if it is not set use prefix_in_bucket value
     498           24 :         let list_prefix = prefix
     499           24 :             .map(|p| self.relative_path_to_s3_object(p))
     500           24 :             .or_else(|| {
     501           20 :                 self.prefix_in_bucket.clone().map(|mut s| {
     502           20 :                     s.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
     503           20 :                     s
     504           20 :                 })
     505           24 :             });
     506              : 
     507           24 :         let _permit = self.permit(kind, cancel).await?;
     508              : 
     509           24 :         let mut continuation_token = None;
     510              : 
     511              :         loop {
     512           32 :             let started_at = start_measuring_requests(kind);
     513           32 : 
     514           32 :             // min of two Options, returning Some if one is value and another is
     515           32 :             // None (None is smaller than anything, so plain min doesn't work).
     516           32 :             let request_max_keys = self
     517           32 :                 .max_keys_per_list_response
     518           32 :                 .into_iter()
     519           32 :                 .chain(max_keys.into_iter())
     520           32 :                 .min();
     521           32 :             let mut request = self
     522           32 :                 .client
     523           32 :                 .list_objects_v2()
     524           32 :                 .bucket(self.bucket_name.clone())
     525           32 :                 .set_prefix(list_prefix.clone())
     526           32 :                 .set_continuation_token(continuation_token)
     527           32 :                 .set_max_keys(request_max_keys);
     528           32 : 
     529           32 :             if let ListingMode::WithDelimiter = mode {
     530           10 :                 request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
     531           22 :             }
     532              : 
     533           32 :             let request = request.send();
     534              : 
     535           32 :             let response = tokio::select! {
     536              :                 res = request => res,
     537              :                 _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
     538              :                 _ = cancel.cancelled() => return Err(DownloadError::Cancelled),
     539              :             };
     540              : 
     541           32 :             let response = response
     542           32 :                 .context("Failed to list S3 prefixes")
     543           32 :                 .map_err(DownloadError::Other);
     544           32 : 
     545           32 :             let started_at = ScopeGuard::into_inner(started_at);
     546           32 : 
     547           32 :             crate::metrics::BUCKET_METRICS
     548           32 :                 .req_seconds
     549           32 :                 .observe_elapsed(kind, &response, started_at);
     550              : 
     551           32 :             let response = response?;
     552              : 
     553           32 :             let keys = response.contents();
     554           32 :             let empty = Vec::new();
     555           32 :             let prefixes = response.common_prefixes.as_ref().unwrap_or(&empty);
     556           32 : 
     557           32 :             tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
     558              : 
     559          110 :             for object in keys {
     560           80 :                 let object_path = object.key().expect("response does not contain a key");
     561           80 :                 let remote_path = self.s3_object_to_relative_path(object_path);
     562           80 :                 result.keys.push(remote_path);
     563           80 :                 if let Some(mut mk) = max_keys {
     564            4 :                     assert!(mk > 0);
     565            4 :                     mk -= 1;
     566            4 :                     if mk == 0 {
     567            2 :                         return Ok(result); // limit reached
     568            2 :                     }
     569            2 :                     max_keys = Some(mk);
     570           76 :                 }
     571              :             }
     572              : 
     573              :             // S3 gives us prefixes like "foo/", we return them like "foo"
     574           46 :             result.prefixes.extend(prefixes.iter().filter_map(|o| {
     575           46 :                 Some(
     576           46 :                     self.s3_object_to_relative_path(
     577           46 :                         o.prefix()?
     578           46 :                             .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR),
     579              :                     ),
     580              :                 )
     581           46 :             }));
     582              : 
     583           30 :             continuation_token = match response.next_continuation_token {
     584            8 :                 Some(new_token) => Some(new_token),
     585           22 :                 None => break,
     586           22 :             };
     587           22 :         }
     588           22 : 
     589           22 :         Ok(result)
     590           24 :     }
     591              : 
     592          106 :     async fn upload(
     593          106 :         &self,
     594          106 :         from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     595          106 :         from_size_bytes: usize,
     596          106 :         to: &RemotePath,
     597          106 :         metadata: Option<StorageMetadata>,
     598          106 :         cancel: &CancellationToken,
     599          106 :     ) -> anyhow::Result<()> {
     600          106 :         let kind = RequestKind::Put;
     601          106 :         let _permit = self.permit(kind, cancel).await?;
     602              : 
     603          106 :         let started_at = start_measuring_requests(kind);
     604          106 : 
     605          106 :         let body = Body::wrap_stream(from);
     606          106 :         let bytes_stream = ByteStream::new(SdkBody::from_body_0_4(body));
     607              : 
     608          106 :         let upload = self
     609          106 :             .client
     610          106 :             .put_object()
     611          106 :             .bucket(self.bucket_name.clone())
     612          106 :             .key(self.relative_path_to_s3_object(to))
     613          106 :             .set_metadata(metadata.map(|m| m.0))
     614          106 :             .set_storage_class(self.upload_storage_class.clone())
     615          106 :             .content_length(from_size_bytes.try_into()?)
     616          106 :             .body(bytes_stream)
     617          106 :             .send();
     618          106 : 
     619          106 :         let upload = tokio::time::timeout(self.timeout, upload);
     620              : 
     621          106 :         let res = tokio::select! {
     622              :             res = upload => res,
     623              :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     624              :         };
     625              : 
     626          106 :         if let Ok(inner) = &res {
     627          106 :             // do not incl. timeouts as errors in metrics but cancellations
     628          106 :             let started_at = ScopeGuard::into_inner(started_at);
     629          106 :             crate::metrics::BUCKET_METRICS
     630          106 :                 .req_seconds
     631          106 :                 .observe_elapsed(kind, inner, started_at);
     632          106 :         }
     633              : 
     634          106 :         match res {
     635          106 :             Ok(Ok(_put)) => Ok(()),
     636            0 :             Ok(Err(sdk)) => Err(sdk.into()),
     637            0 :             Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
     638              :         }
     639          106 :     }
     640              : 
     641            2 :     async fn copy(
     642            2 :         &self,
     643            2 :         from: &RemotePath,
     644            2 :         to: &RemotePath,
     645            2 :         cancel: &CancellationToken,
     646            2 :     ) -> anyhow::Result<()> {
     647            2 :         let kind = RequestKind::Copy;
     648            2 :         let _permit = self.permit(kind, cancel).await?;
     649              : 
     650            2 :         let timeout = tokio::time::sleep(self.timeout);
     651            2 : 
     652            2 :         let started_at = start_measuring_requests(kind);
     653            2 : 
     654            2 :         // we need to specify bucket_name as a prefix
     655            2 :         let copy_source = format!(
     656            2 :             "{}/{}",
     657            2 :             self.bucket_name,
     658            2 :             self.relative_path_to_s3_object(from)
     659            2 :         );
     660            2 : 
     661            2 :         let op = self
     662            2 :             .client
     663            2 :             .copy_object()
     664            2 :             .bucket(self.bucket_name.clone())
     665            2 :             .key(self.relative_path_to_s3_object(to))
     666            2 :             .set_storage_class(self.upload_storage_class.clone())
     667            2 :             .copy_source(copy_source)
     668            2 :             .send();
     669              : 
     670            2 :         let res = tokio::select! {
     671              :             res = op => res,
     672              :             _ = timeout => return Err(TimeoutOrCancel::Timeout.into()),
     673              :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     674              :         };
     675              : 
     676            2 :         let started_at = ScopeGuard::into_inner(started_at);
     677            2 :         crate::metrics::BUCKET_METRICS
     678            2 :             .req_seconds
     679            2 :             .observe_elapsed(kind, &res, started_at);
     680            2 : 
     681            2 :         res?;
     682              : 
     683            2 :         Ok(())
     684            2 :     }
     685              : 
     686           14 :     async fn download(
     687           14 :         &self,
     688           14 :         from: &RemotePath,
     689           14 :         cancel: &CancellationToken,
     690           14 :     ) -> Result<Download, DownloadError> {
     691           14 :         // if prefix is not none then download file `prefix/from`
     692           14 :         // if prefix is none then download file `from`
     693           14 :         self.download_object(
     694           14 :             GetObjectRequest {
     695           14 :                 bucket: self.bucket_name.clone(),
     696           14 :                 key: self.relative_path_to_s3_object(from),
     697           14 :                 range: None,
     698           14 :             },
     699           14 :             cancel,
     700           14 :         )
     701           19 :         .await
     702           14 :     }
     703              : 
     704           10 :     async fn download_byte_range(
     705           10 :         &self,
     706           10 :         from: &RemotePath,
     707           10 :         start_inclusive: u64,
     708           10 :         end_exclusive: Option<u64>,
     709           10 :         cancel: &CancellationToken,
     710           10 :     ) -> Result<Download, DownloadError> {
     711           10 :         // S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
     712           10 :         // and needs both ends to be exclusive
     713           10 :         let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
     714           10 :         let range = Some(match end_inclusive {
     715            6 :             Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"),
     716            4 :             None => format!("bytes={start_inclusive}-"),
     717              :         });
     718              : 
     719           10 :         self.download_object(
     720           10 :             GetObjectRequest {
     721           10 :                 bucket: self.bucket_name.clone(),
     722           10 :                 key: self.relative_path_to_s3_object(from),
     723           10 :                 range,
     724           10 :             },
     725           10 :             cancel,
     726           10 :         )
     727           10 :         .await
     728           10 :     }
     729              : 
     730          102 :     async fn delete_objects<'a>(
     731          102 :         &self,
     732          102 :         paths: &'a [RemotePath],
     733          102 :         cancel: &CancellationToken,
     734          102 :     ) -> anyhow::Result<()> {
     735          102 :         let kind = RequestKind::Delete;
     736          102 :         let permit = self.permit(kind, cancel).await?;
     737          102 :         let mut delete_objects = Vec::with_capacity(paths.len());
     738          212 :         for path in paths {
     739          110 :             let obj_id = ObjectIdentifier::builder()
     740          110 :                 .set_key(Some(self.relative_path_to_s3_object(path)))
     741          110 :                 .build()
     742          110 :                 .context("convert path to oid")?;
     743          110 :             delete_objects.push(obj_id);
     744              :         }
     745              : 
     746          327 :         self.delete_oids(&permit, &delete_objects, cancel).await
     747          102 :     }
     748              : 
     749           90 :     async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
     750           90 :         let paths = std::array::from_ref(path);
     751          277 :         self.delete_objects(paths, cancel).await
     752           90 :     }
     753              : 
     754            6 :     async fn time_travel_recover(
     755            6 :         &self,
     756            6 :         prefix: Option<&RemotePath>,
     757            6 :         timestamp: SystemTime,
     758            6 :         done_if_after: SystemTime,
     759            6 :         cancel: &CancellationToken,
     760            6 :     ) -> Result<(), TimeTravelError> {
     761            6 :         let kind = RequestKind::TimeTravel;
     762            6 :         let permit = self.permit(kind, cancel).await?;
     763              : 
     764            6 :         let timestamp = DateTime::from(timestamp);
     765            6 :         let done_if_after = DateTime::from(done_if_after);
     766            6 : 
     767            6 :         tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
     768              : 
     769              :         // get the passed prefix or if it is not set use prefix_in_bucket value
     770            6 :         let prefix = prefix
     771            6 :             .map(|p| self.relative_path_to_s3_object(p))
     772            6 :             .or_else(|| self.prefix_in_bucket.clone());
     773            6 : 
     774            6 :         let warn_threshold = 3;
     775            6 :         let max_retries = 10;
     776            6 :         let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
     777              : 
     778            6 :         let mut key_marker = None;
     779            6 :         let mut version_id_marker = None;
     780            6 :         let mut versions_and_deletes = Vec::new();
     781              : 
     782              :         loop {
     783            6 :             let response = backoff::retry(
     784            7 :                 || async {
     785            7 :                     let op = self
     786            7 :                         .client
     787            7 :                         .list_object_versions()
     788            7 :                         .bucket(self.bucket_name.clone())
     789            7 :                         .set_prefix(prefix.clone())
     790            7 :                         .set_key_marker(key_marker.clone())
     791            7 :                         .set_version_id_marker(version_id_marker.clone())
     792            7 :                         .send();
     793            7 : 
     794            7 :                     tokio::select! {
     795            7 :                         res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
     796            7 :                         _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
     797            7 :                     }
     798            7 :                 },
     799            6 :                 is_permanent,
     800            6 :                 warn_threshold,
     801            6 :                 max_retries,
     802            6 :                 "listing object versions for time_travel_recover",
     803            6 :                 cancel,
     804            6 :             )
     805           47 :             .await
     806            6 :             .ok_or_else(|| TimeTravelError::Cancelled)
     807            6 :             .and_then(|x| x)?;
     808              : 
     809            6 :             tracing::trace!(
     810            0 :                 "  Got List response version_id_marker={:?}, key_marker={:?}",
     811              :                 response.version_id_marker,
     812              :                 response.key_marker
     813              :             );
     814            6 :             let versions = response
     815            6 :                 .versions
     816            6 :                 .unwrap_or_default()
     817            6 :                 .into_iter()
     818            6 :                 .map(VerOrDelete::from_version);
     819            6 :             let deletes = response
     820            6 :                 .delete_markers
     821            6 :                 .unwrap_or_default()
     822            6 :                 .into_iter()
     823            6 :                 .map(VerOrDelete::from_delete_marker);
     824            6 :             itertools::process_results(versions.chain(deletes), |n_vds| {
     825            6 :                 versions_and_deletes.extend(n_vds)
     826            6 :             })
     827            6 :             .map_err(TimeTravelError::Other)?;
     828           12 :             fn none_if_empty(v: Option<String>) -> Option<String> {
     829           12 :                 v.filter(|v| !v.is_empty())
     830           12 :             }
     831            6 :             version_id_marker = none_if_empty(response.next_version_id_marker);
     832            6 :             key_marker = none_if_empty(response.next_key_marker);
     833            6 :             if version_id_marker.is_none() {
     834              :                 // The final response is not supposed to be truncated
     835            6 :                 if response.is_truncated.unwrap_or_default() {
     836            0 :                     return Err(TimeTravelError::Other(anyhow::anyhow!(
     837            0 :                         "Received truncated ListObjectVersions response for prefix={prefix:?}"
     838            0 :                     )));
     839            6 :                 }
     840            6 :                 break;
     841            0 :             }
     842            0 :             // Limit the number of versions deletions, mostly so that we don't
     843            0 :             // keep requesting forever if the list is too long, as we'd put the
     844            0 :             // list in RAM.
     845            0 :             // Building a list of 100k entries that reaches the limit roughly takes
     846            0 :             // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
     847            0 :             const COMPLEXITY_LIMIT: usize = 100_000;
     848            0 :             if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
     849            0 :                 return Err(TimeTravelError::TooManyVersions);
     850            0 :             }
     851              :         }
     852              : 
     853            6 :         tracing::info!(
     854            0 :             "Built list for time travel with {} versions and deletions",
     855            0 :             versions_and_deletes.len()
     856              :         );
     857              : 
     858              :         // Work on the list of references instead of the objects directly,
     859              :         // otherwise we get lifetime errors in the sort_by_key call below.
     860            6 :         let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
     861            6 : 
     862          124 :         versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified));
     863            6 : 
     864            6 :         let mut vds_for_key = HashMap::<_, Vec<_>>::new();
     865              : 
     866           42 :         for vd in &versions_and_deletes {
     867              :             let VerOrDelete {
     868           36 :                 version_id, key, ..
     869           36 :             } = &vd;
     870           36 :             if version_id == "null" {
     871            0 :                 return Err(TimeTravelError::Other(anyhow!("Received ListVersions response for key={key} with version_id='null', \
     872            0 :                     indicating either disabled versioning, or legacy objects with null version id values")));
     873           36 :             }
     874           36 :             tracing::trace!(
     875            0 :                 "Parsing version key={key} version_id={version_id} kind={:?}",
     876              :                 vd.kind
     877              :             );
     878              : 
     879           36 :             vds_for_key.entry(key).or_default().push(vd);
     880              :         }
     881           24 :         for (key, versions) in vds_for_key {
     882           18 :             let last_vd = versions.last().unwrap();
     883           18 :             if last_vd.last_modified > done_if_after {
     884            0 :                 tracing::trace!("Key {key} has version later than done_if_after, skipping");
     885            0 :                 continue;
     886           18 :             }
     887              :             // the version we want to restore to.
     888           18 :             let version_to_restore_to =
     889           28 :                 match versions.binary_search_by_key(&timestamp, |tpl| tpl.last_modified) {
     890            0 :                     Ok(v) => v,
     891           18 :                     Err(e) => e,
     892              :                 };
     893           18 :             if version_to_restore_to == versions.len() {
     894            6 :                 tracing::trace!("Key {key} has no changes since timestamp, skipping");
     895            6 :                 continue;
     896           12 :             }
     897           12 :             let mut do_delete = false;
     898           12 :             if version_to_restore_to == 0 {
     899              :                 // All versions more recent, so the key didn't exist at the specified time point.
     900            6 :                 tracing::trace!(
     901            0 :                     "All {} versions more recent for {key}, deleting",
     902            0 :                     versions.len()
     903              :                 );
     904            6 :                 do_delete = true;
     905              :             } else {
     906            6 :                 match &versions[version_to_restore_to - 1] {
     907              :                     VerOrDelete {
     908              :                         kind: VerOrDeleteKind::Version,
     909            6 :                         version_id,
     910            6 :                         ..
     911            6 :                     } => {
     912            6 :                         tracing::trace!("Copying old version {version_id} for {key}...");
     913              :                         // Restore the state to the last version by copying
     914            6 :                         let source_id =
     915            6 :                             format!("{}/{key}?versionId={version_id}", self.bucket_name);
     916            6 : 
     917            6 :                         backoff::retry(
     918            6 :                             || async {
     919            6 :                                 let op = self
     920            6 :                                     .client
     921            6 :                                     .copy_object()
     922            6 :                                     .bucket(self.bucket_name.clone())
     923            6 :                                     .key(key)
     924            6 :                                     .set_storage_class(self.upload_storage_class.clone())
     925            6 :                                     .copy_source(&source_id)
     926            6 :                                     .send();
     927            6 : 
     928            6 :                                 tokio::select! {
     929            6 :                                     res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
     930            6 :                                     _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
     931            6 :                                 }
     932            6 :                             },
     933            6 :                             is_permanent,
     934            6 :                             warn_threshold,
     935            6 :                             max_retries,
     936            6 :                             "copying object version for time_travel_recover",
     937            6 :                             cancel,
     938            6 :                         )
     939           24 :                         .await
     940            6 :                         .ok_or_else(|| TimeTravelError::Cancelled)
     941            6 :                         .and_then(|x| x)?;
     942            6 :                         tracing::info!(%version_id, %key, "Copied old version in S3");
     943              :                     }
     944              :                     VerOrDelete {
     945              :                         kind: VerOrDeleteKind::DeleteMarker,
     946              :                         ..
     947            0 :                     } => {
     948            0 :                         do_delete = true;
     949            0 :                     }
     950              :                 }
     951              :             };
     952           12 :             if do_delete {
     953            6 :                 if matches!(last_vd.kind, VerOrDeleteKind::DeleteMarker) {
     954              :                     // Key has since been deleted (but there was some history), no need to do anything
     955            2 :                     tracing::trace!("Key {key} already deleted, skipping.");
     956              :                 } else {
     957            4 :                     tracing::trace!("Deleting {key}...");
     958              : 
     959            4 :                     let oid = ObjectIdentifier::builder()
     960            4 :                         .key(key.to_owned())
     961            4 :                         .build()
     962            4 :                         .map_err(|e| TimeTravelError::Other(e.into()))?;
     963              : 
     964            4 :                     self.delete_oids(&permit, &[oid], cancel)
     965            8 :                         .await
     966            4 :                         .map_err(|e| {
     967            0 :                             // delete_oid0 will use TimeoutOrCancel
     968            0 :                             if TimeoutOrCancel::caused_by_cancel(&e) {
     969            0 :                                 TimeTravelError::Cancelled
     970              :                             } else {
     971            0 :                                 TimeTravelError::Other(e)
     972              :                             }
     973            4 :                         })?;
     974              :                 }
     975            6 :             }
     976              :         }
     977            6 :         Ok(())
     978            6 :     }
     979              : }
     980              : 
     981              : // Save RAM and only store the needed data instead of the entire ObjectVersion/DeleteMarkerEntry
     982              : struct VerOrDelete {
     983              :     kind: VerOrDeleteKind,
     984              :     last_modified: DateTime,
     985              :     version_id: String,
     986              :     key: String,
     987              : }
     988              : 
     989              : #[derive(Debug)]
     990              : enum VerOrDeleteKind {
     991              :     Version,
     992              :     DeleteMarker,
     993              : }
     994              : 
     995              : impl VerOrDelete {
     996           36 :     fn with_kind(
     997           36 :         kind: VerOrDeleteKind,
     998           36 :         last_modified: Option<DateTime>,
     999           36 :         version_id: Option<String>,
    1000           36 :         key: Option<String>,
    1001           36 :     ) -> anyhow::Result<Self> {
    1002           36 :         let lvk = (last_modified, version_id, key);
    1003           36 :         let (Some(last_modified), Some(version_id), Some(key)) = lvk else {
    1004            0 :             anyhow::bail!(
    1005            0 :                 "One (or more) of last_modified, key, and id is None. \
    1006            0 :             Is versioning enabled in the bucket? last_modified={:?}, version_id={:?}, key={:?}",
    1007            0 :                 lvk.0,
    1008            0 :                 lvk.1,
    1009            0 :                 lvk.2,
    1010            0 :             );
    1011              :         };
    1012           36 :         Ok(Self {
    1013           36 :             kind,
    1014           36 :             last_modified,
    1015           36 :             version_id,
    1016           36 :             key,
    1017           36 :         })
    1018           36 :     }
    1019           28 :     fn from_version(v: ObjectVersion) -> anyhow::Result<Self> {
    1020           28 :         Self::with_kind(
    1021           28 :             VerOrDeleteKind::Version,
    1022           28 :             v.last_modified,
    1023           28 :             v.version_id,
    1024           28 :             v.key,
    1025           28 :         )
    1026           28 :     }
    1027            8 :     fn from_delete_marker(v: DeleteMarkerEntry) -> anyhow::Result<Self> {
    1028            8 :         Self::with_kind(
    1029            8 :             VerOrDeleteKind::DeleteMarker,
    1030            8 :             v.last_modified,
    1031            8 :             v.version_id,
    1032            8 :             v.key,
    1033            8 :         )
    1034            8 :     }
    1035              : }
    1036              : 
    1037              : #[cfg(test)]
    1038              : mod tests {
    1039              :     use camino::Utf8Path;
    1040              :     use std::num::NonZeroUsize;
    1041              : 
    1042              :     use crate::{RemotePath, S3Bucket, S3Config};
    1043              : 
    1044              :     #[test]
    1045            4 :     fn relative_path() {
    1046            4 :         let all_paths = ["", "some/path", "some/path/"];
    1047            4 :         let all_paths: Vec<RemotePath> = all_paths
    1048            4 :             .iter()
    1049           12 :             .map(|x| RemotePath::new(Utf8Path::new(x)).expect("bad path"))
    1050            4 :             .collect();
    1051            4 :         let prefixes = [
    1052            4 :             None,
    1053            4 :             Some(""),
    1054            4 :             Some("test/prefix"),
    1055            4 :             Some("test/prefix/"),
    1056            4 :             Some("/test/prefix/"),
    1057            4 :         ];
    1058            4 :         let expected_outputs = [
    1059            4 :             vec!["", "some/path", "some/path/"],
    1060            4 :             vec!["/", "/some/path", "/some/path/"],
    1061            4 :             vec![
    1062            4 :                 "test/prefix/",
    1063            4 :                 "test/prefix/some/path",
    1064            4 :                 "test/prefix/some/path/",
    1065            4 :             ],
    1066            4 :             vec![
    1067            4 :                 "test/prefix/",
    1068            4 :                 "test/prefix/some/path",
    1069            4 :                 "test/prefix/some/path/",
    1070            4 :             ],
    1071            4 :             vec![
    1072            4 :                 "test/prefix/",
    1073            4 :                 "test/prefix/some/path",
    1074            4 :                 "test/prefix/some/path/",
    1075            4 :             ],
    1076            4 :         ];
    1077              : 
    1078           20 :         for (prefix_idx, prefix) in prefixes.iter().enumerate() {
    1079           20 :             let config = S3Config {
    1080           20 :                 bucket_name: "bucket".to_owned(),
    1081           20 :                 bucket_region: "region".to_owned(),
    1082           20 :                 prefix_in_bucket: prefix.map(str::to_string),
    1083           20 :                 endpoint: None,
    1084           20 :                 concurrency_limit: NonZeroUsize::new(100).unwrap(),
    1085           20 :                 max_keys_per_list_response: Some(5),
    1086           20 :                 upload_storage_class: None,
    1087           20 :             };
    1088           20 :             let storage =
    1089           20 :                 S3Bucket::new(&config, std::time::Duration::ZERO).expect("remote storage init");
    1090           60 :             for (test_path_idx, test_path) in all_paths.iter().enumerate() {
    1091           60 :                 let result = storage.relative_path_to_s3_object(test_path);
    1092           60 :                 let expected = expected_outputs[prefix_idx][test_path_idx];
    1093           60 :                 assert_eq!(result, expected);
    1094              :             }
    1095              :         }
    1096            4 :     }
    1097              : }
        

Generated by: LCOV version 2.1-beta