LCOV - code coverage report
Current view: top level - libs/remote_storage/src - s3_bucket.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 86.5 % 703 608
Test Date: 2024-02-07 07:37:29 Functions: 60.2 % 98 59

            Line data    Source code
       1              : //! AWS S3 storage wrapper around `rusoto` library.
       2              : //!
       3              : //! Respects `prefix_in_bucket` property from [`S3Config`],
       4              : //! allowing multiple api users to independently work with the same S3 bucket, if
       5              : //! their bucket prefixes are both specified and different.
       6              : 
       7              : use std::{
       8              :     borrow::Cow,
       9              :     collections::HashMap,
      10              :     pin::Pin,
      11              :     sync::Arc,
      12              :     task::{Context, Poll},
      13              :     time::SystemTime,
      14              : };
      15              : 
      16              : use anyhow::{anyhow, Context as _};
      17              : use aws_config::{
      18              :     environment::credentials::EnvironmentVariableCredentialsProvider,
      19              :     imds::credentials::ImdsCredentialsProvider,
      20              :     meta::credentials::CredentialsProviderChain,
      21              :     profile::ProfileFileCredentialsProvider,
      22              :     provider_config::ProviderConfig,
      23              :     retry::{RetryConfigBuilder, RetryMode},
      24              :     web_identity_token::WebIdentityTokenCredentialsProvider,
      25              :     BehaviorVersion,
      26              : };
      27              : use aws_credential_types::provider::SharedCredentialsProvider;
      28              : use aws_sdk_s3::{
      29              :     config::{AsyncSleep, Builder, IdentityCache, Region, SharedAsyncSleep},
      30              :     error::SdkError,
      31              :     operation::get_object::GetObjectError,
      32              :     types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion},
      33              :     Client,
      34              : };
      35              : use aws_smithy_async::rt::sleep::TokioSleep;
      36              : 
      37              : use aws_smithy_types::byte_stream::ByteStream;
      38              : use aws_smithy_types::{body::SdkBody, DateTime};
      39              : use bytes::Bytes;
      40              : use futures::stream::Stream;
      41              : use hyper::Body;
      42              : use scopeguard::ScopeGuard;
      43              : use tokio_util::sync::CancellationToken;
      44              : use utils::backoff;
      45              : 
      46              : use super::StorageMetadata;
      47              : use crate::{
      48              :     ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage,
      49              :     S3Config, TimeTravelError, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
      50              : };
      51              : 
      52              : pub(super) mod metrics;
      53              : 
      54              : use self::metrics::AttemptOutcome;
      55              : pub(super) use self::metrics::RequestKind;
      56              : 
      57              : /// AWS S3 storage.
      58              : pub struct S3Bucket {
      59              :     client: Client,
      60              :     bucket_name: String,
      61              :     prefix_in_bucket: Option<String>,
      62              :     max_keys_per_list_response: Option<i32>,
      63              :     concurrency_limiter: ConcurrencyLimiter,
      64              : }
      65              : 
      66            0 : #[derive(Default)]
      67              : struct GetObjectRequest {
      68              :     bucket: String,
      69              :     key: String,
      70              :     range: Option<String>,
      71              : }
      72              : impl S3Bucket {
      73              :     /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
      74          275 :     pub fn new(aws_config: &S3Config) -> anyhow::Result<Self> {
      75          275 :         tracing::debug!(
      76            0 :             "Creating s3 remote storage for S3 bucket {}",
      77            0 :             aws_config.bucket_name
      78            0 :         );
      79              : 
      80          275 :         let region = Some(Region::new(aws_config.bucket_region.clone()));
      81          275 : 
      82          275 :         let provider_conf = ProviderConfig::without_region().with_region(region.clone());
      83          275 : 
      84          275 :         let credentials_provider = {
      85          275 :             // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
      86          275 :             CredentialsProviderChain::first_try(
      87          275 :                 "env",
      88          275 :                 EnvironmentVariableCredentialsProvider::new(),
      89          275 :             )
      90          275 :             // uses "AWS_PROFILE" / `aws sso login --profile <profile>`
      91          275 :             .or_else(
      92          275 :                 "profile-sso",
      93          275 :                 ProfileFileCredentialsProvider::builder()
      94          275 :                     .configure(&provider_conf)
      95          275 :                     .build(),
      96          275 :             )
      97          275 :             // uses "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
      98          275 :             // needed to access remote extensions bucket
      99          275 :             .or_else(
     100          275 :                 "token",
     101          275 :                 WebIdentityTokenCredentialsProvider::builder()
     102          275 :                     .configure(&provider_conf)
     103          275 :                     .build(),
     104          275 :             )
     105          275 :             // uses imds v2
     106          275 :             .or_else("imds", ImdsCredentialsProvider::builder().build())
     107          275 :         };
     108          275 : 
     109          275 :         // AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
     110          275 :         let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
     111          275 : 
     112          275 :         // We do our own retries (see [`backoff::retry`]).  However, for the AWS SDK to enable rate limiting in response to throttling
     113          275 :         // responses (e.g. 429 on too many ListObjectsv2 requests), we must provide a retry config.  We set it to use at most one
     114          275 :         // attempt, and enable 'Adaptive' mode, which causes rate limiting to be enabled.
     115          275 :         let mut retry_config = RetryConfigBuilder::new();
     116          275 :         retry_config
     117          275 :             .set_max_attempts(Some(1))
     118          275 :             .set_mode(Some(RetryMode::Adaptive));
     119          275 : 
     120          275 :         let mut config_builder = Builder::default()
     121          275 :             .behavior_version(BehaviorVersion::v2023_11_09())
     122          275 :             .region(region)
     123          275 :             .identity_cache(IdentityCache::lazy().build())
     124          275 :             .credentials_provider(SharedCredentialsProvider::new(credentials_provider))
     125          275 :             .retry_config(retry_config.build())
     126          275 :             .sleep_impl(SharedAsyncSleep::from(sleep_impl));
     127              : 
     128          275 :         if let Some(custom_endpoint) = aws_config.endpoint.clone() {
     129          143 :             config_builder = config_builder
     130          143 :                 .endpoint_url(custom_endpoint)
     131          143 :                 .force_path_style(true);
     132          143 :         }
     133              : 
     134          275 :         let client = Client::from_conf(config_builder.build());
     135          275 : 
     136          275 :         let prefix_in_bucket = aws_config.prefix_in_bucket.as_deref().map(|prefix| {
     137          273 :             let mut prefix = prefix;
     138          275 :             while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     139            2 :                 prefix = &prefix[1..]
     140              :             }
     141              : 
     142          273 :             let mut prefix = prefix.to_string();
     143          291 :             while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     144           18 :                 prefix.pop();
     145           18 :             }
     146          273 :             prefix
     147          275 :         });
     148          275 :         Ok(Self {
     149          275 :             client,
     150          275 :             bucket_name: aws_config.bucket_name.clone(),
     151          275 :             max_keys_per_list_response: aws_config.max_keys_per_list_response,
     152          275 :             prefix_in_bucket,
     153          275 :             concurrency_limiter: ConcurrencyLimiter::new(aws_config.concurrency_limit.get()),
     154          275 :         })
     155          275 :     }
     156              : 
     157         2702 :     fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
     158         2702 :         let relative_path =
     159         2702 :             match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
     160         2702 :                 Some(stripped) => stripped,
     161              :                 // we rely on AWS to return properly prefixed paths
     162              :                 // for requests with a certain prefix
     163            0 :                 None => panic!(
     164            0 :                     "Key {} does not start with bucket prefix {:?}",
     165            0 :                     key, self.prefix_in_bucket
     166            0 :                 ),
     167              :             };
     168         2702 :         RemotePath(
     169         2702 :             relative_path
     170         2702 :                 .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
     171         2702 :                 .collect(),
     172         2702 :         )
     173         2702 :     }
     174              : 
     175        34314 :     pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
     176        34314 :         assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
     177        34314 :         let path_string = path
     178        34314 :             .get_path()
     179        34314 :             .as_str()
     180        34314 :             .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR);
     181        34314 :         match &self.prefix_in_bucket {
     182        34308 :             Some(prefix) => prefix.clone() + "/" + path_string,
     183            6 :             None => path_string.to_string(),
     184              :         }
     185        34314 :     }
     186              : 
     187        19049 :     async fn permit(&self, kind: RequestKind) -> tokio::sync::SemaphorePermit<'_> {
     188        18810 :         let started_at = start_counting_cancelled_wait(kind);
     189        18810 :         let permit = self
     190        18810 :             .concurrency_limiter
     191        18810 :             .acquire(kind)
     192         4209 :             .await
     193        18810 :             .expect("semaphore is never closed");
     194        18810 : 
     195        18810 :         let started_at = ScopeGuard::into_inner(started_at);
     196        18810 :         metrics::BUCKET_METRICS
     197        18810 :             .wait_seconds
     198        18810 :             .observe_elapsed(kind, started_at);
     199        18810 : 
     200        18810 :         permit
     201        18810 :     }
     202              : 
     203        10764 :     async fn owned_permit(&self, kind: RequestKind) -> tokio::sync::OwnedSemaphorePermit {
     204        10744 :         let started_at = start_counting_cancelled_wait(kind);
     205        10744 :         let permit = self
     206        10744 :             .concurrency_limiter
     207        10744 :             .acquire_owned(kind)
     208            1 :             .await
     209        10744 :             .expect("semaphore is never closed");
     210        10744 : 
     211        10744 :         let started_at = ScopeGuard::into_inner(started_at);
     212        10744 :         metrics::BUCKET_METRICS
     213        10744 :             .wait_seconds
     214        10744 :             .observe_elapsed(kind, started_at);
     215        10744 :         permit
     216        10744 :     }
     217              : 
     218        10764 :     async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
     219        10744 :         let kind = RequestKind::Get;
     220        10744 :         let permit = self.owned_permit(kind).await;
     221              : 
     222        10744 :         let started_at = start_measuring_requests(kind);
     223              : 
     224        10744 :         let get_object = self
     225        10744 :             .client
     226        10744 :             .get_object()
     227        10744 :             .bucket(request.bucket)
     228        10744 :             .key(request.key)
     229        10744 :             .set_range(request.range)
     230        10744 :             .send()
     231        31575 :             .await;
     232              : 
     233        10739 :         let started_at = ScopeGuard::into_inner(started_at);
     234              : 
     235          199 :         match get_object {
     236        10540 :             Ok(object_output) => {
     237        10540 :                 let metadata = object_output.metadata().cloned().map(StorageMetadata);
     238        10540 :                 let etag = object_output.e_tag.clone();
     239        10540 :                 let last_modified = object_output.last_modified.and_then(|t| t.try_into().ok());
     240        10540 : 
     241        10540 :                 let body = object_output.body;
     242        10540 :                 let body = ByteStreamAsStream::from(body);
     243        10540 :                 let body = PermitCarrying::new(permit, body);
     244        10540 :                 let body = TimedDownload::new(started_at, body);
     245        10540 : 
     246        10540 :                 Ok(Download {
     247        10540 :                     metadata,
     248        10540 :                     etag,
     249        10540 :                     last_modified,
     250        10540 :                     download_stream: Box::pin(body),
     251        10540 :                 })
     252              :             }
     253          199 :             Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
     254              :                 // Count this in the AttemptOutcome::Ok bucket, because 404 is not
     255              :                 // an error: we expect to sometimes fetch an object and find it missing,
     256              :                 // e.g. when probing for timeline indices.
     257          199 :                 metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     258          199 :                     kind,
     259          199 :                     AttemptOutcome::Ok,
     260          199 :                     started_at,
     261          199 :                 );
     262          199 :                 Err(DownloadError::NotFound)
     263              :             }
     264            0 :             Err(e) => {
     265            0 :                 metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     266            0 :                     kind,
     267            0 :                     AttemptOutcome::Err,
     268            0 :                     started_at,
     269            0 :                 );
     270            0 : 
     271            0 :                 Err(DownloadError::Other(
     272            0 :                     anyhow::Error::new(e).context("download s3 object"),
     273            0 :                 ))
     274              :             }
     275              :         }
     276        10739 :     }
     277              : 
     278         2343 :     async fn delete_oids(
     279         2343 :         &self,
     280         2343 :         kind: RequestKind,
     281         2343 :         delete_objects: &[ObjectIdentifier],
     282         2343 :     ) -> anyhow::Result<()> {
     283         2241 :         for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
     284         2239 :             let started_at = start_measuring_requests(kind);
     285              : 
     286         2239 :             let resp = self
     287         2239 :                 .client
     288         2239 :                 .delete_objects()
     289         2239 :                 .bucket(self.bucket_name.clone())
     290         2239 :                 .delete(
     291         2239 :                     Delete::builder()
     292         2239 :                         .set_objects(Some(chunk.to_vec()))
     293         2239 :                         .build()?,
     294              :                 )
     295         2239 :                 .send()
     296         9278 :                 .await;
     297              : 
     298         2239 :             let started_at = ScopeGuard::into_inner(started_at);
     299         2239 :             metrics::BUCKET_METRICS
     300         2239 :                 .req_seconds
     301         2239 :                 .observe_elapsed(kind, &resp, started_at);
     302              : 
     303         2239 :             let resp = resp?;
     304         2239 :             metrics::BUCKET_METRICS
     305         2239 :                 .deleted_objects_total
     306         2239 :                 .inc_by(chunk.len() as u64);
     307         2239 :             if let Some(errors) = resp.errors {
     308              :                 // Log a bounded number of the errors within the response:
     309              :                 // these requests can carry 1000 keys so logging each one
     310              :                 // would be too verbose, especially as errors may lead us
     311              :                 // to retry repeatedly.
     312              :                 const LOG_UP_TO_N_ERRORS: usize = 10;
     313            0 :                 for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
     314            0 :                     tracing::warn!(
     315            0 :                         "DeleteObjects key {} failed: {}: {}",
     316            0 :                         e.key.as_ref().map(Cow::from).unwrap_or("".into()),
     317            0 :                         e.code.as_ref().map(Cow::from).unwrap_or("".into()),
     318            0 :                         e.message.as_ref().map(Cow::from).unwrap_or("".into())
     319            0 :                     );
     320              :                 }
     321              : 
     322            0 :                 return Err(anyhow::format_err!(
     323            0 :                     "Failed to delete {} objects",
     324            0 :                     errors.len()
     325            0 :                 ));
     326         2239 :             }
     327              :         }
     328         2241 :         Ok(())
     329         2241 :     }
     330              : }
     331              : 
     332              : pin_project_lite::pin_project! {
     333              :     struct ByteStreamAsStream {
     334              :         #[pin]
     335              :         inner: aws_smithy_types::byte_stream::ByteStream
     336              :     }
     337              : }
     338              : 
     339              : impl From<aws_smithy_types::byte_stream::ByteStream> for ByteStreamAsStream {
     340        10560 :     fn from(inner: aws_smithy_types::byte_stream::ByteStream) -> Self {
     341        10560 :         ByteStreamAsStream { inner }
     342        10560 :     }
     343              : }
     344              : 
     345              : impl Stream for ByteStreamAsStream {
     346              :     type Item = std::io::Result<Bytes>;
     347              : 
     348       127207 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     349       127207 :         // this does the std::io::ErrorKind::Other conversion
     350       127207 :         self.project().inner.poll_next(cx).map_err(|x| x.into())
     351       127207 :     }
     352              : 
     353              :     // cannot implement size_hint because inner.size_hint is remaining size in bytes, which makes
     354              :     // sense and Stream::size_hint does not really
     355              : }
     356              : 
     357              : pin_project_lite::pin_project! {
     358              :     /// An `AsyncRead` adapter which carries a permit for the lifetime of the value.
     359              :     struct PermitCarrying<S> {
     360              :         permit: tokio::sync::OwnedSemaphorePermit,
     361              :         #[pin]
     362              :         inner: S,
     363              :     }
     364              : }
     365              : 
     366              : impl<S> PermitCarrying<S> {
     367        10540 :     fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self {
     368        10540 :         Self { permit, inner }
     369        10540 :     }
     370              : }
     371              : 
     372              : impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for PermitCarrying<S> {
     373              :     type Item = <S as Stream>::Item;
     374              : 
     375       127161 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     376       127161 :         self.project().inner.poll_next(cx)
     377       127161 :     }
     378              : 
     379            0 :     fn size_hint(&self) -> (usize, Option<usize>) {
     380            0 :         self.inner.size_hint()
     381            0 :     }
     382              : }
     383              : 
     384              : pin_project_lite::pin_project! {
     385              :     /// Times and tracks the outcome of the request.
     386              :     struct TimedDownload<S> {
     387              :         started_at: std::time::Instant,
     388              :         outcome: metrics::AttemptOutcome,
     389              :         #[pin]
     390              :         inner: S
     391              :     }
     392              : 
     393              :     impl<S> PinnedDrop for TimedDownload<S> {
     394              :         fn drop(mut this: Pin<&mut Self>) {
     395              :             metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
     396              :         }
     397              :     }
     398              : }
     399              : 
     400              : impl<S> TimedDownload<S> {
     401        10540 :     fn new(started_at: std::time::Instant, inner: S) -> Self {
     402        10540 :         TimedDownload {
     403        10540 :             started_at,
     404        10540 :             outcome: metrics::AttemptOutcome::Cancelled,
     405        10540 :             inner,
     406        10540 :         }
     407        10540 :     }
     408              : }
     409              : 
     410              : impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for TimedDownload<S> {
     411              :     type Item = <S as Stream>::Item;
     412              : 
     413       127161 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     414       127161 :         use std::task::ready;
     415       127161 : 
     416       127161 :         let this = self.project();
     417              : 
     418       127161 :         let res = ready!(this.inner.poll_next(cx));
     419        61596 :         match &res {
     420        61596 :             Some(Ok(_)) => {}
     421            0 :             Some(Err(_)) => *this.outcome = metrics::AttemptOutcome::Err,
     422        24206 :             None => *this.outcome = metrics::AttemptOutcome::Ok,
     423              :         }
     424              : 
     425        85802 :         Poll::Ready(res)
     426       127161 :     }
     427              : 
     428            0 :     fn size_hint(&self) -> (usize, Option<usize>) {
     429            0 :         self.inner.size_hint()
     430            0 :     }
     431              : }
     432              : 
     433              : impl RemoteStorage for S3Bucket {
     434          534 :     async fn list(
     435          534 :         &self,
     436          534 :         prefix: Option<&RemotePath>,
     437          534 :         mode: ListingMode,
     438          534 :     ) -> Result<Listing, DownloadError> {
     439          512 :         let kind = RequestKind::List;
     440          512 :         let mut result = Listing::default();
     441          512 : 
     442          512 :         // get the passed prefix or if it is not set use prefix_in_bucket value
     443          512 :         let list_prefix = prefix
     444          512 :             .map(|p| self.relative_path_to_s3_object(p))
     445          512 :             .or_else(|| self.prefix_in_bucket.clone())
     446          512 :             .map(|mut p| {
     447              :                 // required to end with a separator
     448              :                 // otherwise request will return only the entry of a prefix
     449          512 :                 if matches!(mode, ListingMode::WithDelimiter)
     450          268 :                     && !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR)
     451          268 :                 {
     452          268 :                     p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
     453          268 :                 }
     454          512 :                 p
     455          512 :             });
     456          512 : 
     457          512 :         let mut continuation_token = None;
     458              : 
     459              :         loop {
     460          512 :             let _guard = self.permit(kind).await;
     461          512 :             let started_at = start_measuring_requests(kind);
     462          512 : 
     463          512 :             let mut request = self
     464          512 :                 .client
     465          512 :                 .list_objects_v2()
     466          512 :                 .bucket(self.bucket_name.clone())
     467          512 :                 .set_prefix(list_prefix.clone())
     468          512 :                 .set_continuation_token(continuation_token)
     469          512 :                 .set_max_keys(self.max_keys_per_list_response);
     470          512 : 
     471          512 :             if let ListingMode::WithDelimiter = mode {
     472          268 :                 request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
     473          268 :             }
     474              : 
     475          512 :             let response = request
     476          512 :                 .send()
     477         2643 :                 .await
     478          512 :                 .context("Failed to list S3 prefixes")
     479          512 :                 .map_err(DownloadError::Other);
     480          512 : 
     481          512 :             let started_at = ScopeGuard::into_inner(started_at);
     482          512 : 
     483          512 :             metrics::BUCKET_METRICS
     484          512 :                 .req_seconds
     485          512 :                 .observe_elapsed(kind, &response, started_at);
     486              : 
     487          512 :             let response = response?;
     488              : 
     489          512 :             let keys = response.contents();
     490          512 :             let empty = Vec::new();
     491          512 :             let prefixes = response.common_prefixes.as_ref().unwrap_or(&empty);
     492              : 
     493            0 :             tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
     494              : 
     495         2930 :             for object in keys {
     496         2418 :                 let object_path = object.key().expect("response does not contain a key");
     497         2418 :                 let remote_path = self.s3_object_to_relative_path(object_path);
     498         2418 :                 result.keys.push(remote_path);
     499         2418 :             }
     500              : 
     501          512 :             result.prefixes.extend(
     502          512 :                 prefixes
     503          512 :                     .iter()
     504          512 :                     .filter_map(|o| Some(self.s3_object_to_relative_path(o.prefix()?))),
     505          512 :             );
     506              : 
     507          512 :             continuation_token = match response.next_continuation_token {
     508            0 :                 Some(new_token) => Some(new_token),
     509          512 :                 None => break,
     510          512 :             };
     511          512 :         }
     512          512 : 
     513          512 :         Ok(result)
     514          512 :     }
     515              : 
     516        16044 :     async fn upload(
     517        16044 :         &self,
     518        16044 :         from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     519        16044 :         from_size_bytes: usize,
     520        16044 :         to: &RemotePath,
     521        16044 :         metadata: Option<StorageMetadata>,
     522        16044 :     ) -> anyhow::Result<()> {
     523        16044 :         let kind = RequestKind::Put;
     524        16044 :         let _guard = self.permit(kind).await;
     525              : 
     526        16044 :         let started_at = start_measuring_requests(kind);
     527        16044 : 
     528        16044 :         let body = Body::wrap_stream(from);
     529        16044 :         let bytes_stream = ByteStream::new(SdkBody::from_body_0_4(body));
     530              : 
     531        16044 :         let res = self
     532        16044 :             .client
     533        16044 :             .put_object()
     534        16044 :             .bucket(self.bucket_name.clone())
     535        16044 :             .key(self.relative_path_to_s3_object(to))
     536        16044 :             .set_metadata(metadata.map(|m| m.0))
     537        16044 :             .content_length(from_size_bytes.try_into()?)
     538        16044 :             .body(bytes_stream)
     539        16044 :             .send()
     540        51926 :             .await;
     541              : 
     542        16043 :         let started_at = ScopeGuard::into_inner(started_at);
     543        16043 :         metrics::BUCKET_METRICS
     544        16043 :             .req_seconds
     545        16043 :             .observe_elapsed(kind, &res, started_at);
     546        16043 : 
     547        16043 :         res?;
     548              : 
     549        16043 :         Ok(())
     550        16043 :     }
     551              : 
     552           14 :     async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
     553           12 :         let kind = RequestKind::Copy;
     554           12 :         let _guard = self.permit(kind).await;
     555              : 
     556           12 :         let started_at = start_measuring_requests(kind);
     557           12 : 
     558           12 :         // we need to specify bucket_name as a prefix
     559           12 :         let copy_source = format!(
     560           12 :             "{}/{}",
     561           12 :             self.bucket_name,
     562           12 :             self.relative_path_to_s3_object(from)
     563           12 :         );
     564              : 
     565           12 :         let res = self
     566           12 :             .client
     567           12 :             .copy_object()
     568           12 :             .bucket(self.bucket_name.clone())
     569           12 :             .key(self.relative_path_to_s3_object(to))
     570           12 :             .copy_source(copy_source)
     571           12 :             .send()
     572           48 :             .await;
     573              : 
     574           12 :         let started_at = ScopeGuard::into_inner(started_at);
     575           12 :         metrics::BUCKET_METRICS
     576           12 :             .req_seconds
     577           12 :             .observe_elapsed(kind, &res, started_at);
     578           12 : 
     579           12 :         res?;
     580              : 
     581           12 :         Ok(())
     582           12 :     }
     583              : 
     584        10718 :     async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
     585        10708 :         // if prefix is not none then download file `prefix/from`
     586        10708 :         // if prefix is none then download file `from`
     587        10708 :         self.download_object(GetObjectRequest {
     588        10708 :             bucket: self.bucket_name.clone(),
     589        10708 :             key: self.relative_path_to_s3_object(from),
     590        10708 :             range: None,
     591        10708 :         })
     592        31469 :         .await
     593        10703 :     }
     594              : 
     595           46 :     async fn download_byte_range(
     596           46 :         &self,
     597           46 :         from: &RemotePath,
     598           46 :         start_inclusive: u64,
     599           46 :         end_exclusive: Option<u64>,
     600           46 :     ) -> Result<Download, DownloadError> {
     601           36 :         // S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
     602           36 :         // and needs both ends to be exclusive
     603           36 :         let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
     604           36 :         let range = Some(match end_inclusive {
     605            0 :             Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"),
     606           36 :             None => format!("bytes={start_inclusive}-"),
     607              :         });
     608              : 
     609           36 :         self.download_object(GetObjectRequest {
     610           36 :             bucket: self.bucket_name.clone(),
     611           36 :             key: self.relative_path_to_s3_object(from),
     612           36 :             range,
     613           36 :         })
     614          107 :         .await
     615           36 :     }
     616         2339 :     async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
     617         2241 :         let kind = RequestKind::Delete;
     618         2241 :         let _guard = self.permit(kind).await;
     619              : 
     620         2241 :         let mut delete_objects = Vec::with_capacity(paths.len());
     621         8963 :         for path in paths {
     622         6722 :             let obj_id = ObjectIdentifier::builder()
     623         6722 :                 .set_key(Some(self.relative_path_to_s3_object(path)))
     624         6722 :                 .build()?;
     625         6722 :             delete_objects.push(obj_id);
     626              :         }
     627              : 
     628         9278 :         self.delete_oids(kind, &delete_objects).await
     629         2241 :     }
     630              : 
     631         2106 :     async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
     632         2016 :         let paths = std::array::from_ref(path);
     633         8257 :         self.delete_objects(paths).await
     634         2016 :     }
     635              : 
     636            7 :     async fn time_travel_recover(
     637            7 :         &self,
     638            7 :         prefix: Option<&RemotePath>,
     639            7 :         timestamp: SystemTime,
     640            7 :         done_if_after: SystemTime,
     641            7 :         cancel: &CancellationToken,
     642            7 :     ) -> Result<(), TimeTravelError> {
     643            1 :         let kind = RequestKind::TimeTravel;
     644            1 :         let _guard = self.permit(kind).await;
     645              : 
     646            1 :         let timestamp = DateTime::from(timestamp);
     647            1 :         let done_if_after = DateTime::from(done_if_after);
     648              : 
     649            0 :         tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
     650              : 
     651              :         // get the passed prefix or if it is not set use prefix_in_bucket value
     652            1 :         let prefix = prefix
     653            1 :             .map(|p| self.relative_path_to_s3_object(p))
     654            1 :             .or_else(|| self.prefix_in_bucket.clone());
     655            1 : 
     656            1 :         let warn_threshold = 3;
     657            1 :         let max_retries = 10;
     658            1 :         let is_permanent = |_e: &_| false;
     659              : 
     660            1 :         let mut key_marker = None;
     661            1 :         let mut version_id_marker = None;
     662            1 :         let mut versions_and_deletes = Vec::new();
     663              : 
     664              :         loop {
     665            1 :             let response = backoff::retry(
     666            1 :                 || async {
     667            1 :                     self.client
     668            1 :                         .list_object_versions()
     669            1 :                         .bucket(self.bucket_name.clone())
     670            1 :                         .set_prefix(prefix.clone())
     671            1 :                         .set_key_marker(key_marker.clone())
     672            1 :                         .set_version_id_marker(version_id_marker.clone())
     673            1 :                         .send()
     674           40 :                         .await
     675            1 :                         .map_err(|e| TimeTravelError::Other(e.into()))
     676            1 :                 },
     677            1 :                 is_permanent,
     678            1 :                 warn_threshold,
     679            1 :                 max_retries,
     680            1 :                 "listing object versions for time_travel_recover",
     681            1 :                 cancel,
     682            1 :             )
     683           40 :             .await
     684            1 :             .ok_or_else(|| TimeTravelError::Cancelled)
     685            1 :             .and_then(|x| x)?;
     686              : 
     687            0 :             tracing::trace!(
     688            0 :                 "  Got List response version_id_marker={:?}, key_marker={:?}",
     689            0 :                 response.version_id_marker,
     690            0 :                 response.key_marker
     691            0 :             );
     692            1 :             let versions = response
     693            1 :                 .versions
     694            1 :                 .unwrap_or_default()
     695            1 :                 .into_iter()
     696            1 :                 .map(VerOrDelete::from_version);
     697            1 :             let deletes = response
     698            1 :                 .delete_markers
     699            1 :                 .unwrap_or_default()
     700            1 :                 .into_iter()
     701            1 :                 .map(VerOrDelete::from_delete_marker);
     702            1 :             itertools::process_results(versions.chain(deletes), |n_vds| {
     703            1 :                 versions_and_deletes.extend(n_vds)
     704            1 :             })
     705            1 :             .map_err(TimeTravelError::Other)?;
     706           14 :             fn none_if_empty(v: Option<String>) -> Option<String> {
     707           14 :                 v.filter(|v| !v.is_empty())
     708           14 :             }
     709            1 :             version_id_marker = none_if_empty(response.next_version_id_marker);
     710            1 :             key_marker = none_if_empty(response.next_key_marker);
     711            1 :             if version_id_marker.is_none() {
     712              :                 // The final response is not supposed to be truncated
     713            1 :                 if response.is_truncated.unwrap_or_default() {
     714            0 :                     return Err(TimeTravelError::Other(anyhow::anyhow!(
     715            0 :                         "Received truncated ListObjectVersions response for prefix={prefix:?}"
     716            0 :                     )));
     717            1 :                 }
     718            1 :                 break;
     719            0 :             }
     720            0 :             // Limit the number of versions deletions, mostly so that we don't
     721            0 :             // keep requesting forever if the list is too long, as we'd put the
     722            0 :             // list in RAM.
     723            0 :             // Building a list of 100k entries that reaches the limit roughly takes
     724            0 :             // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
     725            0 :             const COMPLEXITY_LIMIT: usize = 100_000;
     726            0 :             if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
     727            0 :                 return Err(TimeTravelError::TooManyVersions);
     728            0 :             }
     729              :         }
     730              : 
     731            1 :         tracing::info!(
     732            1 :             "Built list for time travel with {} versions and deletions",
     733            1 :             versions_and_deletes.len()
     734            1 :         );
     735              : 
     736              :         // Work on the list of references instead of the objects directly,
     737              :         // otherwise we get lifetime errors in the sort_by_key call below.
     738            1 :         let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
     739            1 : 
     740         2276 :         versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified));
     741            1 : 
     742            1 :         let mut vds_for_key = HashMap::<_, Vec<_>>::new();
     743              : 
     744          268 :         for vd in &versions_and_deletes {
     745              :             let VerOrDelete {
     746          267 :                 version_id, key, ..
     747          267 :             } = &vd;
     748          267 :             if version_id == "null" {
     749            0 :                 return Err(TimeTravelError::Other(anyhow!("Received ListVersions response for key={key} with version_id='null', \
     750            0 :                     indicating either disabled versioning, or legacy objects with null version id values")));
     751          267 :             }
     752            0 :             tracing::trace!(
     753            0 :                 "Parsing version key={key} version_id={version_id} kind={:?}",
     754            0 :                 vd.kind
     755            0 :             );
     756              : 
     757          267 :             vds_for_key.entry(key).or_default().push(vd);
     758              :         }
     759           93 :         for (key, versions) in vds_for_key {
     760           92 :             let last_vd = versions.last().unwrap();
     761           92 :             if last_vd.last_modified > done_if_after {
     762            0 :                 tracing::trace!("Key {key} has version later than done_if_after, skipping");
     763            0 :                 continue;
     764           92 :             }
     765              :             // the version we want to restore to.
     766           92 :             let version_to_restore_to =
     767          188 :                 match versions.binary_search_by_key(&timestamp, |tpl| tpl.last_modified) {
     768            0 :                     Ok(v) => v,
     769           92 :                     Err(e) => e,
     770              :                 };
     771           92 :             if version_to_restore_to == versions.len() {
     772            0 :                 tracing::trace!("Key {key} has no changes since timestamp, skipping");
     773            0 :                 continue;
     774           92 :             }
     775           92 :             let mut do_delete = false;
     776           92 :             if version_to_restore_to == 0 {
     777              :                 // All versions more recent, so the key didn't exist at the specified time point.
     778            0 :                 tracing::trace!(
     779            0 :                     "All {} versions more recent for {key}, deleting",
     780            0 :                     versions.len()
     781            0 :                 );
     782            3 :                 do_delete = true;
     783              :             } else {
     784           89 :                 match &versions[version_to_restore_to - 1] {
     785              :                     VerOrDelete {
     786              :                         kind: VerOrDeleteKind::Version,
     787           89 :                         version_id,
     788              :                         ..
     789              :                     } => {
     790            0 :                         tracing::trace!("Copying old version {version_id} for {key}...");
     791              :                         // Restore the state to the last version by copying
     792           89 :                         let source_id =
     793           89 :                             format!("{}/{key}?versionId={version_id}", self.bucket_name);
     794           89 : 
     795           89 :                         backoff::retry(
     796           89 :                             || async {
     797           89 :                                 self.client
     798           89 :                                     .copy_object()
     799           89 :                                     .bucket(self.bucket_name.clone())
     800           89 :                                     .key(key)
     801           89 :                                     .copy_source(&source_id)
     802           89 :                                     .send()
     803           91 :                                     .await
     804           89 :                                     .map_err(|e| TimeTravelError::Other(e.into()))
     805           89 :                             },
     806           89 :                             is_permanent,
     807           89 :                             warn_threshold,
     808           89 :                             max_retries,
     809           89 :                             "copying object version for time_travel_recover",
     810           89 :                             cancel,
     811           89 :                         )
     812           91 :                         .await
     813           89 :                         .ok_or_else(|| TimeTravelError::Cancelled)
     814           89 :                         .and_then(|x| x)?;
     815           89 :                         tracing::info!(%version_id, %key, "Copied old version in S3");
     816              :                     }
     817              :                     VerOrDelete {
     818              :                         kind: VerOrDeleteKind::DeleteMarker,
     819              :                         ..
     820            0 :                     } => {
     821            0 :                         do_delete = true;
     822            0 :                     }
     823              :                 }
     824              :             };
     825           92 :             if do_delete {
     826            3 :                 if matches!(last_vd.kind, VerOrDeleteKind::DeleteMarker) {
     827              :                     // Key has since been deleted (but there was some history), no need to do anything
     828            0 :                     tracing::trace!("Key {key} already deleted, skipping.");
     829              :                 } else {
     830            0 :                     tracing::trace!("Deleting {key}...");
     831              : 
     832            0 :                     let oid = ObjectIdentifier::builder()
     833            0 :                         .key(key.to_owned())
     834            0 :                         .build()
     835            0 :                         .map_err(|e| TimeTravelError::Other(anyhow::Error::new(e)))?;
     836            0 :                     self.delete_oids(kind, &[oid])
     837            0 :                         .await
     838            0 :                         .map_err(TimeTravelError::Other)?;
     839              :                 }
     840           89 :             }
     841              :         }
     842            1 :         Ok(())
     843            1 :     }
     844              : }
     845              : 
     846              : /// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`].
     847        29813 : fn start_counting_cancelled_wait(
     848        29813 :     kind: RequestKind,
     849        29813 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
     850        29813 :     scopeguard::guard_on_success(std::time::Instant::now(), move |_| {
     851            0 :         metrics::BUCKET_METRICS.cancelled_waits.get(kind).inc()
     852        29813 :     })
     853        29813 : }
     854              : 
     855              : /// On drop (cancellation) add time to [`metrics::BucketMetrics::req_seconds`].
     856        29808 : fn start_measuring_requests(
     857        29808 :     kind: RequestKind,
     858        29808 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
     859        29808 :     scopeguard::guard_on_success(std::time::Instant::now(), move |started_at| {
     860            5 :         metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     861            5 :             kind,
     862            5 :             AttemptOutcome::Cancelled,
     863            5 :             started_at,
     864            5 :         )
     865        29808 :     })
     866        29808 : }
     867              : 
     868              : // Save RAM and only store the needed data instead of the entire ObjectVersion/DeleteMarkerEntry
     869              : struct VerOrDelete {
     870              :     kind: VerOrDeleteKind,
     871              :     last_modified: DateTime,
     872              :     version_id: String,
     873              :     key: String,
     874              : }
     875              : 
     876            0 : #[derive(Debug)]
     877              : enum VerOrDeleteKind {
     878              :     Version,
     879              :     DeleteMarker,
     880              : }
     881              : 
     882              : impl VerOrDelete {
     883          303 :     fn with_kind(
     884          303 :         kind: VerOrDeleteKind,
     885          303 :         last_modified: Option<DateTime>,
     886          303 :         version_id: Option<String>,
     887          303 :         key: Option<String>,
     888          303 :     ) -> anyhow::Result<Self> {
     889          303 :         let lvk = (last_modified, version_id, key);
     890          303 :         let (Some(last_modified), Some(version_id), Some(key)) = lvk else {
     891            0 :             anyhow::bail!(
     892            0 :                 "One (or more) of last_modified, key, and id is None. \
     893            0 :             Is versioning enabled in the bucket? last_modified={:?}, version_id={:?}, key={:?}",
     894            0 :                 lvk.0,
     895            0 :                 lvk.1,
     896            0 :                 lvk.2,
     897            0 :             );
     898              :         };
     899          303 :         Ok(Self {
     900          303 :             kind,
     901          303 :             last_modified,
     902          303 :             version_id,
     903          303 :             key,
     904          303 :         })
     905          303 :     }
     906          163 :     fn from_version(v: ObjectVersion) -> anyhow::Result<Self> {
     907          163 :         Self::with_kind(
     908          163 :             VerOrDeleteKind::Version,
     909          163 :             v.last_modified,
     910          163 :             v.version_id,
     911          163 :             v.key,
     912          163 :         )
     913          163 :     }
     914          140 :     fn from_delete_marker(v: DeleteMarkerEntry) -> anyhow::Result<Self> {
     915          140 :         Self::with_kind(
     916          140 :             VerOrDeleteKind::DeleteMarker,
     917          140 :             v.last_modified,
     918          140 :             v.version_id,
     919          140 :             v.key,
     920          140 :         )
     921          140 :     }
     922              : }
     923              : 
     924              : #[cfg(test)]
     925              : mod tests {
     926              :     use camino::Utf8Path;
     927              :     use std::num::NonZeroUsize;
     928              : 
     929              :     use crate::{RemotePath, S3Bucket, S3Config};
     930              : 
     931            2 :     #[test]
     932            2 :     fn relative_path() {
     933            2 :         let all_paths = ["", "some/path", "some/path/"];
     934            2 :         let all_paths: Vec<RemotePath> = all_paths
     935            2 :             .iter()
     936            6 :             .map(|x| RemotePath::new(Utf8Path::new(x)).expect("bad path"))
     937            2 :             .collect();
     938            2 :         let prefixes = [
     939            2 :             None,
     940            2 :             Some(""),
     941            2 :             Some("test/prefix"),
     942            2 :             Some("test/prefix/"),
     943            2 :             Some("/test/prefix/"),
     944            2 :         ];
     945            2 :         let expected_outputs = vec![
     946            2 :             vec!["", "some/path", "some/path"],
     947            2 :             vec!["/", "/some/path", "/some/path"],
     948            2 :             vec![
     949            2 :                 "test/prefix/",
     950            2 :                 "test/prefix/some/path",
     951            2 :                 "test/prefix/some/path",
     952            2 :             ],
     953            2 :             vec![
     954            2 :                 "test/prefix/",
     955            2 :                 "test/prefix/some/path",
     956            2 :                 "test/prefix/some/path",
     957            2 :             ],
     958            2 :             vec![
     959            2 :                 "test/prefix/",
     960            2 :                 "test/prefix/some/path",
     961            2 :                 "test/prefix/some/path",
     962            2 :             ],
     963            2 :         ];
     964              : 
     965           10 :         for (prefix_idx, prefix) in prefixes.iter().enumerate() {
     966           10 :             let config = S3Config {
     967           10 :                 bucket_name: "bucket".to_owned(),
     968           10 :                 bucket_region: "region".to_owned(),
     969           10 :                 prefix_in_bucket: prefix.map(str::to_string),
     970           10 :                 endpoint: None,
     971           10 :                 concurrency_limit: NonZeroUsize::new(100).unwrap(),
     972           10 :                 max_keys_per_list_response: Some(5),
     973           10 :             };
     974           10 :             let storage = S3Bucket::new(&config).expect("remote storage init");
     975           30 :             for (test_path_idx, test_path) in all_paths.iter().enumerate() {
     976           30 :                 let result = storage.relative_path_to_s3_object(test_path);
     977           30 :                 let expected = expected_outputs[prefix_idx][test_path_idx];
     978           30 :                 assert_eq!(result, expected);
     979              :             }
     980              :         }
     981            2 :     }
     982              : }
        

Generated by: LCOV version 2.1-beta