LCOV - differential code coverage report
Current view: top level - libs/remote_storage/src - s3_bucket.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 91.8 % 488 448 40 448
Current Date: 2024-01-09 02:06:09 Functions: 76.8 % 56 43 13 43
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! AWS S3 storage wrapper around the AWS SDK for Rust (`aws-sdk-s3`).
       2                 : //!
       3                 : //! Respects `prefix_in_bucket` property from [`S3Config`],
       4                 : //! allowing multiple api users to independently work with the same S3 bucket, if
       5                 : //! their bucket prefixes are both specified and different.
       6                 : 
       7                 : use std::{
       8                 :     borrow::Cow,
       9                 :     pin::Pin,
      10                 :     sync::Arc,
      11                 :     task::{Context, Poll},
      12                 : };
      13                 : 
      14                 : use anyhow::Context as _;
      15                 : use aws_config::{
      16                 :     environment::credentials::EnvironmentVariableCredentialsProvider,
      17                 :     imds::credentials::ImdsCredentialsProvider,
      18                 :     meta::credentials::CredentialsProviderChain,
      19                 :     profile::ProfileFileCredentialsProvider,
      20                 :     provider_config::ProviderConfig,
      21                 :     retry::{RetryConfigBuilder, RetryMode},
      22                 :     web_identity_token::WebIdentityTokenCredentialsProvider,
      23                 :     BehaviorVersion,
      24                 : };
      25                 : use aws_credential_types::provider::SharedCredentialsProvider;
      26                 : use aws_sdk_s3::{
      27                 :     config::{AsyncSleep, Builder, IdentityCache, Region, SharedAsyncSleep},
      28                 :     error::SdkError,
      29                 :     operation::get_object::GetObjectError,
      30                 :     types::{Delete, ObjectIdentifier},
      31                 :     Client,
      32                 : };
      33                 : use aws_smithy_async::rt::sleep::TokioSleep;
      34                 : 
      35                 : use aws_smithy_types::body::SdkBody;
      36                 : use aws_smithy_types::byte_stream::ByteStream;
      37                 : use bytes::Bytes;
      38                 : use futures::stream::Stream;
      39                 : use hyper::Body;
      40                 : use scopeguard::ScopeGuard;
      41                 : 
      42                 : use super::StorageMetadata;
      43                 : use crate::{
      44                 :     ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage,
      45                 :     S3Config, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
      46                 : };
      47                 : 
      48                 : pub(super) mod metrics;
      49                 : 
      50                 : use self::metrics::AttemptOutcome;
      51                 : pub(super) use self::metrics::RequestKind;
      52                 : 
      53                 : /// AWS S3 storage.
      54                 : pub struct S3Bucket {
      55                 :     client: Client,
      56                 :     bucket_name: String,
      57                 :     prefix_in_bucket: Option<String>,
      58                 :     max_keys_per_list_response: Option<i32>,
      59                 :     concurrency_limiter: ConcurrencyLimiter,
      60                 : }
      61                 : 
      62 UBC           0 : #[derive(Default)]
      63                 : struct GetObjectRequest {
      64                 :     bucket: String,
      65                 :     key: String,
      66                 :     range: Option<String>,
      67                 : }
      68                 : impl S3Bucket {
      69                 :     /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
      70 CBC         239 :     pub fn new(aws_config: &S3Config) -> anyhow::Result<Self> {
      71             239 :         tracing::debug!(
      72 UBC           0 :             "Creating s3 remote storage for S3 bucket {}",
      73               0 :             aws_config.bucket_name
      74               0 :         );
      75                 : 
      76 CBC         239 :         let region = Some(Region::new(aws_config.bucket_region.clone()));
      77             239 : 
      78             239 :         let provider_conf = ProviderConfig::without_region().with_region(region.clone());
      79             239 : 
      80             239 :         let credentials_provider = {
      81             239 :             // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
      82             239 :             CredentialsProviderChain::first_try(
      83             239 :                 "env",
      84             239 :                 EnvironmentVariableCredentialsProvider::new(),
      85             239 :             )
      86             239 :             // uses "AWS_PROFILE" / `aws sso login --profile <profile>`
      87             239 :             .or_else(
      88             239 :                 "profile-sso",
      89             239 :                 ProfileFileCredentialsProvider::builder()
      90             239 :                     .configure(&provider_conf)
      91             239 :                     .build(),
      92             239 :             )
      93             239 :             // uses "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
      94             239 :             // needed to access remote extensions bucket
      95             239 :             .or_else(
      96             239 :                 "token",
      97             239 :                 WebIdentityTokenCredentialsProvider::builder()
      98             239 :                     .configure(&provider_conf)
      99             239 :                     .build(),
     100             239 :             )
     101             239 :             // uses imds v2
     102             239 :             .or_else("imds", ImdsCredentialsProvider::builder().build())
     103             239 :         };
     104             239 : 
     105             239 :         // AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
     106             239 :         let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
     107             239 : 
     108             239 :         // We do our own retries (see [`backoff::retry`]).  However, for the AWS SDK to enable rate limiting in response to throttling
     109             239 :         // responses (e.g. 429 on too many ListObjectsv2 requests), we must provide a retry config.  We set it to use at most one
     110             239 :         // attempt, and enable 'Adaptive' mode, which causes rate limiting to be enabled.
     111             239 :         let mut retry_config = RetryConfigBuilder::new();
     112             239 :         retry_config
     113             239 :             .set_max_attempts(Some(1))
     114             239 :             .set_mode(Some(RetryMode::Adaptive));
     115             239 : 
     116             239 :         let mut config_builder = Builder::default()
     117             239 :             .behavior_version(BehaviorVersion::v2023_11_09())
     118             239 :             .region(region)
     119             239 :             .identity_cache(IdentityCache::lazy().build())
     120             239 :             .credentials_provider(SharedCredentialsProvider::new(credentials_provider))
     121             239 :             .retry_config(retry_config.build())
     122             239 :             .sleep_impl(SharedAsyncSleep::from(sleep_impl));
     123                 : 
     124             239 :         if let Some(custom_endpoint) = aws_config.endpoint.clone() {
     125             143 :             config_builder = config_builder
     126             143 :                 .endpoint_url(custom_endpoint)
     127             143 :                 .force_path_style(true);
     128             143 :         }
     129                 : 
     130             239 :         let client = Client::from_conf(config_builder.build());
     131             239 : 
     132             239 :         let prefix_in_bucket = aws_config.prefix_in_bucket.as_deref().map(|prefix| {
     133             238 :             let mut prefix = prefix;
     134             239 :             while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     135               1 :                 prefix = &prefix[1..]
     136                 :             }
     137                 : 
     138             238 :             let mut prefix = prefix.to_string();
     139             250 :             while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     140              12 :                 prefix.pop();
     141              12 :             }
     142             238 :             prefix
     143             239 :         });
     144             239 :         Ok(Self {
     145             239 :             client,
     146             239 :             bucket_name: aws_config.bucket_name.clone(),
     147             239 :             max_keys_per_list_response: aws_config.max_keys_per_list_response,
     148             239 :             prefix_in_bucket,
     149             239 :             concurrency_limiter: ConcurrencyLimiter::new(aws_config.concurrency_limit.get()),
     150             239 :         })
     151             239 :     }
     152                 : 
     153            2672 :     fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
     154            2672 :         let relative_path =
     155            2672 :             match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
     156            2672 :                 Some(stripped) => stripped,
     157                 :                 // we rely on AWS to return properly prefixed paths
     158                 :                 // for requests with a certain prefix
     159 UBC           0 :                 None => panic!(
     160               0 :                     "Key {} does not start with bucket prefix {:?}",
     161               0 :                     key, self.prefix_in_bucket
     162               0 :                 ),
     163                 :             };
     164 CBC        2672 :         RemotePath(
     165            2672 :             relative_path
     166            2672 :                 .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
     167            2672 :                 .collect(),
     168            2672 :         )
     169            2672 :     }
     170                 : 
     171           32496 :     pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
     172           32496 :         assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
     173           32496 :         let path_string = path
     174           32496 :             .get_path()
     175           32496 :             .as_str()
     176           32496 :             .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR);
     177           32496 :         match &self.prefix_in_bucket {
     178           32493 :             Some(prefix) => prefix.clone() + "/" + path_string,
     179               3 :             None => path_string.to_string(),
     180                 :         }
     181           32496 :     }
     182                 : 
     183           18037 :     async fn permit(&self, kind: RequestKind) -> tokio::sync::SemaphorePermit<'_> {
     184           18037 :         let started_at = start_counting_cancelled_wait(kind);
     185           18037 :         let permit = self
     186           18037 :             .concurrency_limiter
     187           18037 :             .acquire(kind)
     188            4254 :             .await
     189           18037 :             .expect("semaphore is never closed");
     190           18037 : 
     191           18037 :         let started_at = ScopeGuard::into_inner(started_at);
     192           18037 :         metrics::BUCKET_METRICS
     193           18037 :             .wait_seconds
     194           18037 :             .observe_elapsed(kind, started_at);
     195           18037 : 
     196           18037 :         permit
     197           18037 :     }
     198                 : 
     199           10457 :     async fn owned_permit(&self, kind: RequestKind) -> tokio::sync::OwnedSemaphorePermit {
     200           10457 :         let started_at = start_counting_cancelled_wait(kind);
     201           10457 :         let permit = self
     202           10457 :             .concurrency_limiter
     203           10457 :             .acquire_owned(kind)
     204 UBC           0 :             .await
     205 CBC       10457 :             .expect("semaphore is never closed");
     206           10457 : 
     207           10457 :         let started_at = ScopeGuard::into_inner(started_at);
     208           10457 :         metrics::BUCKET_METRICS
     209           10457 :             .wait_seconds
     210           10457 :             .observe_elapsed(kind, started_at);
     211           10457 :         permit
     212           10457 :     }
     213                 : 
     214           10457 :     async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
     215           10457 :         let kind = RequestKind::Get;
     216           10457 :         let permit = self.owned_permit(kind).await;
     217                 : 
     218           10457 :         let started_at = start_measuring_requests(kind);
     219                 : 
     220           10457 :         let get_object = self
     221           10457 :             .client
     222           10457 :             .get_object()
     223           10457 :             .bucket(request.bucket)
     224           10457 :             .key(request.key)
     225           10457 :             .set_range(request.range)
     226           10457 :             .send()
     227           30605 :             .await;
     228                 : 
     229           10454 :         let started_at = ScopeGuard::into_inner(started_at);
     230                 : 
     231             256 :         match get_object {
     232           10198 :             Ok(object_output) => {
     233           10198 :                 let metadata = object_output.metadata().cloned().map(StorageMetadata);
     234           10198 :                 let etag = object_output.e_tag.clone();
     235           10198 :                 let last_modified = object_output.last_modified.and_then(|t| t.try_into().ok());
     236           10198 : 
     237           10198 :                 let body = object_output.body;
     238           10198 :                 let body = ByteStreamAsStream::from(body);
     239           10198 :                 let body = PermitCarrying::new(permit, body);
     240           10198 :                 let body = TimedDownload::new(started_at, body);
     241           10198 : 
     242           10198 :                 Ok(Download {
     243           10198 :                     metadata,
     244           10198 :                     etag,
     245           10198 :                     last_modified,
     246           10198 :                     download_stream: Box::pin(body),
     247           10198 :                 })
     248                 :             }
     249             256 :             Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
     250                 :                 // Count this in the AttemptOutcome::Ok bucket, because 404 is not
     251                 :                 // an error: we expect to sometimes fetch an object and find it missing,
     252                 :                 // e.g. when probing for timeline indices.
     253             256 :                 metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     254             256 :                     kind,
     255             256 :                     AttemptOutcome::Ok,
     256             256 :                     started_at,
     257             256 :                 );
     258             256 :                 Err(DownloadError::NotFound)
     259                 :             }
     260 UBC           0 :             Err(e) => {
     261               0 :                 metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     262               0 :                     kind,
     263               0 :                     AttemptOutcome::Err,
     264               0 :                     started_at,
     265               0 :                 );
     266               0 : 
     267               0 :                 Err(DownloadError::Other(
     268               0 :                     anyhow::Error::new(e).context("download s3 object"),
     269               0 :                 ))
     270                 :             }
     271                 :         }
     272 CBC       10454 :     }
     273                 : }
     274                 : 
     275                 : pin_project_lite::pin_project! {
     276                 :     struct ByteStreamAsStream {
     277                 :         #[pin]
     278                 :         inner: aws_smithy_types::byte_stream::ByteStream
     279                 :     }
     280                 : }
     281                 : 
     282                 : impl From<aws_smithy_types::byte_stream::ByteStream> for ByteStreamAsStream {
     283           10198 :     fn from(inner: aws_smithy_types::byte_stream::ByteStream) -> Self {
     284           10198 :         ByteStreamAsStream { inner }
     285           10198 :     }
     286                 : }
     287                 : 
     288                 : impl Stream for ByteStreamAsStream {
     289                 :     type Item = std::io::Result<Bytes>;
     290                 : 
     291          120583 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     292          120583 :         // this does the std::io::ErrorKind::Other conversion
     293          120583 :         self.project().inner.poll_next(cx).map_err(|x| x.into())
     294          120583 :     }
     295                 : 
     296                 :     // size_hint is deliberately not implemented: inner.size_hint reports the remaining size in
     297                 :     // bytes, whereas Stream::size_hint is expected to describe the number of remaining items.
     298                 : }
     299                 : 
     300                 : pin_project_lite::pin_project! {
     301                 :     /// A `Stream` adapter which carries a permit for the lifetime of the value.
     302                 :     struct PermitCarrying<S> {
     303                 :         permit: tokio::sync::OwnedSemaphorePermit,
     304                 :         #[pin]
     305                 :         inner: S,
     306                 :     }
     307                 : }
     308                 : 
     309                 : impl<S> PermitCarrying<S> {
     310           10198 :     fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self {
     311           10198 :         Self { permit, inner }
     312           10198 :     }
     313                 : }
     314                 : 
     315                 : impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for PermitCarrying<S> {
     316                 :     type Item = <S as Stream>::Item;
     317                 : 
     318          120583 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     319          120583 :         self.project().inner.poll_next(cx)
     320          120583 :     }
     321                 : 
     322 UBC           0 :     fn size_hint(&self) -> (usize, Option<usize>) {
     323               0 :         self.inner.size_hint()
     324               0 :     }
     325                 : }
     326                 : 
     327                 : pin_project_lite::pin_project! {
     328                 :     /// Times and tracks the outcome of the request.
     329                 :     struct TimedDownload<S> {
     330                 :         started_at: std::time::Instant,
     331                 :         outcome: metrics::AttemptOutcome,
     332                 :         #[pin]
     333                 :         inner: S
     334                 :     }
     335                 : 
     336                 :     impl<S> PinnedDrop for TimedDownload<S> {
     337                 :         fn drop(mut this: Pin<&mut Self>) {
     338                 :             metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
     339                 :         }
     340                 :     }
     341                 : }
     342                 : 
     343                 : impl<S> TimedDownload<S> {
     344 CBC       10198 :     fn new(started_at: std::time::Instant, inner: S) -> Self {
     345           10198 :         TimedDownload {
     346           10198 :             started_at,
     347           10198 :             outcome: metrics::AttemptOutcome::Cancelled,
     348           10198 :             inner,
     349           10198 :         }
     350           10198 :     }
     351                 : }
     352                 : 
     353                 : impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for TimedDownload<S> {
     354                 :     type Item = <S as Stream>::Item;
     355                 : 
     356          120583 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     357          120583 :         use std::task::ready;
     358          120583 : 
     359          120583 :         let this = self.project();
     360                 : 
     361          120583 :         let res = ready!(this.inner.poll_next(cx));
     362           58122 :         match &res {
     363           58122 :             Some(Ok(_)) => {}
     364 UBC           0 :             Some(Err(_)) => *this.outcome = metrics::AttemptOutcome::Err,
     365 CBC       22985 :             None => *this.outcome = metrics::AttemptOutcome::Ok,
     366                 :         }
     367                 : 
     368           81107 :         Poll::Ready(res)
     369          120583 :     }
     370                 : 
     371 UBC           0 :     fn size_hint(&self) -> (usize, Option<usize>) {
     372               0 :         self.inner.size_hint()
     373               0 :     }
     374                 : }
     375                 : 
     376                 : #[async_trait::async_trait]
     377                 : impl RemoteStorage for S3Bucket {
     378 CBC         406 :     async fn list(
     379             406 :         &self,
     380             406 :         prefix: Option<&RemotePath>,
     381             406 :         mode: ListingMode,
     382             406 :     ) -> Result<Listing, DownloadError> {
     383             406 :         let kind = RequestKind::List;
     384             406 :         let mut result = Listing::default();
     385             406 : 
     386             406 :         // get the passed prefix or if it is not set use prefix_in_bucket value
     387             406 :         let list_prefix = prefix
     388             406 :             .map(|p| self.relative_path_to_s3_object(p))
     389             406 :             .or_else(|| self.prefix_in_bucket.clone())
     390             406 :             .map(|mut p| {
     391                 :                 // required to end with a separator
     392                 :                 // otherwise request will return only the entry of a prefix
     393             406 :                 if matches!(mode, ListingMode::WithDelimiter)
     394             145 :                     && !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR)
     395             145 :                 {
     396             145 :                     p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
     397             261 :                 }
     398             406 :                 p
     399             406 :             });
     400             406 : 
     401             406 :         let mut continuation_token = None;
     402                 : 
     403                 :         loop {
     404             414 :             let _guard = self.permit(kind).await;
     405             414 :             let started_at = start_measuring_requests(kind);
     406             414 : 
     407             414 :             let mut request = self
     408             414 :                 .client
     409             414 :                 .list_objects_v2()
     410             414 :                 .bucket(self.bucket_name.clone())
     411             414 :                 .set_prefix(list_prefix.clone())
     412             414 :                 .set_continuation_token(continuation_token)
     413             414 :                 .set_max_keys(self.max_keys_per_list_response);
     414             414 : 
     415             414 :             if let ListingMode::WithDelimiter = mode {
     416             149 :                 request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
     417             265 :             }
     418                 : 
     419             414 :             let response = request
     420             414 :                 .send()
     421            1862 :                 .await
     422             414 :                 .context("Failed to list S3 prefixes")
     423             414 :                 .map_err(DownloadError::Other);
     424             414 : 
     425             414 :             let started_at = ScopeGuard::into_inner(started_at);
     426             414 : 
     427             414 :             metrics::BUCKET_METRICS
     428             414 :                 .req_seconds
     429             414 :                 .observe_elapsed(kind, &response, started_at);
     430                 : 
     431             414 :             let response = response?;
     432                 : 
     433             414 :             let keys = response.contents();
     434             414 :             let empty = Vec::new();
     435             414 :             let prefixes = response.common_prefixes.as_ref().unwrap_or(&empty);
     436             414 : 
     437             414 :             tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
     438                 : 
     439            2860 :             for object in keys {
     440            2446 :                 let object_path = object.key().expect("response does not contain a key");
     441            2446 :                 let remote_path = self.s3_object_to_relative_path(object_path);
     442            2446 :                 result.keys.push(remote_path);
     443            2446 :             }
     444                 : 
     445             414 :             result.prefixes.extend(
     446             414 :                 prefixes
     447             414 :                     .iter()
     448             414 :                     .filter_map(|o| Some(self.s3_object_to_relative_path(o.prefix()?))),
     449             414 :             );
     450                 : 
     451             414 :             continuation_token = match response.next_continuation_token {
     452               8 :                 Some(new_token) => Some(new_token),
     453             406 :                 None => break,
     454             406 :             };
     455             406 :         }
     456             406 : 
     457             406 :         Ok(result)
     458             812 :     }
     459                 : 
     460           15315 :     async fn upload(
     461           15315 :         &self,
     462           15315 :         from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     463           15315 :         from_size_bytes: usize,
     464           15315 :         to: &RemotePath,
     465           15315 :         metadata: Option<StorageMetadata>,
     466           15315 :     ) -> anyhow::Result<()> {
     467           15315 :         let kind = RequestKind::Put;
     468           15315 :         let _guard = self.permit(kind).await;
     469                 : 
     470           15315 :         let started_at = start_measuring_requests(kind);
     471           15315 : 
     472           15315 :         let body = Body::wrap_stream(from);
     473           15315 :         let bytes_stream = ByteStream::new(SdkBody::from_body_0_4(body));
     474                 : 
     475           15315 :         let res = self
     476           15315 :             .client
     477           15315 :             .put_object()
     478           15315 :             .bucket(self.bucket_name.clone())
     479           15315 :             .key(self.relative_path_to_s3_object(to))
     480           15315 :             .set_metadata(metadata.map(|m| m.0))
     481           15315 :             .content_length(from_size_bytes.try_into()?)
     482           15315 :             .body(bytes_stream)
     483           15315 :             .send()
     484           50020 :             .await;
     485                 : 
     486           15313 :         let started_at = ScopeGuard::into_inner(started_at);
     487           15313 :         metrics::BUCKET_METRICS
     488           15313 :             .req_seconds
     489           15313 :             .observe_elapsed(kind, &res, started_at);
     490           15313 : 
     491           15313 :         res?;
     492                 : 
     493           15313 :         Ok(())
     494           30628 :     }
     495                 : 
     496              12 :     async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
     497              12 :         let kind = RequestKind::Copy;
     498              12 :         let _guard = self.permit(kind).await;
     499                 : 
     500              12 :         let started_at = start_measuring_requests(kind);
     501              12 : 
     502              12 :         // we need to specify bucket_name as a prefix
     503              12 :         let copy_source = format!(
     504              12 :             "{}/{}",
     505              12 :             self.bucket_name,
     506              12 :             self.relative_path_to_s3_object(from)
     507              12 :         );
     508                 : 
     509              12 :         let res = self
     510              12 :             .client
     511              12 :             .copy_object()
     512              12 :             .bucket(self.bucket_name.clone())
     513              12 :             .key(self.relative_path_to_s3_object(to))
     514              12 :             .copy_source(copy_source)
     515              12 :             .send()
     516              48 :             .await;
     517                 : 
     518              12 :         let started_at = ScopeGuard::into_inner(started_at);
     519              12 :         metrics::BUCKET_METRICS
     520              12 :             .req_seconds
     521              12 :             .observe_elapsed(kind, &res, started_at);
     522              12 : 
     523              12 :         res?;
     524                 : 
     525              12 :         Ok(())
     526              24 :     }
     527                 : 
     528           10411 :     async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
     529                 :         // if prefix is not none then download file `prefix/from`
     530                 :         // if prefix is none then download file `from`
     531           10411 :         self.download_object(GetObjectRequest {
     532           10411 :             bucket: self.bucket_name.clone(),
     533           10411 :             key: self.relative_path_to_s3_object(from),
     534           10411 :             range: None,
     535           10411 :         })
     536           30489 :         .await
     537           20819 :     }
     538                 : 
     539              46 :     async fn download_byte_range(
     540              46 :         &self,
     541              46 :         from: &RemotePath,
     542              46 :         start_inclusive: u64,
     543              46 :         end_exclusive: Option<u64>,
     544              46 :     ) -> Result<Download, DownloadError> {
     545                 :         // S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
     546                 :         // and needs both ends to be exclusive
     547              46 :         let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
     548              46 :         let range = Some(match end_inclusive {
     549               6 :             Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"),
     550              40 :             None => format!("bytes={start_inclusive}-"),
     551                 :         });
     552                 : 
     553              46 :         self.download_object(GetObjectRequest {
     554              46 :             bucket: self.bucket_name.clone(),
     555              46 :             key: self.relative_path_to_s3_object(from),
     556              46 :             range,
     557              46 :         })
     558             116 :         .await
     559              92 :     }
     560            2204 :     async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
     561            2204 :         let kind = RequestKind::Delete;
     562            2204 :         let _guard = self.permit(kind).await;
     563                 : 
     564            2204 :         let mut delete_objects = Vec::with_capacity(paths.len());
     565            8397 :         for path in paths {
     566            6193 :             let obj_id = ObjectIdentifier::builder()
     567            6193 :                 .set_key(Some(self.relative_path_to_s3_object(path)))
     568            6193 :                 .build()?;
     569            6193 :             delete_objects.push(obj_id);
     570                 :         }
     571                 : 
     572            2204 :         for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
     573            2204 :             let started_at = start_measuring_requests(kind);
     574                 : 
     575            2204 :             let resp = self
     576            2204 :                 .client
     577            2204 :                 .delete_objects()
     578            2204 :                 .bucket(self.bucket_name.clone())
     579            2204 :                 .delete(
     580            2204 :                     Delete::builder()
     581            2204 :                         .set_objects(Some(chunk.to_vec()))
     582            2204 :                         .build()?,
     583                 :                 )
     584            2204 :                 .send()
     585            9008 :                 .await;
     586                 : 
     587            2204 :             let started_at = ScopeGuard::into_inner(started_at);
     588            2204 :             metrics::BUCKET_METRICS
     589            2204 :                 .req_seconds
     590            2204 :                 .observe_elapsed(kind, &resp, started_at);
     591            2204 : 
     592            2204 :             match resp {
     593            2204 :                 Ok(resp) => {
     594            2204 :                     metrics::BUCKET_METRICS
     595            2204 :                         .deleted_objects_total
     596            2204 :                         .inc_by(chunk.len() as u64);
     597            2204 :                     if let Some(errors) = resp.errors {
     598                 :                         // Log a bounded number of the errors within the response:
     599                 :                         // these requests can carry 1000 keys so logging each one
     600                 :                         // would be too verbose, especially as errors may lead us
     601                 :                         // to retry repeatedly.
     602                 :                         const LOG_UP_TO_N_ERRORS: usize = 10;
     603 UBC           0 :                         for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
     604               0 :                             tracing::warn!(
     605               0 :                                 "DeleteObjects key {} failed: {}: {}",
     606               0 :                                 e.key.as_ref().map(Cow::from).unwrap_or("".into()),
     607               0 :                                 e.code.as_ref().map(Cow::from).unwrap_or("".into()),
     608               0 :                                 e.message.as_ref().map(Cow::from).unwrap_or("".into())
     609               0 :                             );
     610                 :                         }
     611                 : 
     612               0 :                         return Err(anyhow::format_err!(
     613               0 :                             "Failed to delete {} objects",
     614               0 :                             errors.len()
     615               0 :                         ));
     616 CBC        2204 :                     }
     617                 :                 }
     618 UBC           0 :                 Err(e) => {
     619               0 :                     return Err(e.into());
     620                 :                 }
     621                 :             }
     622                 :         }
     623 CBC        2204 :         Ok(())
     624            4408 :     }
     625                 : 
     626            2003 :     async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
     627            2003 :         let paths = std::array::from_ref(path);
     628            8101 :         self.delete_objects(paths).await
     629            4006 :     }
     630                 : }
     631                 : 
     632                 : /// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`].
     633           28494 : fn start_counting_cancelled_wait(
     634           28494 :     kind: RequestKind,
     635           28494 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
     636           28494 :     scopeguard::guard_on_success(std::time::Instant::now(), move |_| {
     637 UBC           0 :         metrics::BUCKET_METRICS.cancelled_waits.get(kind).inc()
     638 CBC       28494 :     })
     639           28494 : }
     640                 : 
     641                 : /// On drop (cancellation) add time to [`metrics::BucketMetrics::req_seconds`].
     642           28494 : fn start_measuring_requests(
     643           28494 :     kind: RequestKind,
     644           28494 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
     645           28494 :     scopeguard::guard_on_success(std::time::Instant::now(), move |started_at| {
     646               4 :         metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     647               4 :             kind,
     648               4 :             AttemptOutcome::Cancelled,
     649               4 :             started_at,
     650               4 :         )
     651           28494 :     })
     652           28494 : }
     653                 : 
     654                 : #[cfg(test)]
     655                 : mod tests {
     656                 :     use camino::Utf8Path;
     657                 :     use std::num::NonZeroUsize;
     658                 : 
     659                 :     use crate::{RemotePath, S3Bucket, S3Config};
     660                 : 
     661               1 :     #[test]
     662               1 :     fn relative_path() {
     663               1 :         let all_paths = ["", "some/path", "some/path/"];
     664               1 :         let all_paths: Vec<RemotePath> = all_paths
     665               1 :             .iter()
     666               3 :             .map(|x| RemotePath::new(Utf8Path::new(x)).expect("bad path"))
     667               1 :             .collect();
     668               1 :         let prefixes = [
     669               1 :             None,
     670               1 :             Some(""),
     671               1 :             Some("test/prefix"),
     672               1 :             Some("test/prefix/"),
     673               1 :             Some("/test/prefix/"),
     674               1 :         ];
     675               1 :         let expected_outputs = vec![
     676               1 :             vec!["", "some/path", "some/path"],
     677               1 :             vec!["/", "/some/path", "/some/path"],
     678               1 :             vec![
     679               1 :                 "test/prefix/",
     680               1 :                 "test/prefix/some/path",
     681               1 :                 "test/prefix/some/path",
     682               1 :             ],
     683               1 :             vec![
     684               1 :                 "test/prefix/",
     685               1 :                 "test/prefix/some/path",
     686               1 :                 "test/prefix/some/path",
     687               1 :             ],
     688               1 :             vec![
     689               1 :                 "test/prefix/",
     690               1 :                 "test/prefix/some/path",
     691               1 :                 "test/prefix/some/path",
     692               1 :             ],
     693               1 :         ];
     694                 : 
     695               5 :         for (prefix_idx, prefix) in prefixes.iter().enumerate() {
     696               5 :             let config = S3Config {
     697               5 :                 bucket_name: "bucket".to_owned(),
     698               5 :                 bucket_region: "region".to_owned(),
     699               5 :                 prefix_in_bucket: prefix.map(str::to_string),
     700               5 :                 endpoint: None,
     701               5 :                 concurrency_limit: NonZeroUsize::new(100).unwrap(),
     702               5 :                 max_keys_per_list_response: Some(5),
     703               5 :             };
     704               5 :             let storage = S3Bucket::new(&config).expect("remote storage init");
     705              15 :             for (test_path_idx, test_path) in all_paths.iter().enumerate() {
     706              15 :                 let result = storage.relative_path_to_s3_object(test_path);
     707              15 :                 let expected = expected_outputs[prefix_idx][test_path_idx];
     708              15 :                 assert_eq!(result, expected);
     709                 :             }
     710                 :         }
     711               1 :     }
     712                 : }
        

Generated by: LCOV version 2.1-beta