LCOV - code coverage report
Current view: top level - libs/remote_storage/src - s3_bucket.rs (source / functions) Coverage Total Hit
Test: 691a4c28fe7169edd60b367c52d448a0a6605f1f.info Lines: 34.3 % 792 272
Test Date: 2024-05-10 13:18:37 Functions: 30.7 % 88 27

            Line data    Source code
       1              : //! AWS S3 storage wrapper around the AWS SDK for Rust (`aws-sdk-s3` crate).
       2              : //!
       3              : //! Respects `prefix_in_bucket` property from [`S3Config`],
       4              : //! allowing multiple api users to independently work with the same S3 bucket, if
       5              : //! their bucket prefixes are both specified and different.
       6              : 
       7              : use std::{
       8              :     borrow::Cow,
       9              :     collections::HashMap,
      10              :     num::NonZeroU32,
      11              :     pin::Pin,
      12              :     sync::Arc,
      13              :     task::{Context, Poll},
      14              :     time::{Duration, SystemTime},
      15              : };
      16              : 
      17              : use anyhow::{anyhow, Context as _};
      18              : use aws_config::{
      19              :     environment::credentials::EnvironmentVariableCredentialsProvider,
      20              :     imds::credentials::ImdsCredentialsProvider,
      21              :     meta::credentials::CredentialsProviderChain,
      22              :     profile::ProfileFileCredentialsProvider,
      23              :     provider_config::ProviderConfig,
      24              :     retry::{RetryConfigBuilder, RetryMode},
      25              :     web_identity_token::WebIdentityTokenCredentialsProvider,
      26              :     BehaviorVersion,
      27              : };
      28              : use aws_credential_types::provider::SharedCredentialsProvider;
      29              : use aws_sdk_s3::{
      30              :     config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep},
      31              :     error::SdkError,
      32              :     operation::get_object::GetObjectError,
      33              :     types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass},
      34              :     Client,
      35              : };
      36              : use aws_smithy_async::rt::sleep::TokioSleep;
      37              : 
      38              : use aws_smithy_types::{body::SdkBody, DateTime};
      39              : use aws_smithy_types::{byte_stream::ByteStream, date_time::ConversionError};
      40              : use bytes::Bytes;
      41              : use futures::stream::Stream;
      42              : use hyper::Body;
      43              : use scopeguard::ScopeGuard;
      44              : use tokio_util::sync::CancellationToken;
      45              : use utils::backoff;
      46              : 
      47              : use super::StorageMetadata;
      48              : use crate::{
      49              :     error::Cancelled, support::PermitCarrying, ConcurrencyLimiter, Download, DownloadError,
      50              :     Listing, ListingMode, RemotePath, RemoteStorage, S3Config, TimeTravelError, TimeoutOrCancel,
      51              :     MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
      52              : };
      53              : 
      54              : pub(super) mod metrics;
      55              : 
      56              : use self::metrics::AttemptOutcome;
      57              : pub(super) use self::metrics::RequestKind;
      58              : 
      59              : /// AWS S3 storage: an `aws_sdk_s3` [`Client`] scoped to one bucket and an optional key prefix.
                        ///
                        /// Constructed by [`S3Bucket::new`] from an [`S3Config`].
      60              : pub struct S3Bucket {
                            // SDK client used to issue the requests.
      61              :     client: Client,
                            // Bucket name, copied from the config.
      62              :     bucket_name: String,
                            // Key prefix with leading/trailing separators stripped by `new`; `None` when the
                            // config sets no prefix.
      63              :     prefix_in_bucket: Option<String>,
                            // Optional cap on the number of keys per list response, copied from the config.
      64              :     max_keys_per_list_response: Option<i32>,
                            // Storage class copied from `S3Config::upload_storage_class`.
      65              :     upload_storage_class: Option<StorageClass>,
                            // Source of the permits awaited in `permit`/`owned_permit`; sized from the
                            // config's `concurrency_limit`.
      66              :     concurrency_limiter: ConcurrencyLimiter,
      67              :     // Per-request timeout. Accessible for tests.
      68              :     pub timeout: Duration,
      69              : }
      70              : 
      71              : struct GetObjectRequest {
                            // Parameters for a single GetObject call made by `S3Bucket::download_object`.
                            //
                            // Forwarded to the request builder's `.bucket(..)`.
      72              :     bucket: String,
                            // Forwarded to the request builder's `.key(..)`.
      73              :     key: String,
                            // Forwarded to `.set_range(..)`; presumably an HTTP `Range` header value, with
                            // `None` meaning the whole object (TODO confirm against callers).
      74              :     range: Option<String>,
      75              : }
      76              : impl S3Bucket {
      77              :     /// Creates the S3 storage from `remote_storage_config`, using `timeout` as the per-request timeout.
                            ///
                            /// Credentials are resolved by trying, in order: environment variables, the profile
                            /// file, a web identity token, and IMDS.
                            ///
                            /// NOTE(review): despite the `anyhow::Result` return type, the body below has no
                            /// error path; failing to build the runtime or to join the loader thread panics
                            /// via `unwrap()` instead.
      78           28 :     pub fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
      79           28 :         tracing::debug!(
      80            0 :             "Creating s3 remote storage for S3 bucket {}",
      81              :             remote_storage_config.bucket_name
      82              :         );
      83              : 
      84           28 :         let region = Some(Region::new(remote_storage_config.bucket_region.clone()));
      85           28 : 
      86           28 :         let provider_conf = ProviderConfig::without_region().with_region(region.clone());
      87           28 : 
      88           28 :         let credentials_provider = {
      89           28 :             // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
      90           28 :             CredentialsProviderChain::first_try(
      91           28 :                 "env",
      92           28 :                 EnvironmentVariableCredentialsProvider::new(),
      93           28 :             )
      94           28 :             // uses "AWS_PROFILE" / `aws sso login --profile <profile>`
      95           28 :             .or_else(
      96           28 :                 "profile-sso",
      97           28 :                 ProfileFileCredentialsProvider::builder()
      98           28 :                     .configure(&provider_conf)
      99           28 :                     .build(),
     100           28 :             )
     101           28 :             // uses "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
     102           28 :             // needed to access remote extensions bucket
     103           28 :             .or_else(
     104           28 :                 "token",
     105           28 :                 WebIdentityTokenCredentialsProvider::builder()
     106           28 :                     .configure(&provider_conf)
     107           28 :                     .build(),
     108           28 :             )
     109           28 :             // uses imds v2
     110           28 :             .or_else("imds", ImdsCredentialsProvider::builder().build())
     111           28 :         };
     112           28 : 
     113           28 :         // AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
     114           28 :         let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
     115           28 : 
     116           28 :         let sdk_config_loader: aws_config::ConfigLoader = aws_config::defaults(
     117           28 :             #[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
     118           28 :             BehaviorVersion::v2023_11_09(),
     119           28 :         )
     120           28 :         .region(region)
     121           28 :         .identity_cache(IdentityCache::lazy().build())
     122           28 :         .credentials_provider(SharedCredentialsProvider::new(credentials_provider))
     123           28 :         .sleep_impl(SharedAsyncSleep::from(sleep_impl));
     124           28 : 
                                // `load()` is async but this constructor is not, so a scoped thread drives it to
                                // completion on its own current-thread runtime. Both `unwrap()`s panic on failure.
     125           28 :         let sdk_config: aws_config::SdkConfig = std::thread::scope(|s| {
     126           28 :             s.spawn(|| {
     127           28 :                 // TODO: make this function async.
     128           28 :                 tokio::runtime::Builder::new_current_thread()
     129           28 :                     .enable_all()
     130           28 :                     .build()
     131           28 :                     .unwrap()
     132           28 :                     .block_on(sdk_config_loader.load())
     133           28 :             })
     134           28 :             .join()
     135           28 :             .unwrap()
     136           28 :         });
     137           28 : 
     138           28 :         let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&sdk_config);
     139              : 
     140              :         // Technically, the `remote_storage_config.endpoint` field only applies to S3 interactions.
     141              :         // (In case we ever re-use the `sdk_config` for more than just the S3 client in the future)
     142           28 :         if let Some(custom_endpoint) = remote_storage_config.endpoint.clone() {
     143            0 :             s3_config_builder = s3_config_builder
     144            0 :                 .endpoint_url(custom_endpoint)
     145            0 :                 .force_path_style(true);
     146           28 :         }
     147              : 
     148              :         // We do our own retries (see [`backoff::retry`]).  However, for the AWS SDK to enable rate limiting in response to throttling
     149              :         // responses (e.g. 429 on too many ListObjectsv2 requests), we must provide a retry config.  We set it to use at most one
     150              :         // attempt, and enable 'Adaptive' mode, which causes rate limiting to be enabled.
     151           28 :         let mut retry_config = RetryConfigBuilder::new();
     152           28 :         retry_config
     153           28 :             .set_max_attempts(Some(1))
     154           28 :             .set_mode(Some(RetryMode::Adaptive));
     155           28 :         s3_config_builder = s3_config_builder.retry_config(retry_config.build());
     156           28 : 
     157           28 :         let s3_config = s3_config_builder.build();
     158           28 :         let client = aws_sdk_s3::Client::from_conf(s3_config);
     159           28 : 
                                // Normalize the configured prefix by dropping every leading and trailing separator.
     160           28 :         let prefix_in_bucket = remote_storage_config
     161           28 :             .prefix_in_bucket
     162           28 :             .as_deref()
     163           28 :             .map(|prefix| {
     164           26 :                 let mut prefix = prefix;
     165           28 :                 while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     166            2 :                     prefix = &prefix[1..]
     167              :                 }
     168              : 
     169           26 :                 let mut prefix = prefix.to_string();
     170           48 :                 while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     171           22 :                     prefix.pop();
     172           22 :                 }
     173           26 :                 prefix
     174           28 :             });
     175           28 : 
     176           28 :         Ok(Self {
     177           28 :             client,
     178           28 :             bucket_name: remote_storage_config.bucket_name.clone(),
     179           28 :             max_keys_per_list_response: remote_storage_config.max_keys_per_list_response,
     180           28 :             prefix_in_bucket,
     181           28 :             concurrency_limiter: ConcurrencyLimiter::new(
     182           28 :                 remote_storage_config.concurrency_limit.get(),
     183           28 :             ),
     184           28 :             upload_storage_class: remote_storage_config.upload_storage_class.clone(),
     185           28 :             timeout,
     186           28 :         })
     187           28 :     }
     188              : 
     189          126 :     fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
                                // Converts a full S3 object key into a `RemotePath` relative to `prefix_in_bucket`.
                                //
                                // With no prefix configured the empty string is stripped, so `key` is used as is.
                                // Panics when `key` does not start with the configured prefix.
     190          126 :         let relative_path =
     191          126 :             match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
     192          126 :                 Some(stripped) => stripped,
     193              :                 // we rely on AWS to return properly prefixed paths
     194              :                 // for requests with a certain prefix
     195            0 :                 None => panic!(
     196            0 :                     "Key {} does not start with bucket prefix {:?}",
     197            0 :                     key, self.prefix_in_bucket
     198            0 :                 ),
     199              :             };
                                // The remainder is split on the separator and the pieces are collected into the
                                // path wrapped by `RemotePath`.
     200          126 :         RemotePath(
     201          126 :             relative_path
     202          126 :                 .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
     203          126 :                 .collect(),
     204          126 :         )
     205          126 :     }
     206              : 
     207          279 :     pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
                                // Builds the full S3 object key for `path`: `<prefix_in_bucket>/<path>` when a
                                // prefix is configured, otherwise just `<path>`.
                                //
                                // Panics if the platform's main path separator differs from
                                // `REMOTE_STORAGE_PREFIX_SEPARATOR`, since the path string is used as the key verbatim.
     208          279 :         assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
     209          279 :         let path_string = path.get_path().as_str();
     210          279 :         match &self.prefix_in_bucket {
                                    // NOTE(review): the joiner is a literal "/" rather than
                                    // `REMOTE_STORAGE_PREFIX_SEPARATOR` -- presumably the same character; confirm.
     211          273 :             Some(prefix) => prefix.clone() + "/" + path_string,
     212            6 :             None => path_string.to_string(),
     213              :         }
     214          279 :     }
     215              : 
     216          241 :     async fn permit(
     217          241 :         &self,
     218          241 :         kind: RequestKind,
     219          241 :         cancel: &CancellationToken,
     220          241 :     ) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
                                // Waits for a borrowed concurrency permit for a request of `kind`.
                                //
                                // Returns `Err(Cancelled)` if `cancel` fires before a permit is available.
                                // On success the time spent waiting is recorded in the `wait_seconds` metric.
     221            0 :         let started_at = start_counting_cancelled_wait(kind);
     222            0 :         let acquire = self.concurrency_limiter.acquire(kind);
     223              : 
     224            0 :         let permit = tokio::select! {
     225              :             permit = acquire => permit.expect("semaphore is never closed"),
     226              :             _ = cancel.cancelled() => return Err(Cancelled),
     227              :         };
     228              : 
                                // Take the start value out of the guard so its on-drop action does not run;
                                // presumably that action counts a cancelled wait (helper not shown here -- confirm).
     229            0 :         let started_at = ScopeGuard::into_inner(started_at);
     230            0 :         metrics::BUCKET_METRICS
     231            0 :             .wait_seconds
     232            0 :             .observe_elapsed(kind, started_at);
     233            0 : 
     234            0 :         Ok(permit)
     235            0 :     }
     236              : 
     237           24 :     async fn owned_permit(
     238           24 :         &self,
     239           24 :         kind: RequestKind,
     240           24 :         cancel: &CancellationToken,
     241           24 :     ) -> Result<tokio::sync::OwnedSemaphorePermit, Cancelled> {
                                // Same as `permit`, but returns an owned permit that does not borrow `self`,
                                // so it can be moved into a value that outlives this call (see `download_object`).
                                //
                                // Returns `Err(Cancelled)` if `cancel` fires before a permit is available.
     242            0 :         let started_at = start_counting_cancelled_wait(kind);
     243            0 :         let acquire = self.concurrency_limiter.acquire_owned(kind);
     244              : 
     245            0 :         let permit = tokio::select! {
     246              :             permit = acquire => permit.expect("semaphore is never closed"),
     247              :             _ = cancel.cancelled() => return Err(Cancelled),
     248              :         };
     249              : 
                                // Take the start value out of the guard so its on-drop action does not run;
                                // presumably that action counts a cancelled wait (helper not shown here -- confirm).
     250            0 :         let started_at = ScopeGuard::into_inner(started_at);
     251            0 :         metrics::BUCKET_METRICS
     252            0 :             .wait_seconds
     253            0 :             .observe_elapsed(kind, started_at);
     254            0 :         Ok(permit)
     255            0 :     }
     256              : 
     257           24 :     async fn download_object(
     258           24 :         &self,
     259           24 :         request: GetObjectRequest,
     260           24 :         cancel: &CancellationToken,
     261           24 :     ) -> Result<Download, DownloadError> {
                                // Issues one GetObject request and returns the object's metadata, ETag,
                                // last-modified time and a stream over its body.
                                //
                                // Errors:
                                // - `DownloadError::Timeout` if no response arrives within `self.timeout`
                                // - `DownloadError::Cancelled` if `cancel` fires first (the permit wait's
                                //   `Cancelled` is converted by `?`)
                                // - `DownloadError::NotFound` for `NoSuchKey`
                                // - `DownloadError::Other` for any other SDK error, or when the response lacks
                                //   an ETag / LastModified value or LastModified cannot be converted
     262            0 :         let kind = RequestKind::Get;
     263              : 
                                // An owned permit is needed because it is moved into the returned body stream below.
     264            0 :         let permit = self.owned_permit(kind, cancel).await?;
     265              : 
     266            0 :         let started_at = start_measuring_requests(kind);
     267            0 : 
     268            0 :         let get_object = self
     269            0 :             .client
     270            0 :             .get_object()
     271            0 :             .bucket(request.bucket)
     272            0 :             .key(request.key)
     273            0 :             .set_range(request.range)
     274            0 :             .send();
     275              : 
                                // Race the request against the timeout and cancellation. On either early return the
                                // `started_at` guard is dropped un-defused; presumably its on-drop action records
                                // the attempt as cancelled (helper not shown here -- confirm).
     276            0 :         let get_object = tokio::select! {
     277              :             res = get_object => res,
     278              :             _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
     279              :             _ = cancel.cancelled() => return Err(DownloadError::Cancelled),
     280              :         };
     281              : 
     282            0 :         let started_at = ScopeGuard::into_inner(started_at);
     283              : 
     284            0 :         let object_output = match get_object {
     285            0 :             Ok(object_output) => object_output,
     286            0 :             Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
     287              :                 // Count this in the AttemptOutcome::Ok bucket, because 404 is not
     288              :                 // an error: we expect to sometimes fetch an object and find it missing,
     289              :                 // e.g. when probing for timeline indices.
     290            0 :                 metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     291            0 :                     kind,
     292            0 :                     AttemptOutcome::Ok,
     293            0 :                     started_at,
     294            0 :                 );
     295            0 :                 return Err(DownloadError::NotFound);
     296              :             }
     297            0 :             Err(e) => {
     298            0 :                 metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     299            0 :                     kind,
     300            0 :                     AttemptOutcome::Err,
     301            0 :                     started_at,
     302            0 :                 );
     303            0 : 
     304            0 :                 return Err(DownloadError::Other(
     305            0 :                     anyhow::Error::new(e).context("download s3 object"),
     306            0 :                 ));
     307              :             }
     308              :         };
     309              : 
     310              :         // even if we would have no timeout left, continue anyways. the caller can decide to ignore
     311              :         // the errors considering timeouts and cancellation.
     312            0 :         let remaining = self.timeout.saturating_sub(started_at.elapsed());
     313            0 : 
     314            0 :         let metadata = object_output.metadata().cloned().map(StorageMetadata);
                                // NOTE(review): `ok_or(..)` constructs the `anyhow` error even when the value is
                                // present; `ok_or_else` would defer that work to the failure path.
     315            0 :         let etag = object_output
     316            0 :             .e_tag
     317            0 :             .ok_or(DownloadError::Other(anyhow::anyhow!("Missing ETag header")))?
     318            0 :             .into();
     319            0 :         let last_modified = object_output
     320            0 :             .last_modified
     321            0 :             .ok_or(DownloadError::Other(anyhow::anyhow!(
     322            0 :                 "Missing LastModified header"
     323            0 :             )))?
     324            0 :             .try_into()
     325            0 :             .map_err(|e: ConversionError| DownloadError::Other(e.into()))?;
     326              : 
                                // Wrap the body in layers: adapt the SDK byte stream to a `Stream`, attach the
                                // permit to it, then add the `TimedDownload` wrapper that reports `req_seconds`
                                // once the stream is dropped.
     327            0 :         let body = object_output.body;
     328            0 :         let body = ByteStreamAsStream::from(body);
     329            0 :         let body = PermitCarrying::new(permit, body);
     330            0 :         let body = TimedDownload::new(started_at, body);
     331            0 : 
                                // The body stream gets whatever is left of the timeout after the response arrived,
                                // and is also tied to the same cancellation token.
     332            0 :         let cancel_or_timeout = crate::support::cancel_or_timeout(remaining, cancel.clone());
     333            0 :         let body = crate::support::DownloadStream::new(cancel_or_timeout, body);
     334            0 : 
     335            0 :         Ok(Download {
     336            0 :             metadata,
     337            0 :             etag,
     338            0 :             last_modified,
     339            0 :             download_stream: Box::pin(body),
     340            0 :         })
     341            0 :     }
     342              : 
     343          106 :     async fn delete_oids(
     344          106 :         &self,
     345          106 :         _permit: &tokio::sync::SemaphorePermit<'_>,
     346          106 :         delete_objects: &[ObjectIdentifier],
     347          106 :         cancel: &CancellationToken,
     348          106 :     ) -> anyhow::Result<()> {
                                // Deletes `delete_objects` with one DeleteObjects request per chunk of at most
                                // `MAX_KEYS_PER_DELETE` identifiers, stopping at the first failing chunk.
                                //
                                // `_permit` is never read; presumably it only proves the caller already holds a
                                // concurrency permit (confirm against callers).
                                //
                                // Errors: `TimeoutOrCancel::Timeout` / `TimeoutOrCancel::Cancel` (converted into
                                // `anyhow::Error`) when a chunk's request exceeds `self.timeout` or `cancel`
                                // fires; otherwise the request-building, SDK, or per-key failure described below.
     349            0 :         let kind = RequestKind::Delete;
                                // Pinned once so the same cancellation future is polled by every iteration.
     350            0 :         let mut cancel = std::pin::pin!(cancel.cancelled());
     351              : 
     352            0 :         for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
     353            0 :             let started_at = start_measuring_requests(kind);
     354              : 
     355            0 :             let req = self
     356            0 :                 .client
     357            0 :                 .delete_objects()
     358            0 :                 .bucket(self.bucket_name.clone())
     359            0 :                 .delete(
     360            0 :                     Delete::builder()
     361            0 :                         .set_objects(Some(chunk.to_vec()))
     362            0 :                         .build()
     363            0 :                         .context("build request")?,
     364              :                 )
     365            0 :                 .send();
     366              : 
                                    // The timeout applies to each chunk's request separately.
     367            0 :             let resp = tokio::select! {
     368              :                 resp = req => resp,
     369              :                 _ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()),
     370              :                 _ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
     371              :             };
     372              : 
     373            0 :             let started_at = ScopeGuard::into_inner(started_at);
     374            0 :             metrics::BUCKET_METRICS
     375            0 :                 .req_seconds
     376            0 :                 .observe_elapsed(kind, &resp, started_at);
     377              : 
     378            0 :             let resp = resp.context("request deletion")?;
                                    // NOTE(review): the counter is bumped by the whole chunk size before
                                    // `resp.errors` is inspected, so keys that failed to delete are counted too.
     379            0 :             metrics::BUCKET_METRICS
     380            0 :                 .deleted_objects_total
     381            0 :                 .inc_by(chunk.len() as u64);
     382              : 
                                    // NOTE(review): any `Some(..)` is treated as failure; if the SDK can return
                                    // `Some(vec![])` this reports "Failed to delete 0/N objects" -- confirm.
     383            0 :             if let Some(errors) = resp.errors {
     384              :                 // Log a bounded number of the errors within the response:
     385              :                 // these requests can carry 1000 keys so logging each one
     386              :                 // would be too verbose, especially as errors may lead us
     387              :                 // to retry repeatedly.
     388              :                 const LOG_UP_TO_N_ERRORS: usize = 10;
     389            0 :                 for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
     390            0 :                     tracing::warn!(
     391            0 :                         "DeleteObjects key {} failed: {}: {}",
     392            0 :                         e.key.as_ref().map(Cow::from).unwrap_or("".into()),
     393            0 :                         e.code.as_ref().map(Cow::from).unwrap_or("".into()),
     394            0 :                         e.message.as_ref().map(Cow::from).unwrap_or("".into())
     395              :                     );
     396              :                 }
     397              : 
     398            0 :                 return Err(anyhow::anyhow!(
     399            0 :                     "Failed to delete {}/{} objects",
     400            0 :                     errors.len(),
     401            0 :                     chunk.len(),
     402            0 :                 ));
     403            0 :             }
     404              :         }
     405            0 :         Ok(())
     406            0 :     }
     407              : }
     408              : 
     409              : pin_project_lite::pin_project! {
                            // Newtype over the SDK's `ByteStream` so that `futures::Stream` can be implemented
                            // for it; `inner` is structurally pinned so it can be polled through a
                            // `Pin<&mut Self>`.
     410              :     struct ByteStreamAsStream {
     411              :         #[pin]
     412              :         inner: aws_smithy_types::byte_stream::ByteStream
     413              :     }
     414              : }
     415              : 
     416              : impl From<aws_smithy_types::byte_stream::ByteStream> for ByteStreamAsStream {
                            // Wraps the SDK byte stream without touching it.
     417           24 :     fn from(inner: aws_smithy_types::byte_stream::ByteStream) -> Self {
     418           24 :         ByteStreamAsStream { inner }
     419           24 :     }
     420              : }
     421              : 
     422              : impl Stream for ByteStreamAsStream {
                            // Yields the wrapped byte stream's chunks, with its error type converted into
                            // `std::io::Error`.
     423              :     type Item = std::io::Result<Bytes>;
     424              : 
     425           47 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     426           47 :         // this does the std::io::ErrorKind::Other conversion
     427           47 :         self.project().inner.poll_next(cx).map_err(|x| x.into())
     428           47 :     }
     429              : 
     430              :     // cannot implement size_hint because inner.size_hint is remaining size in bytes, which makes
     431              :     // sense and Stream::size_hint does not really
     432              : }
     433              : 
     434              : pin_project_lite::pin_project! {
     435              :     /// Times and tracks the outcome of the request.
                            ///
                            /// Wraps a body stream and, when dropped, records the time elapsed since
                            /// `started_at` in the `req_seconds` metric for `RequestKind::Get`, labelled with
                            /// the last `outcome` observed while polling.
     436              :     struct TimedDownload<S> {
     437              :         started_at: std::time::Instant,
     438              :         outcome: metrics::AttemptOutcome,
     439              :         #[pin]
     440              :         inner: S
     441              :     }
     442              : 
     443              :     impl<S> PinnedDrop for TimedDownload<S> {
     444              :         fn drop(mut this: Pin<&mut Self>) {
     445              :             metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
     446              :         }
     447              :     }
     448              : }
     449              : 
     450              : impl<S> TimedDownload<S> {
                            // Wraps `inner`, measuring from `started_at`. The outcome starts as `Cancelled`,
                            // so a stream dropped before it ends or fails is reported as cancelled.
     451            0 :     fn new(started_at: std::time::Instant, inner: S) -> Self {
     452            0 :         TimedDownload {
     453            0 :             started_at,
     454            0 :             outcome: metrics::AttemptOutcome::Cancelled,
     455            0 :             inner,
     456            0 :         }
     457            0 :     }
     458              : }
     459              : 
     460              : impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for TimedDownload<S> {
                            // Passes items through unchanged while tracking which outcome to report on drop.
     461              :     type Item = <S as Stream>::Item;
     462              : 
     463            0 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     464            0 :         use std::task::ready;
     465            0 : 
     466            0 :         let this = self.project();
     467              : 
     468            0 :         let res = ready!(this.inner.poll_next(cx));
                                // A data chunk leaves the outcome untouched, an error marks it `Err`, and
                                // end-of-stream marks it `Ok`.
                                // NOTE(review): if the inner stream yields an error and is then polled to its
                                // end, the final `None` overwrites `Err` with `Ok` -- confirm this is intended.
     469            0 :         match &res {
     470            0 :             Some(Ok(_)) => {}
     471            0 :             Some(Err(_)) => *this.outcome = metrics::AttemptOutcome::Err,
     472            0 :             None => *this.outcome = metrics::AttemptOutcome::Ok,
     473              :         }
     474              : 
     475            0 :         Poll::Ready(res)
     476            0 :     }
     477              : 
                            // Forwards the inner stream's hint, since items pass through one-to-one.
     478            0 :     fn size_hint(&self) -> (usize, Option<usize>) {
     479            0 :         self.inner.size_hint()
     480            0 :     }
     481              : }
     482              : 
     483              : impl RemoteStorage for S3Bucket {
     484           24 :     async fn list(
     485           24 :         &self,
     486           24 :         prefix: Option<&RemotePath>,
     487           24 :         mode: ListingMode,
     488           24 :         max_keys: Option<NonZeroU32>,
     489           24 :         cancel: &CancellationToken,
     490           24 :     ) -> Result<Listing, DownloadError> {
     491            0 :         let kind = RequestKind::List;
     492            0 :         // s3 sdk wants i32
     493            0 :         let mut max_keys = max_keys.map(|mk| mk.get() as i32);
     494            0 :         let mut result = Listing::default();
     495            0 : 
     496            0 :         // get the passed prefix or if it is not set use prefix_in_bucket value
     497            0 :         let list_prefix = prefix
     498            0 :             .map(|p| self.relative_path_to_s3_object(p))
     499            0 :             .or_else(|| {
     500            0 :                 self.prefix_in_bucket.clone().map(|mut s| {
     501            0 :                     s.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
     502            0 :                     s
     503            0 :                 })
     504            0 :             });
     505              : 
     506            0 :         let _permit = self.permit(kind, cancel).await?;
     507              : 
     508            0 :         let mut continuation_token = None;
     509              : 
     510              :         loop {
     511            0 :             let started_at = start_measuring_requests(kind);
     512            0 : 
     513            0 :             // min of two Options, returning Some if one is value and another is
     514            0 :             // None (None is smaller than anything, so plain min doesn't work).
     515            0 :             let request_max_keys = self
     516            0 :                 .max_keys_per_list_response
     517            0 :                 .into_iter()
     518            0 :                 .chain(max_keys.into_iter())
     519            0 :                 .min();
     520            0 :             let mut request = self
     521            0 :                 .client
     522            0 :                 .list_objects_v2()
     523            0 :                 .bucket(self.bucket_name.clone())
     524            0 :                 .set_prefix(list_prefix.clone())
     525            0 :                 .set_continuation_token(continuation_token)
     526            0 :                 .set_max_keys(request_max_keys);
     527            0 : 
     528            0 :             if let ListingMode::WithDelimiter = mode {
     529            0 :                 request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
     530            0 :             }
     531              : 
     532            0 :             let request = request.send();
     533              : 
     534            0 :             let response = tokio::select! {
     535              :                 res = request => res,
     536              :                 _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
     537              :                 _ = cancel.cancelled() => return Err(DownloadError::Cancelled),
     538              :             };
     539              : 
     540            0 :             let response = response
     541            0 :                 .context("Failed to list S3 prefixes")
     542            0 :                 .map_err(DownloadError::Other);
     543            0 : 
     544            0 :             let started_at = ScopeGuard::into_inner(started_at);
     545            0 : 
     546            0 :             metrics::BUCKET_METRICS
     547            0 :                 .req_seconds
     548            0 :                 .observe_elapsed(kind, &response, started_at);
     549              : 
     550            0 :             let response = response?;
     551              : 
     552            0 :             let keys = response.contents();
     553            0 :             let empty = Vec::new();
     554            0 :             let prefixes = response.common_prefixes.as_ref().unwrap_or(&empty);
     555            0 : 
     556            0 :             tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
     557              : 
     558            0 :             for object in keys {
     559            0 :                 let object_path = object.key().expect("response does not contain a key");
     560            0 :                 let remote_path = self.s3_object_to_relative_path(object_path);
     561            0 :                 result.keys.push(remote_path);
     562            0 :                 if let Some(mut mk) = max_keys {
     563            0 :                     assert!(mk > 0);
     564            0 :                     mk -= 1;
     565            0 :                     if mk == 0 {
     566            0 :                         return Ok(result); // limit reached
     567            0 :                     }
     568            0 :                     max_keys = Some(mk);
     569            0 :                 }
     570              :             }
     571              : 
     572              :             // S3 gives us prefixes like "foo/", we return them like "foo"
     573            0 :             result.prefixes.extend(prefixes.iter().filter_map(|o| {
     574            0 :                 Some(
     575            0 :                     self.s3_object_to_relative_path(
     576            0 :                         o.prefix()?
     577            0 :                             .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR),
     578              :                     ),
     579              :                 )
     580            0 :             }));
     581              : 
     582            0 :             continuation_token = match response.next_continuation_token {
     583            0 :                 Some(new_token) => Some(new_token),
     584            0 :                 None => break,
     585            0 :             };
     586            0 :         }
     587            0 : 
     588            0 :         Ok(result)
     589            0 :     }
     590              : 
     591            0 :     async fn upload(
     592            0 :         &self,
     593            0 :         from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     594            0 :         from_size_bytes: usize,
     595            0 :         to: &RemotePath,
     596            0 :         metadata: Option<StorageMetadata>,
     597            0 :         cancel: &CancellationToken,
     598            0 :     ) -> anyhow::Result<()> {
     599            0 :         let kind = RequestKind::Put;
     600            0 :         let _permit = self.permit(kind, cancel).await?;
     601              : 
     602            0 :         let started_at = start_measuring_requests(kind);
     603            0 : 
     604            0 :         let body = Body::wrap_stream(from);
     605            0 :         let bytes_stream = ByteStream::new(SdkBody::from_body_0_4(body));
     606              : 
     607            0 :         let upload = self
     608            0 :             .client
     609            0 :             .put_object()
     610            0 :             .bucket(self.bucket_name.clone())
     611            0 :             .key(self.relative_path_to_s3_object(to))
     612            0 :             .set_metadata(metadata.map(|m| m.0))
     613            0 :             .set_storage_class(self.upload_storage_class.clone())
     614            0 :             .content_length(from_size_bytes.try_into()?)
     615            0 :             .body(bytes_stream)
     616            0 :             .send();
     617            0 : 
     618            0 :         let upload = tokio::time::timeout(self.timeout, upload);
     619              : 
     620            0 :         let res = tokio::select! {
     621              :             res = upload => res,
     622              :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     623              :         };
     624              : 
     625            0 :         if let Ok(inner) = &res {
     626            0 :             // do not incl. timeouts as errors in metrics but cancellations
     627            0 :             let started_at = ScopeGuard::into_inner(started_at);
     628            0 :             metrics::BUCKET_METRICS
     629            0 :                 .req_seconds
     630            0 :                 .observe_elapsed(kind, inner, started_at);
     631            0 :         }
     632              : 
     633            0 :         match res {
     634            0 :             Ok(Ok(_put)) => Ok(()),
     635            0 :             Ok(Err(sdk)) => Err(sdk.into()),
     636            0 :             Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
     637              :         }
     638            0 :     }
     639              : 
     640            2 :     async fn copy(
     641            2 :         &self,
     642            2 :         from: &RemotePath,
     643            2 :         to: &RemotePath,
     644            2 :         cancel: &CancellationToken,
     645            2 :     ) -> anyhow::Result<()> {
     646            0 :         let kind = RequestKind::Copy;
     647            0 :         let _permit = self.permit(kind, cancel).await?;
     648              : 
     649            0 :         let timeout = tokio::time::sleep(self.timeout);
     650            0 : 
     651            0 :         let started_at = start_measuring_requests(kind);
     652            0 : 
     653            0 :         // we need to specify bucket_name as a prefix
     654            0 :         let copy_source = format!(
     655            0 :             "{}/{}",
     656            0 :             self.bucket_name,
     657            0 :             self.relative_path_to_s3_object(from)
     658            0 :         );
     659            0 : 
     660            0 :         let op = self
     661            0 :             .client
     662            0 :             .copy_object()
     663            0 :             .bucket(self.bucket_name.clone())
     664            0 :             .key(self.relative_path_to_s3_object(to))
     665            0 :             .set_storage_class(self.upload_storage_class.clone())
     666            0 :             .copy_source(copy_source)
     667            0 :             .send();
     668              : 
     669            0 :         let res = tokio::select! {
     670              :             res = op => res,
     671              :             _ = timeout => return Err(TimeoutOrCancel::Timeout.into()),
     672              :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     673              :         };
     674              : 
     675            0 :         let started_at = ScopeGuard::into_inner(started_at);
     676            0 :         metrics::BUCKET_METRICS
     677            0 :             .req_seconds
     678            0 :             .observe_elapsed(kind, &res, started_at);
     679            0 : 
     680            0 :         res?;
     681              : 
     682            0 :         Ok(())
     683            0 :     }
     684              : 
     685           14 :     async fn download(
     686           14 :         &self,
     687           14 :         from: &RemotePath,
     688           14 :         cancel: &CancellationToken,
     689           14 :     ) -> Result<Download, DownloadError> {
     690            0 :         // if prefix is not none then download file `prefix/from`
     691            0 :         // if prefix is none then download file `from`
     692            0 :         self.download_object(
     693            0 :             GetObjectRequest {
     694            0 :                 bucket: self.bucket_name.clone(),
     695            0 :                 key: self.relative_path_to_s3_object(from),
     696            0 :                 range: None,
     697            0 :             },
     698            0 :             cancel,
     699            0 :         )
     700            0 :         .await
     701            0 :     }
     702              : 
     703           10 :     async fn download_byte_range(
     704           10 :         &self,
     705           10 :         from: &RemotePath,
     706           10 :         start_inclusive: u64,
     707           10 :         end_exclusive: Option<u64>,
     708           10 :         cancel: &CancellationToken,
     709           10 :     ) -> Result<Download, DownloadError> {
     710            0 :         // S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
     711            0 :         // and needs both ends to be exclusive
     712            0 :         let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
     713            0 :         let range = Some(match end_inclusive {
     714            0 :             Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"),
     715            0 :             None => format!("bytes={start_inclusive}-"),
     716              :         });
     717              : 
     718            0 :         self.download_object(
     719            0 :             GetObjectRequest {
     720            0 :                 bucket: self.bucket_name.clone(),
     721            0 :                 key: self.relative_path_to_s3_object(from),
     722            0 :                 range,
     723            0 :             },
     724            0 :             cancel,
     725            0 :         )
     726            0 :         .await
     727            0 :     }
     728              : 
     729          102 :     async fn delete_objects<'a>(
     730          102 :         &self,
     731          102 :         paths: &'a [RemotePath],
     732          102 :         cancel: &CancellationToken,
     733          102 :     ) -> anyhow::Result<()> {
     734            0 :         let kind = RequestKind::Delete;
     735            0 :         let permit = self.permit(kind, cancel).await?;
     736            0 :         let mut delete_objects = Vec::with_capacity(paths.len());
     737            0 :         for path in paths {
     738            0 :             let obj_id = ObjectIdentifier::builder()
     739            0 :                 .set_key(Some(self.relative_path_to_s3_object(path)))
     740            0 :                 .build()
     741            0 :                 .context("convert path to oid")?;
     742            0 :             delete_objects.push(obj_id);
     743              :         }
     744              : 
     745            0 :         self.delete_oids(&permit, &delete_objects, cancel).await
     746            0 :     }
     747              : 
     748           90 :     async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
     749            0 :         let paths = std::array::from_ref(path);
     750            0 :         self.delete_objects(paths, cancel).await
     751            0 :     }
     752              : 
     753            6 :     async fn time_travel_recover(
     754            6 :         &self,
     755            6 :         prefix: Option<&RemotePath>,
     756            6 :         timestamp: SystemTime,
     757            6 :         done_if_after: SystemTime,
     758            6 :         cancel: &CancellationToken,
     759            6 :     ) -> Result<(), TimeTravelError> {
     760            0 :         let kind = RequestKind::TimeTravel;
     761            0 :         let permit = self.permit(kind, cancel).await?;
     762              : 
     763            0 :         let timestamp = DateTime::from(timestamp);
     764            0 :         let done_if_after = DateTime::from(done_if_after);
     765            0 : 
     766            0 :         tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
     767              : 
     768              :         // get the passed prefix or if it is not set use prefix_in_bucket value
     769            0 :         let prefix = prefix
     770            0 :             .map(|p| self.relative_path_to_s3_object(p))
     771            0 :             .or_else(|| self.prefix_in_bucket.clone());
     772            0 : 
     773            0 :         let warn_threshold = 3;
     774            0 :         let max_retries = 10;
     775            0 :         let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
     776              : 
     777            0 :         let mut key_marker = None;
     778            0 :         let mut version_id_marker = None;
     779            0 :         let mut versions_and_deletes = Vec::new();
     780              : 
     781              :         loop {
     782            0 :             let response = backoff::retry(
     783            0 :                 || async {
     784            0 :                     let op = self
     785            0 :                         .client
     786            0 :                         .list_object_versions()
     787            0 :                         .bucket(self.bucket_name.clone())
     788            0 :                         .set_prefix(prefix.clone())
     789            0 :                         .set_key_marker(key_marker.clone())
     790            0 :                         .set_version_id_marker(version_id_marker.clone())
     791            0 :                         .send();
     792            0 : 
     793            0 :                     tokio::select! {
     794            0 :                         res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
     795            0 :                         _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
     796            0 :                     }
     797            0 :                 },
     798            0 :                 is_permanent,
     799            0 :                 warn_threshold,
     800            0 :                 max_retries,
     801            0 :                 "listing object versions for time_travel_recover",
     802            0 :                 cancel,
     803            0 :             )
     804            0 :             .await
     805            0 :             .ok_or_else(|| TimeTravelError::Cancelled)
     806            0 :             .and_then(|x| x)?;
     807              : 
     808            0 :             tracing::trace!(
     809            0 :                 "  Got List response version_id_marker={:?}, key_marker={:?}",
     810              :                 response.version_id_marker,
     811              :                 response.key_marker
     812              :             );
     813            0 :             let versions = response
     814            0 :                 .versions
     815            0 :                 .unwrap_or_default()
     816            0 :                 .into_iter()
     817            0 :                 .map(VerOrDelete::from_version);
     818            0 :             let deletes = response
     819            0 :                 .delete_markers
     820            0 :                 .unwrap_or_default()
     821            0 :                 .into_iter()
     822            0 :                 .map(VerOrDelete::from_delete_marker);
     823            0 :             itertools::process_results(versions.chain(deletes), |n_vds| {
     824            0 :                 versions_and_deletes.extend(n_vds)
     825            0 :             })
     826            0 :             .map_err(TimeTravelError::Other)?;
     827           12 :             fn none_if_empty(v: Option<String>) -> Option<String> {
     828           12 :                 v.filter(|v| !v.is_empty())
     829           12 :             }
     830            0 :             version_id_marker = none_if_empty(response.next_version_id_marker);
     831            0 :             key_marker = none_if_empty(response.next_key_marker);
     832            0 :             if version_id_marker.is_none() {
     833              :                 // The final response is not supposed to be truncated
     834            0 :                 if response.is_truncated.unwrap_or_default() {
     835            0 :                     return Err(TimeTravelError::Other(anyhow::anyhow!(
     836            0 :                         "Received truncated ListObjectVersions response for prefix={prefix:?}"
     837            0 :                     )));
     838            0 :                 }
     839            0 :                 break;
     840            0 :             }
     841            0 :             // Limit the number of versions deletions, mostly so that we don't
     842            0 :             // keep requesting forever if the list is too long, as we'd put the
     843            0 :             // list in RAM.
     844            0 :             // Building a list of 100k entries that reaches the limit roughly takes
     845            0 :             // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
     846            0 :             const COMPLEXITY_LIMIT: usize = 100_000;
     847            0 :             if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
     848            0 :                 return Err(TimeTravelError::TooManyVersions);
     849            0 :             }
     850              :         }
     851              : 
     852            0 :         tracing::info!(
     853            0 :             "Built list for time travel with {} versions and deletions",
     854            0 :             versions_and_deletes.len()
     855              :         );
     856              : 
     857              :         // Work on the list of references instead of the objects directly,
     858              :         // otherwise we get lifetime errors in the sort_by_key call below.
     859            0 :         let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
     860            0 : 
     861            0 :         versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified));
     862            0 : 
     863            0 :         let mut vds_for_key = HashMap::<_, Vec<_>>::new();
     864              : 
     865            0 :         for vd in &versions_and_deletes {
     866              :             let VerOrDelete {
     867            0 :                 version_id, key, ..
     868            0 :             } = &vd;
     869            0 :             if version_id == "null" {
     870            0 :                 return Err(TimeTravelError::Other(anyhow!("Received ListVersions response for key={key} with version_id='null', \
     871            0 :                     indicating either disabled versioning, or legacy objects with null version id values")));
     872            0 :             }
     873            0 :             tracing::trace!(
     874            0 :                 "Parsing version key={key} version_id={version_id} kind={:?}",
     875              :                 vd.kind
     876              :             );
     877              : 
     878            0 :             vds_for_key.entry(key).or_default().push(vd);
     879              :         }
     880            0 :         for (key, versions) in vds_for_key {
     881            0 :             let last_vd = versions.last().unwrap();
     882            0 :             if last_vd.last_modified > done_if_after {
     883            0 :                 tracing::trace!("Key {key} has version later than done_if_after, skipping");
     884            0 :                 continue;
     885            0 :             }
     886              :             // the version we want to restore to.
     887            0 :             let version_to_restore_to =
     888            0 :                 match versions.binary_search_by_key(&timestamp, |tpl| tpl.last_modified) {
     889            0 :                     Ok(v) => v,
     890            0 :                     Err(e) => e,
     891              :                 };
     892            0 :             if version_to_restore_to == versions.len() {
     893            0 :                 tracing::trace!("Key {key} has no changes since timestamp, skipping");
     894            0 :                 continue;
     895            0 :             }
     896            0 :             let mut do_delete = false;
     897            0 :             if version_to_restore_to == 0 {
     898              :                 // All versions more recent, so the key didn't exist at the specified time point.
     899            0 :                 tracing::trace!(
     900            0 :                     "All {} versions more recent for {key}, deleting",
     901            0 :                     versions.len()
     902              :                 );
     903            0 :                 do_delete = true;
     904              :             } else {
     905            0 :                 match &versions[version_to_restore_to - 1] {
     906              :                     VerOrDelete {
     907              :                         kind: VerOrDeleteKind::Version,
     908            0 :                         version_id,
     909            0 :                         ..
     910            0 :                     } => {
     911            0 :                         tracing::trace!("Copying old version {version_id} for {key}...");
     912              :                         // Restore the state to the last version by copying
     913            0 :                         let source_id =
     914            0 :                             format!("{}/{key}?versionId={version_id}", self.bucket_name);
     915            0 : 
     916            0 :                         backoff::retry(
     917            0 :                             || async {
     918            0 :                                 let op = self
     919            0 :                                     .client
     920            0 :                                     .copy_object()
     921            0 :                                     .bucket(self.bucket_name.clone())
     922            0 :                                     .key(key)
     923            0 :                                     .set_storage_class(self.upload_storage_class.clone())
     924            0 :                                     .copy_source(&source_id)
     925            0 :                                     .send();
     926            0 : 
     927            0 :                                 tokio::select! {
     928            0 :                                     res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
     929            0 :                                     _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
     930            0 :                                 }
     931            0 :                             },
     932            0 :                             is_permanent,
     933            0 :                             warn_threshold,
     934            0 :                             max_retries,
     935            0 :                             "copying object version for time_travel_recover",
     936            0 :                             cancel,
     937            0 :                         )
     938            0 :                         .await
     939            0 :                         .ok_or_else(|| TimeTravelError::Cancelled)
     940            0 :                         .and_then(|x| x)?;
     941            0 :                         tracing::info!(%version_id, %key, "Copied old version in S3");
     942              :                     }
     943              :                     VerOrDelete {
     944              :                         kind: VerOrDeleteKind::DeleteMarker,
     945              :                         ..
     946            0 :                     } => {
     947            0 :                         do_delete = true;
     948            0 :                     }
     949              :                 }
     950              :             };
     951            0 :             if do_delete {
     952            0 :                 if matches!(last_vd.kind, VerOrDeleteKind::DeleteMarker) {
     953              :                     // Key has since been deleted (but there was some history), no need to do anything
     954            0 :                     tracing::trace!("Key {key} already deleted, skipping.");
     955              :                 } else {
     956            0 :                     tracing::trace!("Deleting {key}...");
     957              : 
     958            0 :                     let oid = ObjectIdentifier::builder()
     959            0 :                         .key(key.to_owned())
     960            0 :                         .build()
     961            0 :                         .map_err(|e| TimeTravelError::Other(e.into()))?;
     962              : 
     963            0 :                     self.delete_oids(&permit, &[oid], cancel)
     964            0 :                         .await
     965            0 :                         .map_err(|e| {
     966            0 :                             // delete_oid0 will use TimeoutOrCancel
     967            0 :                             if TimeoutOrCancel::caused_by_cancel(&e) {
     968            0 :                                 TimeTravelError::Cancelled
     969              :                             } else {
     970            0 :                                 TimeTravelError::Other(e)
     971              :                             }
     972            0 :                         })?;
     973              :                 }
     974            0 :             }
     975              :         }
     976            0 :         Ok(())
     977            0 :     }
     978              : }
     979              : 
     980              : /// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`].
     981          265 : fn start_counting_cancelled_wait(
     982          265 :     kind: RequestKind,
     983          265 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
     984          265 :     scopeguard::guard_on_success(std::time::Instant::now(), move |_| {
     985            0 :         metrics::BUCKET_METRICS.cancelled_waits.get(kind).inc()
     986          265 :     })
     987          265 : }
     988              : 
     989              : /// On drop (cancellation) add time to [`metrics::BucketMetrics::req_seconds`].
     990          271 : fn start_measuring_requests(
     991          271 :     kind: RequestKind,
     992          271 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
     993          271 :     scopeguard::guard_on_success(std::time::Instant::now(), move |started_at| {
     994            0 :         metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     995            0 :             kind,
     996            0 :             AttemptOutcome::Cancelled,
     997            0 :             started_at,
     998            0 :         )
     999          271 :     })
    1000          271 : }
    1001              : 
/// One entry of an object version listing: either an object version or a delete marker.
///
/// Save RAM and only store the needed data instead of the entire ObjectVersion/DeleteMarkerEntry
struct VerOrDelete {
    /// Whether this entry is an object version or a delete marker.
    kind: VerOrDeleteKind,
    /// Modification time reported for the entry.
    last_modified: DateTime,
    /// Version id reported for the entry.
    version_id: String,
    /// Object key the entry belongs to.
    key: String,
}
    1009              : 
/// Distinguishes the two kinds of entries a [`VerOrDelete`] can represent.
#[derive(Debug)]
enum VerOrDeleteKind {
    /// Built from an `ObjectVersion`.
    Version,
    /// Built from a `DeleteMarkerEntry`.
    DeleteMarker,
}
    1015              : 
    1016              : impl VerOrDelete {
    1017           36 :     fn with_kind(
    1018           36 :         kind: VerOrDeleteKind,
    1019           36 :         last_modified: Option<DateTime>,
    1020           36 :         version_id: Option<String>,
    1021           36 :         key: Option<String>,
    1022           36 :     ) -> anyhow::Result<Self> {
    1023           36 :         let lvk = (last_modified, version_id, key);
    1024           36 :         let (Some(last_modified), Some(version_id), Some(key)) = lvk else {
    1025            0 :             anyhow::bail!(
    1026            0 :                 "One (or more) of last_modified, key, and id is None. \
    1027            0 :             Is versioning enabled in the bucket? last_modified={:?}, version_id={:?}, key={:?}",
    1028            0 :                 lvk.0,
    1029            0 :                 lvk.1,
    1030            0 :                 lvk.2,
    1031            0 :             );
    1032              :         };
    1033           36 :         Ok(Self {
    1034           36 :             kind,
    1035           36 :             last_modified,
    1036           36 :             version_id,
    1037           36 :             key,
    1038           36 :         })
    1039           36 :     }
    1040           28 :     fn from_version(v: ObjectVersion) -> anyhow::Result<Self> {
    1041           28 :         Self::with_kind(
    1042           28 :             VerOrDeleteKind::Version,
    1043           28 :             v.last_modified,
    1044           28 :             v.version_id,
    1045           28 :             v.key,
    1046           28 :         )
    1047           28 :     }
    1048            8 :     fn from_delete_marker(v: DeleteMarkerEntry) -> anyhow::Result<Self> {
                                // Builds `Self` from a `DeleteMarkerEntry`, tagging it as
                                // `VerOrDeleteKind::DeleteMarker`. The `last_modified`, `version_id` and
                                // `key` fields are moved out of `v` and handed to `with_kind`, which
                                // returns an error if any of the three is `None`.
    1049            8 :         Self::with_kind(
    1050            8 :             VerOrDeleteKind::DeleteMarker,
    1051            8 :             v.last_modified,
    1052            8 :             v.version_id,
    1053            8 :             v.key,
    1054            8 :         )
    1055            8 :     }
    1056              : }
    1057              : 
    1058              : #[cfg(test)]
    1059              : mod tests {
    1060              :     use camino::Utf8Path;
    1061              :     use std::num::NonZeroUsize;
    1062              : 
    1063              :     use crate::{RemotePath, S3Bucket, S3Config};
    1064              : 
                            // Table-driven check of `S3Bucket::relative_path_to_s3_object`: every remote
                            // path in `all_paths` is mapped under every `prefix_in_bucket` in `prefixes`,
                            // and the result is compared with `expected_outputs[prefix_idx][path_idx]`.
    1065              :     #[test]
    1066            2 :     fn relative_path() {
                                // Inputs: an empty path, a path without a trailing slash, and the same
                                // path with a trailing slash.
    1067            2 :         let all_paths = ["", "some/path", "some/path/"];
    1068            2 :         let all_paths: Vec<RemotePath> = all_paths
    1069            2 :             .iter()
    1070            6 :             .map(|x| RemotePath::new(Utf8Path::new(x)).expect("bad path"))
    1071            2 :             .collect();
                                // Prefix variants: absent, empty, bare, with a trailing slash, and with
                                // both a leading and a trailing slash.
    1072            2 :         let prefixes = [
    1073            2 :             None,
    1074            2 :             Some(""),
    1075            2 :             Some("test/prefix"),
    1076            2 :             Some("test/prefix/"),
    1077            2 :             Some("/test/prefix/"),
    1078            2 :         ];
                                // One row per entry of `prefixes` (same order), one column per entry of
                                // `all_paths`. The last three rows are identical: the expected key is the
                                // same whether or not the prefix is written with leading/trailing slashes.
                                // The empty-string prefix row expects a leading "/" on every key, unlike
                                // the `None` row.
    1079            2 :         let expected_outputs = [
    1080            2 :             vec!["", "some/path", "some/path/"],
    1081            2 :             vec!["/", "/some/path", "/some/path/"],
    1082            2 :             vec![
    1083            2 :                 "test/prefix/",
    1084            2 :                 "test/prefix/some/path",
    1085            2 :                 "test/prefix/some/path/",
    1086            2 :             ],
    1087            2 :             vec![
    1088            2 :                 "test/prefix/",
    1089            2 :                 "test/prefix/some/path",
    1090            2 :                 "test/prefix/some/path/",
    1091            2 :             ],
    1092            2 :             vec![
    1093            2 :                 "test/prefix/",
    1094            2 :                 "test/prefix/some/path",
    1095            2 :                 "test/prefix/some/path/",
    1096            2 :             ],
    1097            2 :         ];
    1098              : 
    1099           10 :         for (prefix_idx, prefix) in prefixes.iter().enumerate() {
                                    // Only `prefix_in_bucket` varies between iterations; every other
                                    // config field is a fixed placeholder value.
    1100           10 :             let config = S3Config {
    1101           10 :                 bucket_name: "bucket".to_owned(),
    1102           10 :                 bucket_region: "region".to_owned(),
    1103           10 :                 prefix_in_bucket: prefix.map(str::to_string),
    1104           10 :                 endpoint: None,
    1105           10 :                 concurrency_limit: NonZeroUsize::new(100).unwrap(),
    1106           10 :                 max_keys_per_list_response: Some(5),
    1107           10 :                 upload_storage_class: None,
    1108           10 :             };
                                    // NOTE(review): the second argument is a `Duration` (zero here);
                                    // presumably a timeout - confirm against `S3Bucket::new`.
    1109           10 :             let storage =
    1110           10 :                 S3Bucket::new(&config, std::time::Duration::ZERO).expect("remote storage init");
    1111           30 :             for (test_path_idx, test_path) in all_paths.iter().enumerate() {
    1112           30 :                 let result = storage.relative_path_to_s3_object(test_path);
    1113           30 :                 let expected = expected_outputs[prefix_idx][test_path_idx];
    1114           30 :                 assert_eq!(result, expected);
    1115              :             }
    1116              :         }
    1117            2 :     }
    1118              : }
        

Generated by: LCOV version 2.1-beta