LCOV - differential code coverage report
Current view: top level - libs/remote_storage/src - s3_bucket.rs (source / functions) Coverage Total Hit LBC UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 92.2 % 449 414 11 24 414
Current Date: 2023-10-19 02:04:12 Functions: 66.7 % 63 42 1 20 42
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! AWS S3 storage wrapper around the `aws_sdk_s3` library.
       2                 : //!
       3                 : //! Respects `prefix_in_bucket` property from [`S3Config`],
       4                 : //! allowing multiple api users to independently work with the same S3 bucket, if
       5                 : //! their bucket prefixes are both specified and different.
       6                 : 
       7                 : use std::borrow::Cow;
       8                 : 
       9                 : use anyhow::Context;
      10                 : use aws_config::{
      11                 :     environment::credentials::EnvironmentVariableCredentialsProvider,
      12                 :     imds::credentials::ImdsCredentialsProvider, meta::credentials::CredentialsProviderChain,
      13                 :     provider_config::ProviderConfig, web_identity_token::WebIdentityTokenCredentialsProvider,
      14                 : };
      15                 : use aws_credential_types::cache::CredentialsCache;
      16                 : use aws_sdk_s3::{
      17                 :     config::{Config, Region},
      18                 :     error::SdkError,
      19                 :     operation::get_object::GetObjectError,
      20                 :     primitives::ByteStream,
      21                 :     types::{Delete, ObjectIdentifier},
      22                 :     Client,
      23                 : };
      24                 : use aws_smithy_http::body::SdkBody;
      25                 : use hyper::Body;
      26                 : use scopeguard::ScopeGuard;
      27                 : use tokio::io::{self, AsyncRead};
      28                 : use tokio_util::io::ReaderStream;
      29                 : use tracing::debug;
      30                 : 
      31                 : use super::StorageMetadata;
      32                 : use crate::{
      33                 :     ConcurrencyLimiter, Download, DownloadError, RemotePath, RemoteStorage, S3Config,
      34                 :     MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
      35                 : };
      36                 : 
      37                 : pub(super) mod metrics;
      38                 : 
      39                 : use self::metrics::AttemptOutcome;
      40                 : pub(super) use self::metrics::RequestKind;
      41                 : 
      42                 : /// AWS S3 storage.
      43                 : pub struct S3Bucket {
      44                 :     client: Client,
      45                 :     bucket_name: String,
      46                 :     prefix_in_bucket: Option<String>,
      47                 :     max_keys_per_list_response: Option<i32>,
      48                 :     concurrency_limiter: ConcurrencyLimiter,
      49                 : }
      50                 : 
      51 UBC           0 : #[derive(Default)]
      52                 : struct GetObjectRequest {
      53                 :     bucket: String,
      54                 :     key: String,
      55                 :     range: Option<String>,
      56                 : }
      57                 : impl S3Bucket {
      58                 :     /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
      59 CBC         270 :     pub fn new(aws_config: &S3Config) -> anyhow::Result<Self> {
      60             270 :         debug!(
      61 UBC           0 :             "Creating s3 remote storage for S3 bucket {}",
      62               0 :             aws_config.bucket_name
      63               0 :         );
      64                 : 
      65 CBC         270 :         let region = Some(Region::new(aws_config.bucket_region.clone()));
      66             270 : 
      67             270 :         let credentials_provider = {
      68             270 :             // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
      69             270 :             CredentialsProviderChain::first_try(
      70             270 :                 "env",
      71             270 :                 EnvironmentVariableCredentialsProvider::new(),
      72             270 :             )
      73             270 :             // uses "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
      74             270 :             // needed to access remote extensions bucket
      75             270 :             .or_else("token", {
      76             270 :                 let provider_conf = ProviderConfig::without_region().with_region(region.clone());
      77             270 : 
      78             270 :                 WebIdentityTokenCredentialsProvider::builder()
      79             270 :                     .configure(&provider_conf)
      80             270 :                     .build()
      81             270 :             })
      82             270 :             // uses imds v2
      83             270 :             .or_else("imds", ImdsCredentialsProvider::builder().build())
      84             270 :         };
      85             270 : 
      86             270 :         let mut config_builder = Config::builder()
      87             270 :             .region(region)
      88             270 :             .credentials_cache(CredentialsCache::lazy())
      89             270 :             .credentials_provider(credentials_provider);
      90                 : 
      91             270 :         if let Some(custom_endpoint) = aws_config.endpoint.clone() {
      92             137 :             config_builder = config_builder
      93             137 :                 .endpoint_url(custom_endpoint)
      94             137 :                 .force_path_style(true);
      95             137 :         }
      96             270 :         let client = Client::from_conf(config_builder.build());
      97             270 : 
      98             270 :         let prefix_in_bucket = aws_config.prefix_in_bucket.as_deref().map(|prefix| {
      99             269 :             let mut prefix = prefix;
     100             270 :             while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     101               1 :                 prefix = &prefix[1..]
     102                 :             }
     103                 : 
     104             269 :             let mut prefix = prefix.to_string();
     105             271 :             while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     106               2 :                 prefix.pop();
     107               2 :             }
     108             269 :             prefix
     109             270 :         });
     110             270 :         Ok(Self {
     111             270 :             client,
     112             270 :             bucket_name: aws_config.bucket_name.clone(),
     113             270 :             max_keys_per_list_response: aws_config.max_keys_per_list_response,
     114             270 :             prefix_in_bucket,
     115             270 :             concurrency_limiter: ConcurrencyLimiter::new(aws_config.concurrency_limit.get()),
     116             270 :         })
     117             270 :     }
     118                 : 
     119            1304 :     fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
     120            1304 :         let relative_path =
     121            1304 :             match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
     122            1304 :                 Some(stripped) => stripped,
     123                 :                 // we rely on AWS to return properly prefixed paths
     124                 :                 // for requests with a certain prefix
     125 UBC           0 :                 None => panic!(
     126               0 :                     "Key {} does not start with bucket prefix {:?}",
     127               0 :                     key, self.prefix_in_bucket
     128               0 :                 ),
     129                 :             };
     130 CBC        1304 :         RemotePath(
     131            1304 :             relative_path
     132            1304 :                 .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
     133            1304 :                 .collect(),
     134            1304 :         )
     135            1304 :     }
     136                 : 
     137           24181 :     pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
     138           24181 :         assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
     139           24181 :         let path_string = path
     140           24181 :             .get_path()
     141           24181 :             .as_str()
     142           24181 :             .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR);
     143           24181 :         match &self.prefix_in_bucket {
     144           24178 :             Some(prefix) => prefix.clone() + "/" + path_string,
     145               3 :             None => path_string.to_string(),
     146                 :         }
     147           24181 :     }
     148                 : 
     149           16943 :     async fn permit(&self, kind: RequestKind) -> tokio::sync::SemaphorePermit<'_> {
     150           16943 :         let started_at = start_counting_cancelled_wait(kind);
     151           16943 :         let permit = self
     152           16943 :             .concurrency_limiter
     153           16943 :             .acquire(kind)
     154            2412 :             .await
     155           16943 :             .expect("semaphore is never closed");
     156           16943 : 
     157           16943 :         let started_at = ScopeGuard::into_inner(started_at);
     158           16943 :         metrics::BUCKET_METRICS
     159           16943 :             .wait_seconds
     160           16943 :             .observe_elapsed(kind, started_at);
     161           16943 : 
     162           16943 :         permit
     163           16943 :     }
     164                 : 
     165            1198 :     async fn owned_permit(&self, kind: RequestKind) -> tokio::sync::OwnedSemaphorePermit {
     166            1198 :         let started_at = start_counting_cancelled_wait(kind);
     167            1198 :         let permit = self
     168            1198 :             .concurrency_limiter
     169            1198 :             .acquire_owned(kind)
     170 UBC           0 :             .await
     171 CBC        1198 :             .expect("semaphore is never closed");
     172            1198 : 
     173            1198 :         let started_at = ScopeGuard::into_inner(started_at);
     174            1198 :         metrics::BUCKET_METRICS
     175            1198 :             .wait_seconds
     176            1198 :             .observe_elapsed(kind, started_at);
     177            1198 :         permit
     178            1198 :     }
     179                 : 
     180            1198 :     async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
     181            1198 :         let kind = RequestKind::Get;
     182            1198 :         let permit = self.owned_permit(kind).await;
     183                 : 
     184            1198 :         let started_at = start_measuring_requests(kind);
     185                 : 
     186            1198 :         let get_object = self
     187            1198 :             .client
     188            1198 :             .get_object()
     189            1198 :             .bucket(request.bucket)
     190            1198 :             .key(request.key)
     191            1198 :             .set_range(request.range)
     192            1198 :             .send()
     193            4079 :             .await;
     194                 : 
     195            1198 :         let started_at = ScopeGuard::into_inner(started_at);
     196            1198 : 
     197            1198 :         if get_object.is_err() {
     198             286 :             metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     199             286 :                 kind,
     200             286 :                 AttemptOutcome::Err,
     201             286 :                 started_at,
     202             286 :             );
     203             912 :         }
     204                 : 
     205             286 :         match get_object {
     206             912 :             Ok(object_output) => {
     207             912 :                 let metadata = object_output.metadata().cloned().map(StorageMetadata);
     208             912 :                 Ok(Download {
     209             912 :                     metadata,
     210             912 :                     download_stream: Box::pin(io::BufReader::new(TimedDownload::new(
     211             912 :                         started_at,
     212             912 :                         RatelimitedAsyncRead::new(permit, object_output.body.into_async_read()),
     213             912 :                     ))),
     214             912 :                 })
     215                 :             }
     216             286 :             Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
     217             286 :                 Err(DownloadError::NotFound)
     218                 :             }
     219 UBC           0 :             Err(e) => Err(DownloadError::Other(
     220               0 :                 anyhow::Error::new(e).context("download s3 object"),
     221               0 :             )),
     222                 :         }
     223 CBC        1198 :     }
     224                 : }
     225                 : 
     226                 : pin_project_lite::pin_project! {
     227                 :     /// An `AsyncRead` adapter which carries a permit for the lifetime of the value.
     228                 :     struct RatelimitedAsyncRead<S> {
     229                 :         permit: tokio::sync::OwnedSemaphorePermit,
     230                 :         #[pin]
     231                 :         inner: S,
     232                 :     }
     233                 : }
     234                 : 
     235                 : impl<S: AsyncRead> RatelimitedAsyncRead<S> {
     236             912 :     fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self {
     237             912 :         RatelimitedAsyncRead { permit, inner }
     238             912 :     }
     239                 : }
     240                 : 
     241                 : impl<S: AsyncRead> AsyncRead for RatelimitedAsyncRead<S> {
     242          278043 :     fn poll_read(
     243          278043 :         self: std::pin::Pin<&mut Self>,
     244          278043 :         cx: &mut std::task::Context<'_>,
     245          278043 :         buf: &mut io::ReadBuf<'_>,
     246          278043 :     ) -> std::task::Poll<std::io::Result<()>> {
     247          278043 :         let this = self.project();
     248          278043 :         this.inner.poll_read(cx, buf)
     249          278043 :     }
     250                 : }
     251                 : 
     252                 : pin_project_lite::pin_project! {
     253                 :     /// Times and tracks the outcome of the request.
     254                 :     struct TimedDownload<S> {
     255                 :         started_at: std::time::Instant,
     256                 :         outcome: metrics::AttemptOutcome,
     257                 :         #[pin]
     258                 :         inner: S
     259                 :     }
     260                 : 
     261                 :     impl<S> PinnedDrop for TimedDownload<S> {
     262                 :         fn drop(mut this: Pin<&mut Self>) {
     263                 :             metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
     264                 :         }
     265                 :     }
     266                 : }
     267                 : 
     268                 : impl<S: AsyncRead> TimedDownload<S> {
     269             912 :     fn new(started_at: std::time::Instant, inner: S) -> Self {
     270             912 :         TimedDownload {
     271             912 :             started_at,
     272             912 :             outcome: metrics::AttemptOutcome::Cancelled,
     273             912 :             inner,
     274             912 :         }
     275             912 :     }
     276                 : }
     277                 : 
     278                 : impl<S: AsyncRead> AsyncRead for TimedDownload<S> {
     279          278043 :     fn poll_read(
     280          278043 :         self: std::pin::Pin<&mut Self>,
     281          278043 :         cx: &mut std::task::Context<'_>,
     282          278043 :         buf: &mut io::ReadBuf<'_>,
     283          278043 :     ) -> std::task::Poll<std::io::Result<()>> {
     284          278043 :         let this = self.project();
     285          278043 :         let before = buf.filled().len();
     286          278043 :         let read = std::task::ready!(this.inner.poll_read(cx, buf));
     287                 : 
     288          202139 :         let read_eof = buf.filled().len() == before;
     289                 : 
     290          202139 :         match read {
     291             902 :             Ok(()) if read_eof => *this.outcome = AttemptOutcome::Ok,
     292          201237 :             Ok(()) => { /* still in progress */ }
     293 UBC           0 :             Err(_) => *this.outcome = AttemptOutcome::Err,
     294                 :         }
     295                 : 
     296 CBC      202139 :         std::task::Poll::Ready(read)
     297          278043 :     }
     298                 : }
     299                 : 
     300                 : #[async_trait::async_trait]
     301                 : impl RemoteStorage for S3Bucket {
     302                 :     /// See the doc for `RemoteStorage::list_prefixes`
     303                 :     /// Note: it won't include empty "directories"
     304              22 :     async fn list_prefixes(
     305              22 :         &self,
     306              22 :         prefix: Option<&RemotePath>,
     307              22 :     ) -> Result<Vec<RemotePath>, DownloadError> {
     308              22 :         let kind = RequestKind::List;
     309              22 : 
     310              22 :         // get the passed prefix or if it is not set use prefix_in_bucket value
     311              22 :         let list_prefix = prefix
     312              22 :             .map(|p| self.relative_path_to_s3_object(p))
     313              22 :             .or_else(|| self.prefix_in_bucket.clone())
     314              22 :             .map(|mut p| {
     315              22 :                 // required to end with a separator
     316              22 :                 // otherwise request will return only the entry of a prefix
     317              22 :                 if !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     318              22 :                     p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
     319              22 :                 }
     320              22 :                 p
     321              22 :             });
     322              22 : 
     323              22 :         let mut document_keys = Vec::new();
     324              22 : 
     325              22 :         let mut continuation_token = None;
     326                 : 
     327                 :         loop {
     328              22 :             let _guard = self.permit(kind).await;
     329              22 :             let started_at = start_measuring_requests(kind);
     330                 : 
     331              22 :             let fetch_response = self
     332              22 :                 .client
     333              22 :                 .list_objects_v2()
     334              22 :                 .bucket(self.bucket_name.clone())
     335              22 :                 .set_prefix(list_prefix.clone())
     336              22 :                 .set_continuation_token(continuation_token)
     337              22 :                 .delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string())
     338              22 :                 .set_max_keys(self.max_keys_per_list_response)
     339              22 :                 .send()
     340              67 :                 .await
     341              22 :                 .context("Failed to list S3 prefixes")
     342              22 :                 .map_err(DownloadError::Other);
     343              22 : 
     344              22 :             let started_at = ScopeGuard::into_inner(started_at);
     345              22 : 
     346              22 :             metrics::BUCKET_METRICS
     347              22 :                 .req_seconds
     348              22 :                 .observe_elapsed(kind, &fetch_response, started_at);
     349                 : 
     350              22 :             let fetch_response = fetch_response?;
     351                 : 
     352              22 :             document_keys.extend(
     353              22 :                 fetch_response
     354              22 :                     .common_prefixes
     355              22 :                     .unwrap_or_default()
     356              22 :                     .into_iter()
     357              32 :                     .filter_map(|o| Some(self.s3_object_to_relative_path(o.prefix()?))),
     358              22 :             );
     359                 : 
     360              22 :             continuation_token = match fetch_response.next_continuation_token {
     361 UBC           0 :                 Some(new_token) => Some(new_token),
     362 CBC          22 :                 None => break,
     363              22 :             };
     364              22 :         }
     365              22 : 
     366              22 :         Ok(document_keys)
     367              44 :     }
     368                 : 
     369                 :     /// See the doc for `RemoteStorage::list_files`
     370             193 :     async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
     371             193 :         let kind = RequestKind::List;
     372             193 : 
     373             193 :         let folder_name = folder
     374             193 :             .map(|p| self.relative_path_to_s3_object(p))
     375             193 :             .or_else(|| self.prefix_in_bucket.clone());
     376             193 : 
     377             193 :         // AWS may need to break the response into several parts
     378             193 :         let mut continuation_token = None;
     379             193 :         let mut all_files = vec![];
     380                 :         loop {
     381             193 :             let _guard = self.permit(kind).await;
     382             193 :             let started_at = start_measuring_requests(kind);
     383                 : 
     384             193 :             let response = self
     385             193 :                 .client
     386             193 :                 .list_objects_v2()
     387             193 :                 .bucket(self.bucket_name.clone())
     388             193 :                 .set_prefix(folder_name.clone())
     389             193 :                 .set_continuation_token(continuation_token)
     390             193 :                 .set_max_keys(self.max_keys_per_list_response)
     391             193 :                 .send()
     392             817 :                 .await
     393             193 :                 .context("Failed to list files in S3 bucket");
     394             193 : 
     395             193 :             let started_at = ScopeGuard::into_inner(started_at);
     396             193 :             metrics::BUCKET_METRICS
     397             193 :                 .req_seconds
     398             193 :                 .observe_elapsed(kind, &response, started_at);
     399                 : 
     400             193 :             let response = response?;
     401                 : 
     402            1272 :             for object in response.contents().unwrap_or_default() {
     403            1272 :                 let object_path = object.key().expect("response does not contain a key");
     404            1272 :                 let remote_path = self.s3_object_to_relative_path(object_path);
     405            1272 :                 all_files.push(remote_path);
     406            1272 :             }
     407             193 :             match response.next_continuation_token {
     408 UBC           0 :                 Some(new_token) => continuation_token = Some(new_token),
     409 CBC         193 :                 None => break,
     410             193 :             }
     411             193 :         }
     412             193 :         Ok(all_files)
     413             386 :     }
     414                 : 
     415           14483 :     async fn upload(
     416           14483 :         &self,
     417           14483 :         from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
     418           14483 :         from_size_bytes: usize,
     419           14483 :         to: &RemotePath,
     420           14483 :         metadata: Option<StorageMetadata>,
     421           14483 :     ) -> anyhow::Result<()> {
     422           14483 :         let kind = RequestKind::Put;
     423           14483 :         let _guard = self.permit(kind).await;
     424                 : 
     425           14483 :         let started_at = start_measuring_requests(kind);
     426           14483 : 
     427           14483 :         let body = Body::wrap_stream(ReaderStream::new(from));
     428           14483 :         let bytes_stream = ByteStream::new(SdkBody::from(body));
     429                 : 
     430           14483 :         let res = self
     431           14483 :             .client
     432           14483 :             .put_object()
     433           14483 :             .bucket(self.bucket_name.clone())
     434           14483 :             .key(self.relative_path_to_s3_object(to))
     435           14483 :             .set_metadata(metadata.map(|m| m.0))
     436           14483 :             .content_length(from_size_bytes.try_into()?)
     437           14483 :             .body(bytes_stream)
     438           14483 :             .send()
     439           48259 :             .await;
     440                 : 
     441           14412 :         let started_at = ScopeGuard::into_inner(started_at);
     442           14412 :         metrics::BUCKET_METRICS
     443           14412 :             .req_seconds
     444           14412 :             .observe_elapsed(kind, &res, started_at);
     445           14412 : 
     446           14412 :         res?;
     447                 : 
     448           14409 :         Ok(())
     449           28895 :     }
     450                 : 
     451            1194 :     async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
     452                 :         // if prefix is not none then download file `prefix/from`
     453                 :         // if prefix is none then download file `from`
     454            1194 :         self.download_object(GetObjectRequest {
     455            1194 :             bucket: self.bucket_name.clone(),
     456            1194 :             key: self.relative_path_to_s3_object(from),
     457            1194 :             range: None,
     458            1194 :         })
     459            4060 :         .await
     460            2388 :     }
     461                 : 
     462               4 :     async fn download_byte_range(
     463               4 :         &self,
     464               4 :         from: &RemotePath,
     465               4 :         start_inclusive: u64,
     466               4 :         end_exclusive: Option<u64>,
     467               4 :     ) -> Result<Download, DownloadError> {
     468                 :         // S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
     469                 :         // and needs both ends to be inclusive
     470               4 :         let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
     471               4 :         let range = Some(match end_inclusive {
     472 UBC           0 :             Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"),
     473 CBC           4 :             None => format!("bytes={start_inclusive}-"),
     474                 :         });
     475                 : 
     476               4 :         self.download_object(GetObjectRequest {
     477               4 :             bucket: self.bucket_name.clone(),
     478               4 :             key: self.relative_path_to_s3_object(from),
     479               4 :             range,
     480               4 :         })
     481              19 :         .await
     482               8 :     }
     483            2245 :     async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
     484            2245 :         let kind = RequestKind::Delete;
     485            2245 :         let _guard = self.permit(kind).await;
     486                 : 
     487            2245 :         let mut delete_objects = Vec::with_capacity(paths.len());
     488           10515 :         for path in paths {
     489            8270 :             let obj_id = ObjectIdentifier::builder()
     490            8270 :                 .set_key(Some(self.relative_path_to_s3_object(path)))
     491            8270 :                 .build();
     492            8270 :             delete_objects.push(obj_id);
     493            8270 :         }
     494                 : 
     495            2245 :         for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
     496            2245 :             let started_at = start_measuring_requests(kind);
     497                 : 
     498            2245 :             let resp = self
     499            2245 :                 .client
     500            2245 :                 .delete_objects()
     501            2245 :                 .bucket(self.bucket_name.clone())
     502            2245 :                 .delete(Delete::builder().set_objects(Some(chunk.to_vec())).build())
     503            2245 :                 .send()
     504            9181 :                 .await;
     505                 : 
     506            2245 :             let started_at = ScopeGuard::into_inner(started_at);
     507            2245 :             metrics::BUCKET_METRICS
     508            2245 :                 .req_seconds
     509            2245 :                 .observe_elapsed(kind, &resp, started_at);
     510            2245 : 
     511            2245 :             match resp {
     512            2245 :                 Ok(resp) => {
     513            2245 :                     metrics::BUCKET_METRICS
     514            2245 :                         .deleted_objects_total
     515            2245 :                         .inc_by(chunk.len() as u64);
     516            2245 :                     if let Some(errors) = resp.errors {
     517                 :                         // Log a bounded number of the errors within the response:
     518                 :                         // these requests can carry 1000 keys so logging each one
     519                 :                         // would be too verbose, especially as errors may lead us
     520                 :                         // to retry repeatedly.
     521                 :                         const LOG_UP_TO_N_ERRORS: usize = 10;
     522 LBC         (1) :                         for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
     523             (1) :                             tracing::warn!(
     524             (1) :                                 "DeleteObjects key {} failed: {}: {}",
     525             (1) :                                 e.key.as_ref().map(Cow::from).unwrap_or("".into()),
     526             (1) :                                 e.code.as_ref().map(Cow::from).unwrap_or("".into()),
     527             (1) :                                 e.message.as_ref().map(Cow::from).unwrap_or("".into())
     528             (1) :                             );
     529                 :                         }
     530                 : 
     531             (1) :                         return Err(anyhow::format_err!(
     532             (1) :                             "Failed to delete {} objects",
     533             (1) :                             errors.len()
     534             (1) :                         ));
     535 CBC        2245 :                     }
     536                 :                 }
     537 UBC           0 :                 Err(e) => {
     538               0 :                     return Err(e.into());
     539                 :                 }
     540                 :             }
     541                 :         }
     542 CBC        2245 :         Ok(())
     543            4490 :     }
     544                 : 
     545            1931 :     async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
     546            1931 :         let paths = std::array::from_ref(path);
     547            7911 :         self.delete_objects(paths).await
     548            3862 :     }
     549                 : }
     550                 : 
     551                 : /// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`].
     552           18141 : fn start_counting_cancelled_wait(
     553           18141 :     kind: RequestKind,
     554           18141 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
     555           18141 :     scopeguard::guard_on_success(std::time::Instant::now(), move |_| {
     556 UBC           0 :         metrics::BUCKET_METRICS.cancelled_waits.get(kind).inc()
     557 CBC       18141 :     })
     558           18141 : }
     559                 : 
     560                 : /// On drop (cancellation) add time to [`metrics::BucketMetrics::req_seconds`].
     561           18141 : fn start_measuring_requests(
     562           18141 :     kind: RequestKind,
     563           18141 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
     564           18141 :     scopeguard::guard_on_success(std::time::Instant::now(), move |started_at| {
     565 UBC           0 :         metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     566               0 :             kind,
     567               0 :             AttemptOutcome::Cancelled,
     568               0 :             started_at,
     569               0 :         )
     570 CBC       18141 :     })
     571           18141 : }
     572                 : 
     573                 : #[cfg(test)]
     574                 : mod tests {
     575                 :     use camino::Utf8Path;
     576                 :     use std::num::NonZeroUsize;
     577                 : 
     578                 :     use crate::{RemotePath, S3Bucket, S3Config};
     579                 : 
     580               1 :     #[test]
     581               1 :     fn relative_path() {
     582               1 :         let all_paths = ["", "some/path", "some/path/"];
     583               1 :         let all_paths: Vec<RemotePath> = all_paths
     584               1 :             .iter()
     585               3 :             .map(|x| RemotePath::new(Utf8Path::new(x)).expect("bad path"))
     586               1 :             .collect();
     587               1 :         let prefixes = [
     588               1 :             None,
     589               1 :             Some(""),
     590               1 :             Some("test/prefix"),
     591               1 :             Some("test/prefix/"),
     592               1 :             Some("/test/prefix/"),
     593               1 :         ];
     594               1 :         let expected_outputs = vec![
     595               1 :             vec!["", "some/path", "some/path"],
     596               1 :             vec!["/", "/some/path", "/some/path"],
     597               1 :             vec![
     598               1 :                 "test/prefix/",
     599               1 :                 "test/prefix/some/path",
     600               1 :                 "test/prefix/some/path",
     601               1 :             ],
     602               1 :             vec![
     603               1 :                 "test/prefix/",
     604               1 :                 "test/prefix/some/path",
     605               1 :                 "test/prefix/some/path",
     606               1 :             ],
     607               1 :             vec![
     608               1 :                 "test/prefix/",
     609               1 :                 "test/prefix/some/path",
     610               1 :                 "test/prefix/some/path",
     611               1 :             ],
     612               1 :         ];
     613                 : 
     614               5 :         for (prefix_idx, prefix) in prefixes.iter().enumerate() {
     615               5 :             let config = S3Config {
     616               5 :                 bucket_name: "bucket".to_owned(),
     617               5 :                 bucket_region: "region".to_owned(),
     618               5 :                 prefix_in_bucket: prefix.map(str::to_string),
     619               5 :                 endpoint: None,
     620               5 :                 concurrency_limit: NonZeroUsize::new(100).unwrap(),
     621               5 :                 max_keys_per_list_response: Some(5),
     622               5 :             };
     623               5 :             let storage = S3Bucket::new(&config).expect("remote storage init");
     624              15 :             for (test_path_idx, test_path) in all_paths.iter().enumerate() {
     625              15 :                 let result = storage.relative_path_to_s3_object(test_path);
     626              15 :                 let expected = expected_outputs[prefix_idx][test_path_idx];
     627              15 :                 assert_eq!(result, expected);
     628                 :             }
     629                 :         }
     630               1 :     }
     631                 : }
        

Generated by: LCOV version 2.1-beta