LCOV - code coverage report
Current view: top level - libs/remote_storage/src - s3_bucket.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 94.8 % 444 421
Test Date: 2023-09-06 10:18:01 Functions: 69.4 % 62 43

            Line data    Source code
       1              : //! AWS S3 storage wrapper around `rusoto` library.
       2              : //!
       3              : //! Respects `prefix_in_bucket` property from [`S3Config`],
       4              : //! allowing multiple api users to independently work with the same S3 bucket, if
       5              : //! their bucket prefixes are both specified and different.
       6              : 
       7              : use std::sync::Arc;
       8              : 
       9              : use anyhow::Context;
      10              : use aws_config::{
      11              :     environment::credentials::EnvironmentVariableCredentialsProvider,
      12              :     imds::credentials::ImdsCredentialsProvider, meta::credentials::CredentialsProviderChain,
      13              :     provider_config::ProviderConfig, web_identity_token::WebIdentityTokenCredentialsProvider,
      14              : };
      15              : use aws_credential_types::cache::CredentialsCache;
      16              : use aws_sdk_s3::{
      17              :     config::{Config, Region},
      18              :     error::SdkError,
      19              :     operation::get_object::GetObjectError,
      20              :     primitives::ByteStream,
      21              :     types::{Delete, ObjectIdentifier},
      22              :     Client,
      23              : };
      24              : use aws_smithy_http::body::SdkBody;
      25              : use hyper::Body;
      26              : use scopeguard::ScopeGuard;
      27              : use tokio::{
      28              :     io::{self, AsyncRead},
      29              :     sync::Semaphore,
      30              : };
      31              : use tokio_util::io::ReaderStream;
      32              : use tracing::debug;
      33              : 
      34              : use super::StorageMetadata;
      35              : use crate::{
      36              :     Download, DownloadError, RemotePath, RemoteStorage, S3Config, REMOTE_STORAGE_PREFIX_SEPARATOR,
      37              : };
      38              : 
      39              : const MAX_DELETE_OBJECTS_REQUEST_SIZE: usize = 1000;
      40              : 
      41              : pub(super) mod metrics;
      42              : 
      43              : use self::metrics::{AttemptOutcome, RequestKind};
      44              : 
      45              : /// AWS S3 storage.
      46              : pub struct S3Bucket {
      47              :     client: Client,
      48              :     bucket_name: String,
      49              :     prefix_in_bucket: Option<String>,
      50              :     max_keys_per_list_response: Option<i32>,
      51              :     // Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
      52              :     // Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
      53              :     // The helps to ensure we don't exceed the thresholds.
      54              :     concurrency_limiter: Arc<Semaphore>,
      55              : }
      56              : 
      57            0 : #[derive(Default)]
      58              : struct GetObjectRequest {
      59              :     bucket: String,
      60              :     key: String,
      61              :     range: Option<String>,
      62              : }
      63              : impl S3Bucket {
      64              :     /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
      65          241 :     pub fn new(aws_config: &S3Config) -> anyhow::Result<Self> {
      66          241 :         debug!(
      67            0 :             "Creating s3 remote storage for S3 bucket {}",
      68            0 :             aws_config.bucket_name
      69            0 :         );
      70              : 
      71          241 :         let region = Some(Region::new(aws_config.bucket_region.clone()));
      72          241 : 
      73          241 :         let credentials_provider = {
      74          241 :             // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
      75          241 :             CredentialsProviderChain::first_try(
      76          241 :                 "env",
      77          241 :                 EnvironmentVariableCredentialsProvider::new(),
      78          241 :             )
      79          241 :             // uses "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
      80          241 :             // needed to access remote extensions bucket
      81          241 :             .or_else("token", {
      82          241 :                 let provider_conf = ProviderConfig::without_region().with_region(region.clone());
      83          241 : 
      84          241 :                 WebIdentityTokenCredentialsProvider::builder()
      85          241 :                     .configure(&provider_conf)
      86          241 :                     .build()
      87          241 :             })
      88          241 :             // uses imds v2
      89          241 :             .or_else("imds", ImdsCredentialsProvider::builder().build())
      90          241 :         };
      91          241 : 
      92          241 :         let mut config_builder = Config::builder()
      93          241 :             .region(region)
      94          241 :             .credentials_cache(CredentialsCache::lazy())
      95          241 :             .credentials_provider(credentials_provider);
      96              : 
      97          241 :         if let Some(custom_endpoint) = aws_config.endpoint.clone() {
      98          114 :             config_builder = config_builder
      99          114 :                 .endpoint_url(custom_endpoint)
     100          114 :                 .force_path_style(true);
     101          127 :         }
     102          241 :         let client = Client::from_conf(config_builder.build());
     103          241 : 
     104          241 :         let prefix_in_bucket = aws_config.prefix_in_bucket.as_deref().map(|prefix| {
     105          240 :             let mut prefix = prefix;
     106          241 :             while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     107            1 :                 prefix = &prefix[1..]
     108              :             }
     109              : 
     110          240 :             let mut prefix = prefix.to_string();
     111          242 :             while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     112            2 :                 prefix.pop();
     113            2 :             }
     114          240 :             prefix
     115          241 :         });
     116          241 :         Ok(Self {
     117          241 :             client,
     118          241 :             bucket_name: aws_config.bucket_name.clone(),
     119          241 :             max_keys_per_list_response: aws_config.max_keys_per_list_response,
     120          241 :             prefix_in_bucket,
     121          241 :             concurrency_limiter: Arc::new(Semaphore::new(aws_config.concurrency_limit.get())),
     122          241 :         })
     123          241 :     }
     124              : 
     125         1098 :     fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
     126         1098 :         let relative_path =
     127         1098 :             match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
     128         1098 :                 Some(stripped) => stripped,
     129              :                 // we rely on AWS to return properly prefixed paths
     130              :                 // for requests with a certain prefix
     131            0 :                 None => panic!(
     132            0 :                     "Key {} does not start with bucket prefix {:?}",
     133            0 :                     key, self.prefix_in_bucket
     134            0 :                 ),
     135              :             };
     136         1098 :         RemotePath(
     137         1098 :             relative_path
     138         1098 :                 .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
     139         1098 :                 .collect(),
     140         1098 :         )
     141         1098 :     }
     142              : 
     143        18844 :     pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
     144        18844 :         assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
     145        18844 :         let path_string = path
     146        18844 :             .get_path()
     147        18844 :             .to_string_lossy()
     148        18844 :             .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR)
     149        18844 :             .to_string();
     150        18844 :         match &self.prefix_in_bucket {
     151        18841 :             Some(prefix) => prefix.clone() + "/" + &path_string,
     152            3 :             None => path_string,
     153              :         }
     154        18844 :     }
     155              : 
     156        17346 :     async fn permit(&self, kind: RequestKind) -> tokio::sync::SemaphorePermit<'_> {
     157        17346 :         let started_at = start_counting_cancelled_wait(kind);
     158        17346 :         let permit = self
     159        17346 :             .concurrency_limiter
     160        17346 :             .acquire()
     161            1 :             .await
     162        17346 :             .expect("semaphore is never closed");
     163        17346 : 
     164        17346 :         let started_at = ScopeGuard::into_inner(started_at);
     165        17346 :         metrics::BUCKET_METRICS
     166        17346 :             .wait_seconds
     167        17346 :             .observe_elapsed(kind, started_at);
     168        17346 : 
     169        17346 :         permit
     170        17346 :     }
     171              : 
     172          761 :     async fn owned_permit(&self, kind: RequestKind) -> tokio::sync::OwnedSemaphorePermit {
     173          761 :         let started_at = start_counting_cancelled_wait(kind);
     174          761 :         let permit = self
     175          761 :             .concurrency_limiter
     176          761 :             .clone()
     177          761 :             .acquire_owned()
     178            0 :             .await
     179          761 :             .expect("semaphore is never closed");
     180          761 : 
     181          761 :         let started_at = ScopeGuard::into_inner(started_at);
     182          761 :         metrics::BUCKET_METRICS
     183          761 :             .wait_seconds
     184          761 :             .observe_elapsed(kind, started_at);
     185          761 :         permit
     186          761 :     }
     187              : 
     188          761 :     async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
     189          761 :         let kind = RequestKind::Get;
     190          761 :         let permit = self.owned_permit(kind).await;
     191              : 
     192          761 :         let started_at = start_measuring_requests(kind);
     193              : 
     194          761 :         let get_object = self
     195          761 :             .client
     196          761 :             .get_object()
     197          761 :             .bucket(request.bucket)
     198          761 :             .key(request.key)
     199          761 :             .set_range(request.range)
     200          761 :             .send()
     201         2736 :             .await;
     202              : 
     203          761 :         let started_at = ScopeGuard::into_inner(started_at);
     204          761 : 
     205          761 :         if get_object.is_err() {
     206          239 :             metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     207          239 :                 kind,
     208          239 :                 AttemptOutcome::Err,
     209          239 :                 started_at,
     210          239 :             );
     211          522 :         }
     212              : 
     213          239 :         match get_object {
     214          522 :             Ok(object_output) => {
     215          522 :                 let metadata = object_output.metadata().cloned().map(StorageMetadata);
     216          522 :                 Ok(Download {
     217          522 :                     metadata,
     218          522 :                     download_stream: Box::pin(io::BufReader::new(TimedDownload::new(
     219          522 :                         started_at,
     220          522 :                         RatelimitedAsyncRead::new(permit, object_output.body.into_async_read()),
     221          522 :                     ))),
     222          522 :                 })
     223              :             }
     224          239 :             Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
     225          239 :                 Err(DownloadError::NotFound)
     226              :             }
     227            0 :             Err(e) => Err(DownloadError::Other(
     228            0 :                 anyhow::Error::new(e).context("download s3 object"),
     229            0 :             )),
     230              :         }
     231          761 :     }
     232              : }
     233              : 
     234              : pin_project_lite::pin_project! {
     235              :     /// An `AsyncRead` adapter which carries a permit for the lifetime of the value.
     236              :     struct RatelimitedAsyncRead<S> {
     237              :         permit: tokio::sync::OwnedSemaphorePermit,
     238              :         #[pin]
     239              :         inner: S,
     240              :     }
     241              : }
     242              : 
     243              : impl<S: AsyncRead> RatelimitedAsyncRead<S> {
     244          522 :     fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self {
     245          522 :         RatelimitedAsyncRead { permit, inner }
     246          522 :     }
     247              : }
     248              : 
     249              : impl<S: AsyncRead> AsyncRead for RatelimitedAsyncRead<S> {
     250       169160 :     fn poll_read(
     251       169160 :         self: std::pin::Pin<&mut Self>,
     252       169160 :         cx: &mut std::task::Context<'_>,
     253       169160 :         buf: &mut io::ReadBuf<'_>,
     254       169160 :     ) -> std::task::Poll<std::io::Result<()>> {
     255       169160 :         let this = self.project();
     256       169160 :         this.inner.poll_read(cx, buf)
     257       169160 :     }
     258              : }
     259              : 
     260              : pin_project_lite::pin_project! {
     261              :     /// Times and tracks the outcome of the request.
     262              :     struct TimedDownload<S> {
     263              :         started_at: std::time::Instant,
     264              :         outcome: metrics::AttemptOutcome,
     265              :         #[pin]
     266              :         inner: S
     267              :     }
     268              : 
     269              :     impl<S> PinnedDrop for TimedDownload<S> {
     270              :         fn drop(mut this: Pin<&mut Self>) {
     271              :             metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
     272              :         }
     273              :     }
     274              : }
     275              : 
     276              : impl<S: AsyncRead> TimedDownload<S> {
     277          522 :     fn new(started_at: std::time::Instant, inner: S) -> Self {
     278          522 :         TimedDownload {
     279          522 :             started_at,
     280          522 :             outcome: metrics::AttemptOutcome::Cancelled,
     281          522 :             inner,
     282          522 :         }
     283          522 :     }
     284              : }
     285              : 
     286              : impl<S: AsyncRead> AsyncRead for TimedDownload<S> {
     287       169160 :     fn poll_read(
     288       169160 :         self: std::pin::Pin<&mut Self>,
     289       169160 :         cx: &mut std::task::Context<'_>,
     290       169160 :         buf: &mut io::ReadBuf<'_>,
     291       169160 :     ) -> std::task::Poll<std::io::Result<()>> {
     292       169160 :         let this = self.project();
     293       169160 :         let before = buf.filled().len();
     294       169160 :         let read = std::task::ready!(this.inner.poll_read(cx, buf));
     295              : 
     296       133819 :         let read_eof = buf.filled().len() == before;
     297              : 
     298       133819 :         match read {
     299          512 :             Ok(()) if read_eof => *this.outcome = AttemptOutcome::Ok,
     300       133307 :             Ok(()) => { /* still in progress */ }
     301            0 :             Err(_) => *this.outcome = AttemptOutcome::Err,
     302              :         }
     303              : 
     304       133819 :         std::task::Poll::Ready(read)
     305       169160 :     }
     306              : }
     307              : 
     308              : #[async_trait::async_trait]
     309              : impl RemoteStorage for S3Bucket {
     310              :     /// See the doc for `RemoteStorage::list_prefixes`
     311              :     /// Note: it wont include empty "directories"
     312           20 :     async fn list_prefixes(
     313           20 :         &self,
     314           20 :         prefix: Option<&RemotePath>,
     315           20 :     ) -> Result<Vec<RemotePath>, DownloadError> {
     316           20 :         let kind = RequestKind::List;
     317           20 : 
     318           20 :         // get the passed prefix or if it is not set use prefix_in_bucket value
     319           20 :         let list_prefix = prefix
     320           20 :             .map(|p| self.relative_path_to_s3_object(p))
     321           20 :             .or_else(|| self.prefix_in_bucket.clone())
     322           20 :             .map(|mut p| {
     323           20 :                 // required to end with a separator
     324           20 :                 // otherwise request will return only the entry of a prefix
     325           20 :                 if !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     326           20 :                     p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
     327           20 :                 }
     328           20 :                 p
     329           20 :             });
     330           20 : 
     331           20 :         let mut document_keys = Vec::new();
     332           20 : 
     333           20 :         let mut continuation_token = None;
     334              : 
     335              :         loop {
     336           20 :             let _guard = self.permit(kind).await;
     337           20 :             let started_at = start_measuring_requests(kind);
     338              : 
     339           20 :             let fetch_response = self
     340           20 :                 .client
     341           20 :                 .list_objects_v2()
     342           20 :                 .bucket(self.bucket_name.clone())
     343           20 :                 .set_prefix(list_prefix.clone())
     344           20 :                 .set_continuation_token(continuation_token)
     345           20 :                 .delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string())
     346           20 :                 .set_max_keys(self.max_keys_per_list_response)
     347           20 :                 .send()
     348           65 :                 .await
     349           20 :                 .context("Failed to list S3 prefixes")
     350           20 :                 .map_err(DownloadError::Other);
     351           20 : 
     352           20 :             let started_at = ScopeGuard::into_inner(started_at);
     353           20 : 
     354           20 :             metrics::BUCKET_METRICS
     355           20 :                 .req_seconds
     356           20 :                 .observe_elapsed(kind, &fetch_response, started_at);
     357              : 
     358           20 :             let fetch_response = fetch_response?;
     359              : 
     360           20 :             document_keys.extend(
     361           20 :                 fetch_response
     362           20 :                     .common_prefixes
     363           20 :                     .unwrap_or_default()
     364           20 :                     .into_iter()
     365           30 :                     .filter_map(|o| Some(self.s3_object_to_relative_path(o.prefix()?))),
     366           20 :             );
     367              : 
     368           20 :             continuation_token = match fetch_response.next_continuation_token {
     369            0 :                 Some(new_token) => Some(new_token),
     370           20 :                 None => break,
     371           20 :             };
     372           20 :         }
     373           20 : 
     374           20 :         Ok(document_keys)
     375           40 :     }
     376              : 
     377              :     /// See the doc for `RemoteStorage::list_files`
     378          188 :     async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
     379          188 :         let kind = RequestKind::List;
     380          188 : 
     381          188 :         let folder_name = folder
     382          188 :             .map(|p| self.relative_path_to_s3_object(p))
     383          188 :             .or_else(|| self.prefix_in_bucket.clone());
     384          188 : 
     385          188 :         // AWS may need to break the response into several parts
     386          188 :         let mut continuation_token = None;
     387          188 :         let mut all_files = vec![];
     388              :         loop {
     389          188 :             let _guard = self.permit(kind).await;
     390          188 :             let started_at = start_measuring_requests(kind);
     391              : 
     392          188 :             let response = self
     393          188 :                 .client
     394          188 :                 .list_objects_v2()
     395          188 :                 .bucket(self.bucket_name.clone())
     396          188 :                 .set_prefix(folder_name.clone())
     397          188 :                 .set_continuation_token(continuation_token)
     398          188 :                 .set_max_keys(self.max_keys_per_list_response)
     399          188 :                 .send()
     400         1142 :                 .await
     401          188 :                 .context("Failed to list files in S3 bucket");
     402          188 : 
     403          188 :             let started_at = ScopeGuard::into_inner(started_at);
     404          188 :             metrics::BUCKET_METRICS
     405          188 :                 .req_seconds
     406          188 :                 .observe_elapsed(kind, &response, started_at);
     407              : 
     408          188 :             let response = response?;
     409              : 
     410         1068 :             for object in response.contents().unwrap_or_default() {
     411         1068 :                 let object_path = object.key().expect("response does not contain a key");
     412         1068 :                 let remote_path = self.s3_object_to_relative_path(object_path);
     413         1068 :                 all_files.push(remote_path);
     414         1068 :             }
     415          188 :             match response.next_continuation_token {
     416            0 :                 Some(new_token) => continuation_token = Some(new_token),
     417          188 :                 None => break,
     418          188 :             }
     419          188 :         }
     420          188 :         Ok(all_files)
     421          376 :     }
     422              : 
     423        10490 :     async fn upload(
     424        10490 :         &self,
     425        10490 :         from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
     426        10490 :         from_size_bytes: usize,
     427        10490 :         to: &RemotePath,
     428        10490 :         metadata: Option<StorageMetadata>,
     429        10490 :     ) -> anyhow::Result<()> {
     430        10490 :         let kind = RequestKind::Put;
     431        10490 :         let _guard = self.permit(kind).await;
     432              : 
     433        10490 :         let started_at = start_measuring_requests(kind);
     434        10490 : 
     435        10490 :         let body = Body::wrap_stream(ReaderStream::new(from));
     436        10490 :         let bytes_stream = ByteStream::new(SdkBody::from(body));
     437              : 
     438        10490 :         let res = self
     439        10490 :             .client
     440        10490 :             .put_object()
     441        10490 :             .bucket(self.bucket_name.clone())
     442        10490 :             .key(self.relative_path_to_s3_object(to))
     443        10490 :             .set_metadata(metadata.map(|m| m.0))
     444        10490 :             .content_length(from_size_bytes.try_into()?)
     445        10490 :             .body(bytes_stream)
     446        10490 :             .send()
     447        37479 :             .await;
     448              : 
     449        10482 :         let started_at = ScopeGuard::into_inner(started_at);
     450        10482 :         metrics::BUCKET_METRICS
     451        10482 :             .req_seconds
     452        10482 :             .observe_elapsed(kind, &res, started_at);
     453        10482 : 
     454        10482 :         res?;
     455              : 
     456        10481 :         Ok(())
     457        20972 :     }
     458              : 
     459          757 :     async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
     460              :         // if prefix is not none then download file `prefix/from`
     461              :         // if prefix is none then download file `from`
     462          757 :         self.download_object(GetObjectRequest {
     463          757 :             bucket: self.bucket_name.clone(),
     464          757 :             key: self.relative_path_to_s3_object(from),
     465          757 :             range: None,
     466          757 :         })
     467         2719 :         .await
     468         1514 :     }
     469              : 
     470            4 :     async fn download_byte_range(
     471            4 :         &self,
     472            4 :         from: &RemotePath,
     473            4 :         start_inclusive: u64,
     474            4 :         end_exclusive: Option<u64>,
     475            4 :     ) -> Result<Download, DownloadError> {
     476              :         // S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
     477              :         // and needs both ends to be exclusive
     478            4 :         let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
     479            4 :         let range = Some(match end_inclusive {
     480            0 :             Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"),
     481            4 :             None => format!("bytes={start_inclusive}-"),
     482              :         });
     483              : 
     484            4 :         self.download_object(GetObjectRequest {
     485            4 :             bucket: self.bucket_name.clone(),
     486            4 :             key: self.relative_path_to_s3_object(from),
     487            4 :             range,
     488            4 :         })
     489           17 :         .await
     490            8 :     }
     491         6648 :     async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
     492         6648 :         let kind = RequestKind::Delete;
     493         6648 :         let _guard = self.permit(kind).await;
     494              : 
     495         6648 :         let mut delete_objects = Vec::with_capacity(paths.len());
     496        14018 :         for path in paths {
     497         7370 :             let obj_id = ObjectIdentifier::builder()
     498         7370 :                 .set_key(Some(self.relative_path_to_s3_object(path)))
     499         7370 :                 .build();
     500         7370 :             delete_objects.push(obj_id);
     501         7370 :         }
     502              : 
     503         6648 :         for chunk in delete_objects.chunks(MAX_DELETE_OBJECTS_REQUEST_SIZE) {
     504         6648 :             let started_at = start_measuring_requests(kind);
     505              : 
     506         6648 :             let resp = self
     507         6648 :                 .client
     508         6648 :                 .delete_objects()
     509         6648 :                 .bucket(self.bucket_name.clone())
     510         6648 :                 .delete(Delete::builder().set_objects(Some(chunk.to_vec())).build())
     511         6648 :                 .send()
     512        30692 :                 .await;
     513              : 
     514         6648 :             let started_at = ScopeGuard::into_inner(started_at);
     515         6648 :             metrics::BUCKET_METRICS
     516         6648 :                 .req_seconds
     517         6648 :                 .observe_elapsed(kind, &resp, started_at);
     518         6648 : 
     519         6648 :             match resp {
     520         6648 :                 Ok(resp) => {
     521         6648 :                     metrics::BUCKET_METRICS
     522         6648 :                         .deleted_objects_total
     523         6648 :                         .inc_by(chunk.len() as u64);
     524         6648 :                     if let Some(errors) = resp.errors {
     525            0 :                         return Err(anyhow::format_err!(
     526            0 :                             "Failed to delete {} objects",
     527            0 :                             errors.len()
     528            0 :                         ));
     529         6648 :                     }
     530              :                 }
     531            0 :                 Err(e) => {
     532            0 :                     return Err(e.into());
     533              :                 }
     534              :             }
     535              :         }
     536         6648 :         Ok(())
     537        13296 :     }
     538              : 
     539         6610 :     async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
     540         6610 :         let paths = std::array::from_ref(path);
     541        30569 :         self.delete_objects(paths).await
     542        13220 :     }
     543              : }
     544              : 
     545              : /// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`].
     546        18107 : fn start_counting_cancelled_wait(
     547        18107 :     kind: RequestKind,
     548        18107 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
     549        18107 :     scopeguard::guard_on_success(std::time::Instant::now(), move |_| {
     550            0 :         metrics::BUCKET_METRICS.cancelled_waits.get(kind).inc()
     551        18107 :     })
     552        18107 : }
     553              : 
     554              : /// On drop (cancellation) add time to [`metrics::BucketMetrics::req_seconds`].
     555        18107 : fn start_measuring_requests(
     556        18107 :     kind: RequestKind,
     557        18107 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
     558        18107 :     scopeguard::guard_on_success(std::time::Instant::now(), move |started_at| {
     559            3 :         metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     560            3 :             kind,
     561            3 :             AttemptOutcome::Cancelled,
     562            3 :             started_at,
     563            3 :         )
     564        18107 :     })
     565        18107 : }
     566              : 
     567              : #[cfg(test)]
     568              : mod tests {
     569              :     use std::num::NonZeroUsize;
     570              :     use std::path::Path;
     571              : 
     572              :     use crate::{RemotePath, S3Bucket, S3Config};
     573              : 
     574            1 :     #[test]
     575            1 :     fn relative_path() {
     576            1 :         let all_paths = vec!["", "some/path", "some/path/"];
     577            1 :         let all_paths: Vec<RemotePath> = all_paths
     578            1 :             .iter()
     579            3 :             .map(|x| RemotePath::new(Path::new(x)).expect("bad path"))
     580            1 :             .collect();
     581            1 :         let prefixes = [
     582            1 :             None,
     583            1 :             Some(""),
     584            1 :             Some("test/prefix"),
     585            1 :             Some("test/prefix/"),
     586            1 :             Some("/test/prefix/"),
     587            1 :         ];
     588            1 :         let expected_outputs = vec![
     589            1 :             vec!["", "some/path", "some/path"],
     590            1 :             vec!["/", "/some/path", "/some/path"],
     591            1 :             vec![
     592            1 :                 "test/prefix/",
     593            1 :                 "test/prefix/some/path",
     594            1 :                 "test/prefix/some/path",
     595            1 :             ],
     596            1 :             vec![
     597            1 :                 "test/prefix/",
     598            1 :                 "test/prefix/some/path",
     599            1 :                 "test/prefix/some/path",
     600            1 :             ],
     601            1 :             vec![
     602            1 :                 "test/prefix/",
     603            1 :                 "test/prefix/some/path",
     604            1 :                 "test/prefix/some/path",
     605            1 :             ],
     606            1 :         ];
     607              : 
     608            5 :         for (prefix_idx, prefix) in prefixes.iter().enumerate() {
     609            5 :             let config = S3Config {
     610            5 :                 bucket_name: "bucket".to_owned(),
     611            5 :                 bucket_region: "region".to_owned(),
     612            5 :                 prefix_in_bucket: prefix.map(str::to_string),
     613            5 :                 endpoint: None,
     614            5 :                 concurrency_limit: NonZeroUsize::new(100).unwrap(),
     615            5 :                 max_keys_per_list_response: Some(5),
     616            5 :             };
     617            5 :             let storage = S3Bucket::new(&config).expect("remote storage init");
     618           15 :             for (test_path_idx, test_path) in all_paths.iter().enumerate() {
     619           15 :                 let result = storage.relative_path_to_s3_object(test_path);
     620           15 :                 let expected = expected_outputs[prefix_idx][test_path_idx];
     621           15 :                 assert_eq!(result, expected);
     622              :             }
     623              :         }
     624            1 :     }
     625              : }
        

Generated by: LCOV version 2.1-beta