LCOV - code coverage report
Current view: top level - libs/remote_storage/src - s3_bucket.rs (source / functions) Coverage Total Hit
Test: aca8877be6ceba750c1be359ed71bc1799d52b30.info Lines: 86.9 % 710 617
Test Date: 2024-02-14 18:05:35 Functions: 63.8 % 94 60

            Line data    Source code
       1              : //! AWS S3 storage wrapper around the `aws-sdk-s3` crate.
       2              : //!
       3              : //! Respects `prefix_in_bucket` property from [`S3Config`],
       4              : //! allowing multiple api users to independently work with the same S3 bucket, if
       5              : //! their bucket prefixes are both specified and different.
       6              : 
       7              : use std::{
       8              :     borrow::Cow,
       9              :     collections::HashMap,
      10              :     num::NonZeroU32,
      11              :     pin::Pin,
      12              :     sync::Arc,
      13              :     task::{Context, Poll},
      14              :     time::SystemTime,
      15              : };
      16              : 
      17              : use anyhow::{anyhow, Context as _};
      18              : use aws_config::{
      19              :     environment::credentials::EnvironmentVariableCredentialsProvider,
      20              :     imds::credentials::ImdsCredentialsProvider,
      21              :     meta::credentials::CredentialsProviderChain,
      22              :     profile::ProfileFileCredentialsProvider,
      23              :     provider_config::ProviderConfig,
      24              :     retry::{RetryConfigBuilder, RetryMode},
      25              :     web_identity_token::WebIdentityTokenCredentialsProvider,
      26              :     BehaviorVersion,
      27              : };
      28              : use aws_credential_types::provider::SharedCredentialsProvider;
      29              : use aws_sdk_s3::{
      30              :     config::{AsyncSleep, Builder, IdentityCache, Region, SharedAsyncSleep},
      31              :     error::SdkError,
      32              :     operation::get_object::GetObjectError,
      33              :     types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion},
      34              :     Client,
      35              : };
      36              : use aws_smithy_async::rt::sleep::TokioSleep;
      37              : 
      38              : use aws_smithy_types::byte_stream::ByteStream;
      39              : use aws_smithy_types::{body::SdkBody, DateTime};
      40              : use bytes::Bytes;
      41              : use futures::stream::Stream;
      42              : use hyper::Body;
      43              : use scopeguard::ScopeGuard;
      44              : use tokio_util::sync::CancellationToken;
      45              : use utils::backoff;
      46              : 
      47              : use super::StorageMetadata;
      48              : use crate::{
      49              :     support::PermitCarrying, ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode,
      50              :     RemotePath, RemoteStorage, S3Config, TimeTravelError, MAX_KEYS_PER_DELETE,
      51              :     REMOTE_STORAGE_PREFIX_SEPARATOR,
      52              : };
      53              : 
      54              : pub(super) mod metrics;
      55              : 
      56              : use self::metrics::AttemptOutcome;
      57              : pub(super) use self::metrics::RequestKind;
      58              : 
      59              : /// AWS S3 storage.
      60              : pub struct S3Bucket {
      61              :     client: Client,
      62              :     bucket_name: String,
      63              :     prefix_in_bucket: Option<String>,
      64              :     max_keys_per_list_response: Option<i32>,
      65              :     concurrency_limiter: ConcurrencyLimiter,
      66              : }
      67              : 
      68              : struct GetObjectRequest {
      69              :     bucket: String,
      70              :     key: String,
      71              :     range: Option<String>,
      72              : }
      73              : impl S3Bucket {
      74              :     /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
      75          294 :     pub fn new(aws_config: &S3Config) -> anyhow::Result<Self> {
      76          294 :         tracing::debug!(
      77            0 :             "Creating s3 remote storage for S3 bucket {}",
      78            0 :             aws_config.bucket_name
      79            0 :         );
      80              : 
      81          294 :         let region = Some(Region::new(aws_config.bucket_region.clone()));
      82          294 : 
      83          294 :         let provider_conf = ProviderConfig::without_region().with_region(region.clone());
      84          294 : 
      85          294 :         let credentials_provider = {
      86          294 :             // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
      87          294 :             CredentialsProviderChain::first_try(
      88          294 :                 "env",
      89          294 :                 EnvironmentVariableCredentialsProvider::new(),
      90          294 :             )
      91          294 :             // uses "AWS_PROFILE" / `aws sso login --profile <profile>`
      92          294 :             .or_else(
      93          294 :                 "profile-sso",
      94          294 :                 ProfileFileCredentialsProvider::builder()
      95          294 :                     .configure(&provider_conf)
      96          294 :                     .build(),
      97          294 :             )
      98          294 :             // uses "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
      99          294 :             // needed to access remote extensions bucket
     100          294 :             .or_else(
     101          294 :                 "token",
     102          294 :                 WebIdentityTokenCredentialsProvider::builder()
     103          294 :                     .configure(&provider_conf)
     104          294 :                     .build(),
     105          294 :             )
     106          294 :             // uses imds v2
     107          294 :             .or_else("imds", ImdsCredentialsProvider::builder().build())
     108          294 :         };
     109          294 : 
     110          294 :         // AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
     111          294 :         let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
     112          294 : 
     113          294 :         // We do our own retries (see [`backoff::retry`]).  However, for the AWS SDK to enable rate limiting in response to throttling
     114          294 :         // responses (e.g. 429 on too many ListObjectsv2 requests), we must provide a retry config.  We set it to use at most one
     115          294 :         // attempt, and enable 'Adaptive' mode, which causes rate limiting to be enabled.
     116          294 :         let mut retry_config = RetryConfigBuilder::new();
     117          294 :         retry_config
     118          294 :             .set_max_attempts(Some(1))
     119          294 :             .set_mode(Some(RetryMode::Adaptive));
     120          294 : 
     121          294 :         let mut config_builder = Builder::default()
     122          294 :             .behavior_version(BehaviorVersion::v2023_11_09())
     123          294 :             .region(region)
     124          294 :             .identity_cache(IdentityCache::lazy().build())
     125          294 :             .credentials_provider(SharedCredentialsProvider::new(credentials_provider))
     126          294 :             .retry_config(retry_config.build())
     127          294 :             .sleep_impl(SharedAsyncSleep::from(sleep_impl));
     128              : 
     129          294 :         if let Some(custom_endpoint) = aws_config.endpoint.clone() {
     130          146 :             config_builder = config_builder
     131          146 :                 .endpoint_url(custom_endpoint)
     132          146 :                 .force_path_style(true);
     133          148 :         }
     134              : 
     135          294 :         let client = Client::from_conf(config_builder.build());
     136          294 : 
     137          294 :         let prefix_in_bucket = aws_config.prefix_in_bucket.as_deref().map(|prefix| {
     138          292 :             let mut prefix = prefix;
     139          294 :             while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     140            2 :                 prefix = &prefix[1..]
     141              :             }
     142              : 
     143          292 :             let mut prefix = prefix.to_string();
     144          310 :             while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     145           18 :                 prefix.pop();
     146           18 :             }
     147          292 :             prefix
     148          294 :         });
     149          294 :         Ok(Self {
     150          294 :             client,
     151          294 :             bucket_name: aws_config.bucket_name.clone(),
     152          294 :             max_keys_per_list_response: aws_config.max_keys_per_list_response,
     153          294 :             prefix_in_bucket,
     154          294 :             concurrency_limiter: ConcurrencyLimiter::new(aws_config.concurrency_limit.get()),
     155          294 :         })
     156          294 :     }
     157              : 
     158         2607 :     fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
     159         2607 :         let relative_path =
     160         2607 :             match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
     161         2607 :                 Some(stripped) => stripped,
     162              :                 // we rely on AWS to return properly prefixed paths
     163              :                 // for requests with a certain prefix
     164            0 :                 None => panic!(
     165            0 :                     "Key {} does not start with bucket prefix {:?}",
     166            0 :                     key, self.prefix_in_bucket
     167            0 :                 ),
     168              :             };
     169         2607 :         RemotePath(
     170         2607 :             relative_path
     171         2607 :                 .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
     172         2607 :                 .collect(),
     173         2607 :         )
     174         2607 :     }
     175              : 
     176        34298 :     pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
     177        34298 :         assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
     178        34298 :         let path_string = path
     179        34298 :             .get_path()
     180        34298 :             .as_str()
     181        34298 :             .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR);
     182        34298 :         match &self.prefix_in_bucket {
     183        34292 :             Some(prefix) => prefix.clone() + "/" + path_string,
     184            6 :             None => path_string.to_string(),
     185              :         }
     186        34298 :     }
     187              : 
     188        19248 :     async fn permit(&self, kind: RequestKind) -> tokio::sync::SemaphorePermit<'_> {
     189        19006 :         let started_at = start_counting_cancelled_wait(kind);
     190        19006 :         let permit = self
     191        19006 :             .concurrency_limiter
     192        19006 :             .acquire(kind)
     193         4143 :             .await
     194        19006 :             .expect("semaphore is never closed");
     195        19006 : 
     196        19006 :         let started_at = ScopeGuard::into_inner(started_at);
     197        19006 :         metrics::BUCKET_METRICS
     198        19006 :             .wait_seconds
     199        19006 :             .observe_elapsed(kind, started_at);
     200        19006 : 
     201        19006 :         permit
     202        19006 :     }
     203              : 
     204        10485 :     async fn owned_permit(&self, kind: RequestKind) -> tokio::sync::OwnedSemaphorePermit {
     205        10463 :         let started_at = start_counting_cancelled_wait(kind);
     206        10463 :         let permit = self
     207        10463 :             .concurrency_limiter
     208        10463 :             .acquire_owned(kind)
     209            0 :             .await
     210        10463 :             .expect("semaphore is never closed");
     211        10463 : 
     212        10463 :         let started_at = ScopeGuard::into_inner(started_at);
     213        10463 :         metrics::BUCKET_METRICS
     214        10463 :             .wait_seconds
     215        10463 :             .observe_elapsed(kind, started_at);
     216        10463 :         permit
     217        10463 :     }
     218              : 
     219        10485 :     async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
     220        10463 :         let kind = RequestKind::Get;
     221        10463 :         let permit = self.owned_permit(kind).await;
     222              : 
     223        10463 :         let started_at = start_measuring_requests(kind);
     224              : 
     225        10463 :         let get_object = self
     226        10463 :             .client
     227        10463 :             .get_object()
     228        10463 :             .bucket(request.bucket)
     229        10463 :             .key(request.key)
     230        10463 :             .set_range(request.range)
     231        10463 :             .send()
     232        30427 :             .await;
     233              : 
     234        10460 :         let started_at = ScopeGuard::into_inner(started_at);
     235              : 
     236        10252 :         let object_output = match get_object {
     237        10252 :             Ok(object_output) => object_output,
     238          208 :             Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
     239              :                 // Count this in the AttemptOutcome::Ok bucket, because 404 is not
     240              :                 // an error: we expect to sometimes fetch an object and find it missing,
     241              :                 // e.g. when probing for timeline indices.
     242          208 :                 metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     243          208 :                     kind,
     244          208 :                     AttemptOutcome::Ok,
     245          208 :                     started_at,
     246          208 :                 );
     247          208 :                 return Err(DownloadError::NotFound);
     248              :             }
     249            0 :             Err(e) => {
     250            0 :                 metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     251            0 :                     kind,
     252            0 :                     AttemptOutcome::Err,
     253            0 :                     started_at,
     254            0 :                 );
     255            0 : 
     256            0 :                 return Err(DownloadError::Other(
     257            0 :                     anyhow::Error::new(e).context("download s3 object"),
     258            0 :                 ));
     259              :             }
     260              :         };
     261              : 
     262        10252 :         let metadata = object_output.metadata().cloned().map(StorageMetadata);
     263        10252 :         let etag = object_output.e_tag;
     264        10252 :         let last_modified = object_output.last_modified.and_then(|t| t.try_into().ok());
     265        10252 : 
     266        10252 :         let body = object_output.body;
     267        10252 :         let body = ByteStreamAsStream::from(body);
     268        10252 :         let body = PermitCarrying::new(permit, body);
     269        10252 :         let body = TimedDownload::new(started_at, body);
     270        10252 : 
     271        10252 :         Ok(Download {
     272        10252 :             metadata,
     273        10252 :             etag,
     274        10252 :             last_modified,
     275        10252 :             download_stream: Box::pin(body),
     276        10252 :         })
     277        10460 :     }
     278              : 
     279         2343 :     async fn delete_oids(
     280         2343 :         &self,
     281         2343 :         kind: RequestKind,
     282         2343 :         delete_objects: &[ObjectIdentifier],
     283         2343 :     ) -> anyhow::Result<()> {
     284         2241 :         for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
     285         2241 :             let started_at = start_measuring_requests(kind);
     286              : 
     287         2241 :             let resp = self
     288         2241 :                 .client
     289         2241 :                 .delete_objects()
     290         2241 :                 .bucket(self.bucket_name.clone())
     291         2241 :                 .delete(
     292         2241 :                     Delete::builder()
     293         2241 :                         .set_objects(Some(chunk.to_vec()))
     294         2241 :                         .build()?,
     295              :                 )
     296         2241 :                 .send()
     297         9319 :                 .await;
     298              : 
     299         2241 :             let started_at = ScopeGuard::into_inner(started_at);
     300         2241 :             metrics::BUCKET_METRICS
     301         2241 :                 .req_seconds
     302         2241 :                 .observe_elapsed(kind, &resp, started_at);
     303              : 
     304         2241 :             let resp = resp?;
     305         2241 :             metrics::BUCKET_METRICS
     306         2241 :                 .deleted_objects_total
     307         2241 :                 .inc_by(chunk.len() as u64);
     308         2241 :             if let Some(errors) = resp.errors {
     309              :                 // Log a bounded number of the errors within the response:
     310              :                 // these requests can carry 1000 keys so logging each one
     311              :                 // would be too verbose, especially as errors may lead us
     312              :                 // to retry repeatedly.
     313              :                 const LOG_UP_TO_N_ERRORS: usize = 10;
     314            0 :                 for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
     315            0 :                     tracing::warn!(
     316            0 :                         "DeleteObjects key {} failed: {}: {}",
     317            0 :                         e.key.as_ref().map(Cow::from).unwrap_or("".into()),
     318            0 :                         e.code.as_ref().map(Cow::from).unwrap_or("".into()),
     319            0 :                         e.message.as_ref().map(Cow::from).unwrap_or("".into())
     320            0 :                     );
     321              :                 }
     322              : 
     323            0 :                 return Err(anyhow::format_err!(
     324            0 :                     "Failed to delete {} objects",
     325            0 :                     errors.len()
     326            0 :                 ));
     327         2241 :             }
     328              :         }
     329         2241 :         Ok(())
     330         2241 :     }
     331              : }
     332              : 
     333              : pin_project_lite::pin_project! {
     334              :     struct ByteStreamAsStream {
     335              :         #[pin]
     336              :         inner: aws_smithy_types::byte_stream::ByteStream
     337              :     }
     338              : }
     339              : 
     340              : impl From<aws_smithy_types::byte_stream::ByteStream> for ByteStreamAsStream {
     341        10272 :     fn from(inner: aws_smithy_types::byte_stream::ByteStream) -> Self {
     342        10272 :         ByteStreamAsStream { inner }
     343        10272 :     }
     344              : }
     345              : 
     346              : impl Stream for ByteStreamAsStream {
     347              :     type Item = std::io::Result<Bytes>;
     348              : 
     349       164238 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     350       164238 :         // this does the std::io::ErrorKind::Other conversion
     351       164238 :         self.project().inner.poll_next(cx).map_err(|x| x.into())
     352       164238 :     }
     353              : 
     354              :     // cannot implement size_hint because inner.size_hint is remaining size in bytes, which makes
     355              :     // sense and Stream::size_hint does not really
     356              : }
     357              : 
     358              : pin_project_lite::pin_project! {
     359              :     /// Times and tracks the outcome of the request.
     360              :     struct TimedDownload<S> {
     361              :         started_at: std::time::Instant,
     362              :         outcome: metrics::AttemptOutcome,
     363              :         #[pin]
     364              :         inner: S
     365              :     }
     366              : 
     367              :     impl<S> PinnedDrop for TimedDownload<S> {
     368              :         fn drop(mut this: Pin<&mut Self>) {
     369              :             metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
     370              :         }
     371              :     }
     372              : }
     373              : 
     374              : impl<S> TimedDownload<S> {
     375        10252 :     fn new(started_at: std::time::Instant, inner: S) -> Self {
     376        10252 :         TimedDownload {
     377        10252 :             started_at,
     378        10252 :             outcome: metrics::AttemptOutcome::Cancelled,
     379        10252 :             inner,
     380        10252 :         }
     381        10252 :     }
     382              : }
     383              : 
     384              : impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for TimedDownload<S> {
     385              :     type Item = <S as Stream>::Item;
     386              : 
     387       164202 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     388       164202 :         use std::task::ready;
     389       164202 : 
     390       164202 :         let this = self.project();
     391              : 
     392       164202 :         let res = ready!(this.inner.poll_next(cx));
     393        80382 :         match &res {
     394        80382 :             Some(Ok(_)) => {}
     395            0 :             Some(Err(_)) => *this.outcome = metrics::AttemptOutcome::Err,
     396        23645 :             None => *this.outcome = metrics::AttemptOutcome::Ok,
     397              :         }
     398              : 
     399       104027 :         Poll::Ready(res)
     400       164202 :     }
     401              : 
     402            0 :     fn size_hint(&self) -> (usize, Option<usize>) {
     403            0 :         self.inner.size_hint()
     404            0 :     }
     405              : }
     406              : 
     407              : impl RemoteStorage for S3Bucket {
     408          561 :     async fn list(
     409          561 :         &self,
     410          561 :         prefix: Option<&RemotePath>,
     411          561 :         mode: ListingMode,
     412          561 :         max_keys: Option<NonZeroU32>,
     413          561 :     ) -> Result<Listing, DownloadError> {
     414          537 :         let kind = RequestKind::List;
     415          537 :         // s3 sdk wants i32
     416          537 :         let mut max_keys = max_keys.map(|mk| mk.get() as i32);
     417          537 :         let mut result = Listing::default();
     418          537 : 
     419          537 :         // get the passed prefix or if it is not set use prefix_in_bucket value
     420          537 :         let list_prefix = prefix
     421          537 :             .map(|p| self.relative_path_to_s3_object(p))
     422          537 :             .or_else(|| self.prefix_in_bucket.clone())
     423          537 :             .map(|mut p| {
     424              :                 // required to end with a separator
     425              :                 // otherwise request will return only the entry of a prefix
     426          537 :                 if matches!(mode, ListingMode::WithDelimiter)
     427          293 :                     && !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR)
     428          293 :                 {
     429          293 :                     p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
     430          293 :                 }
     431          537 :                 p
     432          537 :             });
     433          537 : 
     434          537 :         let mut continuation_token = None;
     435              : 
     436              :         loop {
     437          537 :             let _guard = self.permit(kind).await;
     438          537 :             let started_at = start_measuring_requests(kind);
     439          537 : 
     440          537 :             // min of two Options, returning Some if one is value and another is
     441          537 :             // None (None is smaller than anything, so plain min doesn't work).
     442          537 :             let request_max_keys = self
     443          537 :                 .max_keys_per_list_response
     444          537 :                 .into_iter()
     445          537 :                 .chain(max_keys.into_iter())
     446          537 :                 .min();
     447          537 :             let mut request = self
     448          537 :                 .client
     449          537 :                 .list_objects_v2()
     450          537 :                 .bucket(self.bucket_name.clone())
     451          537 :                 .set_prefix(list_prefix.clone())
     452          537 :                 .set_continuation_token(continuation_token)
     453          537 :                 .set_max_keys(request_max_keys);
     454          537 : 
     455          537 :             if let ListingMode::WithDelimiter = mode {
     456          293 :                 request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
     457          293 :             }
     458              : 
     459          537 :             let response = request
     460          537 :                 .send()
     461         2878 :                 .await
     462          537 :                 .context("Failed to list S3 prefixes")
     463          537 :                 .map_err(DownloadError::Other);
     464          537 : 
     465          537 :             let started_at = ScopeGuard::into_inner(started_at);
     466          537 : 
     467          537 :             metrics::BUCKET_METRICS
     468          537 :                 .req_seconds
     469          537 :                 .observe_elapsed(kind, &response, started_at);
     470              : 
     471          537 :             let response = response?;
     472              : 
     473          537 :             let keys = response.contents();
     474          537 :             let empty = Vec::new();
     475          537 :             let prefixes = response.common_prefixes.as_ref().unwrap_or(&empty);
     476              : 
     477            0 :             tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
     478              : 
     479         2837 :             for object in keys {
     480         2300 :                 let object_path = object.key().expect("response does not contain a key");
     481         2300 :                 let remote_path = self.s3_object_to_relative_path(object_path);
     482         2300 :                 result.keys.push(remote_path);
     483         2300 :                 if let Some(mut mk) = max_keys {
     484            4 :                     assert!(mk > 0);
     485            4 :                     mk -= 1;
     486            4 :                     if mk == 0 {
     487            0 :                         return Ok(result); // limit reached
     488            4 :                     }
     489            4 :                     max_keys = Some(mk);
     490         2296 :                 }
     491              :             }
     492              : 
     493          537 :             result.prefixes.extend(
     494          537 :                 prefixes
     495          537 :                     .iter()
     496          537 :                     .filter_map(|o| Some(self.s3_object_to_relative_path(o.prefix()?))),
     497          537 :             );
     498              : 
     499          537 :             continuation_token = match response.next_continuation_token {
     500            0 :                 Some(new_token) => Some(new_token),
     501          537 :                 None => break,
     502          537 :             };
     503          537 :         }
     504          537 : 
     505          537 :         Ok(result)
     506          537 :     }
     507              : 
     508        16215 :     async fn upload(
     509        16215 :         &self,
     510        16215 :         from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     511        16215 :         from_size_bytes: usize,
     512        16215 :         to: &RemotePath,
     513        16215 :         metadata: Option<StorageMetadata>,
     514        16215 :     ) -> anyhow::Result<()> {
     515        16215 :         let kind = RequestKind::Put;
     516        16215 :         let _guard = self.permit(kind).await;
     517              : 
     518        16215 :         let started_at = start_measuring_requests(kind);
     519        16215 : 
     520        16215 :         let body = Body::wrap_stream(from);
     521        16215 :         let bytes_stream = ByteStream::new(SdkBody::from_body_0_4(body));
     522              : 
     523        16215 :         let res = self
     524        16215 :             .client
     525        16215 :             .put_object()
     526        16215 :             .bucket(self.bucket_name.clone())
     527        16215 :             .key(self.relative_path_to_s3_object(to))
     528        16215 :             .set_metadata(metadata.map(|m| m.0))
     529        16215 :             .content_length(from_size_bytes.try_into()?)
     530        16215 :             .body(bytes_stream)
     531        16215 :             .send()
     532        52442 :             .await;
     533              : 
     534        16214 :         let started_at = ScopeGuard::into_inner(started_at);
     535        16214 :         metrics::BUCKET_METRICS
     536        16214 :             .req_seconds
     537        16214 :             .observe_elapsed(kind, &res, started_at);
     538        16214 : 
     539        16214 :         res?;
     540              : 
     541        16214 :         Ok(())
     542        16214 :     }
     543              : 
     544           14 :     async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
     545           12 :         let kind = RequestKind::Copy;
     546           12 :         let _guard = self.permit(kind).await;
     547              : 
     548           12 :         let started_at = start_measuring_requests(kind);
     549           12 : 
     550           12 :         // we need to specify bucket_name as a prefix
     551           12 :         let copy_source = format!(
     552           12 :             "{}/{}",
     553           12 :             self.bucket_name,
     554           12 :             self.relative_path_to_s3_object(from)
     555           12 :         );
     556              : 
     557           12 :         let res = self
     558           12 :             .client
     559           12 :             .copy_object()
     560           12 :             .bucket(self.bucket_name.clone())
     561           12 :             .key(self.relative_path_to_s3_object(to))
     562           12 :             .copy_source(copy_source)
     563           12 :             .send()
     564           48 :             .await;
     565              : 
     566           12 :         let started_at = ScopeGuard::into_inner(started_at);
     567           12 :         metrics::BUCKET_METRICS
     568           12 :             .req_seconds
     569           12 :             .observe_elapsed(kind, &res, started_at);
     570           12 : 
     571           12 :         res?;
     572              : 
     573           12 :         Ok(())
     574           12 :     }
     575              : 
     576        10439 :     async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
     577        10427 :         // if prefix is not none then download file `prefix/from`
     578        10427 :         // if prefix is none then download file `from`
     579        10427 :         self.download_object(GetObjectRequest {
     580        10427 :             bucket: self.bucket_name.clone(),
     581        10427 :             key: self.relative_path_to_s3_object(from),
     582        10427 :             range: None,
     583        10427 :         })
     584        30321 :         .await
     585        10424 :     }
     586              : 
     587           46 :     async fn download_byte_range(
     588           46 :         &self,
     589           46 :         from: &RemotePath,
     590           46 :         start_inclusive: u64,
     591           46 :         end_exclusive: Option<u64>,
     592           46 :     ) -> Result<Download, DownloadError> {
     593           36 :         // S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
     594           36 :         // and needs both ends to be exclusive
     595           36 :         let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
     596           36 :         let range = Some(match end_inclusive {
     597            0 :             Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"),
     598           36 :             None => format!("bytes={start_inclusive}-"),
     599              :         });
     600              : 
     601           36 :         self.download_object(GetObjectRequest {
     602           36 :             bucket: self.bucket_name.clone(),
     603           36 :             key: self.relative_path_to_s3_object(from),
     604           36 :             range,
     605           36 :         })
     606          106 :         .await
     607           36 :     }
     608         2339 :     async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
     609         2241 :         let kind = RequestKind::Delete;
     610         2241 :         let _guard = self.permit(kind).await;
     611              : 
     612         2241 :         let mut delete_objects = Vec::with_capacity(paths.len());
     613         9029 :         for path in paths {
     614         6788 :             let obj_id = ObjectIdentifier::builder()
     615         6788 :                 .set_key(Some(self.relative_path_to_s3_object(path)))
     616         6788 :                 .build()?;
     617         6788 :             delete_objects.push(obj_id);
     618              :         }
     619              : 
     620         9319 :         self.delete_oids(kind, &delete_objects).await
     621         2241 :     }
     622              : 
     623         2106 :     async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
     624         2016 :         let paths = std::array::from_ref(path);
     625         8284 :         self.delete_objects(paths).await
     626         2016 :     }
     627              : 
                            /// Rolls the keys under `prefix` back to their state as of `timestamp`, using
                            /// the version history returned by `list_object_versions`.
                            ///
                            /// If `prefix` is `None`, `prefix_in_bucket` is used instead. All versions and
                            /// delete markers under the prefix are collected first; afterwards every key is
                            /// handled on its own:
                            ///
                            /// * keys whose newest entry is later than `done_if_after` are skipped;
                            /// * keys without any entry at or after `timestamp` are skipped;
                            /// * if the entry right before `timestamp` is a version, that version is copied
                            ///   over the key;
                            /// * if that entry is a delete marker, or no entry precedes `timestamp`, the key
                            ///   is deleted unless its newest entry already is a delete marker.
                            ///
                            /// # Errors
                            ///
                            /// * [`TimeTravelError::Cancelled`] when `backoff::retry` yields `None`
                            ///   (presumably because `cancel` fired — confirm against `backoff::retry`);
                            /// * [`TimeTravelError::TooManyVersions`] when 100_000 or more entries have been
                            ///   collected and the listing is still not finished;
                            /// * [`TimeTravelError::Other`] for request errors surfaced by `backoff::retry`,
                            ///   list entries with missing fields, a truncated final list response, or a
                            ///   `version_id` of `"null"`.
     628            7 :     async fn time_travel_recover(
     629            7 :         &self,
     630            7 :         prefix: Option<&RemotePath>,
     631            7 :         timestamp: SystemTime,
     632            7 :         done_if_after: SystemTime,
     633            7 :         cancel: &CancellationToken,
     634            7 :     ) -> Result<(), TimeTravelError> {
     635            1 :         let kind = RequestKind::TimeTravel;
     636            1 :         let _guard = self.permit(kind).await;
     637              : 
     638            1 :         let timestamp = DateTime::from(timestamp);
     639            1 :         let done_if_after = DateTime::from(done_if_after);
     640              : 
     641            0 :         tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
     642              : 
     643              :         // get the passed prefix or if it is not set use prefix_in_bucket value
     644            1 :         let prefix = prefix
     645            1 :             .map(|p| self.relative_path_to_s3_object(p))
     646            1 :             .or_else(|| self.prefix_in_bucket.clone());
     647            1 : 
                                // Retry settings shared by the list and copy requests below.
                                // `is_permanent` always answers `false`, i.e. no error is reported to
                                // `backoff::retry` as permanent.
     648            1 :         let warn_threshold = 3;
     649            1 :         let max_retries = 10;
     650            1 :         let is_permanent = |_e: &_| false;
     651              : 
                                // Pagination markers for `list_object_versions`, plus the entries
                                // accumulated over all pages.
     652            1 :         let mut key_marker = None;
     653            1 :         let mut version_id_marker = None;
     654            1 :         let mut versions_and_deletes = Vec::new();
     655              : 
     656              :         loop {
     657            1 :             let response = backoff::retry(
     658            1 :                 || async {
     659            1 :                     self.client
     660            1 :                         .list_object_versions()
     661            1 :                         .bucket(self.bucket_name.clone())
     662            1 :                         .set_prefix(prefix.clone())
     663            1 :                         .set_key_marker(key_marker.clone())
     664            1 :                         .set_version_id_marker(version_id_marker.clone())
     665            1 :                         .send()
     666           41 :                         .await
     667            1 :                         .map_err(|e| TimeTravelError::Other(e.into()))
     668            1 :                 },
     669            1 :                 is_permanent,
     670            1 :                 warn_threshold,
     671            1 :                 max_retries,
     672            1 :                 "listing object versions for time_travel_recover",
     673            1 :                 cancel,
     674            1 :             )
     675           41 :             .await
                                    // `backoff::retry` yields an `Option<Result<..>>`: map `None` to
                                    // `Cancelled`, then flatten the nested result.
     676            1 :             .ok_or_else(|| TimeTravelError::Cancelled)
     677            1 :             .and_then(|x| x)?;
     678              : 
     679            0 :             tracing::trace!(
     680            0 :                 "  Got List response version_id_marker={:?}, key_marker={:?}",
     681            0 :                 response.version_id_marker,
     682            0 :                 response.key_marker
     683            0 :             );
     684            1 :             let versions = response
     685            1 :                 .versions
     686            1 :                 .unwrap_or_default()
     687            1 :                 .into_iter()
     688            1 :                 .map(VerOrDelete::from_version);
     689            1 :             let deletes = response
     690            1 :                 .delete_markers
     691            1 :                 .unwrap_or_default()
     692            1 :                 .into_iter()
     693            1 :                 .map(VerOrDelete::from_delete_marker);
                                    // Both conversions are fallible; the first conversion error ends
                                    // the extension and is returned.
     694            1 :             itertools::process_results(versions.chain(deletes), |n_vds| {
     695            1 :                 versions_and_deletes.extend(n_vds)
     696            1 :             })
     697            1 :             .map_err(TimeTravelError::Other)?;
                                    // Treat an empty marker string the same as an absent marker.
     698           14 :             fn none_if_empty(v: Option<String>) -> Option<String> {
     699           14 :                 v.filter(|v| !v.is_empty())
     700           14 :             }
     701            1 :             version_id_marker = none_if_empty(response.next_version_id_marker);
     702            1 :             key_marker = none_if_empty(response.next_key_marker);
                                    // Without a next version id marker this was the last page.
     703            1 :             if version_id_marker.is_none() {
     704              :                 // The final response is not supposed to be truncated
     705            1 :                 if response.is_truncated.unwrap_or_default() {
     706            0 :                     return Err(TimeTravelError::Other(anyhow::anyhow!(
     707            0 :                         "Received truncated ListObjectVersions response for prefix={prefix:?}"
     708            0 :                     )));
     709            1 :                 }
     710            1 :                 break;
     711            0 :             }
     712            0 :             // Limit the number of versions and deletions, mostly so that we don't
     713            0 :             // keep requesting forever if the list is too long, as we'd put the
     714            0 :             // list in RAM.
     715            0 :             // Building a list of 100k entries that reaches the limit roughly takes
     716            0 :             // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
     717            0 :             const COMPLEXITY_LIMIT: usize = 100_000;
     718            0 :             if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
     719            0 :                 return Err(TimeTravelError::TooManyVersions);
     720            0 :             }
     721              :         }
     722              : 
     723            1 :         tracing::info!(
     724            1 :             "Built list for time travel with {} versions and deletions",
     725            1 :             versions_and_deletes.len()
     726            1 :         );
     727              : 
     728              :         // Work on the list of references instead of the objects directly,
     729              :         // otherwise we get lifetime errors in the sort_by_key call below.
     730            1 :         let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
     731            1 : 
                                // Sort by key first and modification time second, so that every per-key
                                // group built below ends up ordered by modification time.
     732         2214 :         versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified));
     733            1 : 
     734            1 :         let mut vds_for_key = HashMap::<_, Vec<_>>::new();
     735              : 
     736          268 :         for vd in &versions_and_deletes {
     737              :             let VerOrDelete {
     738          267 :                 version_id, key, ..
     739          267 :             } = &vd;
     740          267 :             if version_id == "null" {
     741            0 :                 return Err(TimeTravelError::Other(anyhow!("Received ListVersions response for key={key} with version_id='null', \
     742            0 :                     indicating either disabled versioning, or legacy objects with null version id values")));
     743          267 :             }
     744            0 :             tracing::trace!(
     745            0 :                 "Parsing version key={key} version_id={version_id} kind={:?}",
     746            0 :                 vd.kind
     747            0 :             );
     748              : 
     749          267 :             vds_for_key.entry(key).or_default().push(vd);
     750              :         }
     751           93 :         for (key, versions) in vds_for_key {
                                    // A group only comes into existence by pushing an entry, so it is
                                    // never empty.
     752           92 :             let last_vd = versions.last().unwrap();
     753           92 :             if last_vd.last_modified > done_if_after {
     754            0 :                 tracing::trace!("Key {key} has version later than done_if_after, skipping");
     755            0 :                 continue;
     756           92 :             }
     757              :             // the version we want to restore to.
                                    // This is the index of an entry modified exactly at `timestamp`, or
                                    // otherwise the insertion point, i.e. the first entry newer than
                                    // `timestamp`. The entry in front of that index is the restore target.
     758           92 :             let version_to_restore_to =
     759          188 :                 match versions.binary_search_by_key(&timestamp, |tpl| tpl.last_modified) {
     760            0 :                     Ok(v) => v,
     761           92 :                     Err(e) => e,
     762              :                 };
     763           92 :             if version_to_restore_to == versions.len() {
     764            0 :                 tracing::trace!("Key {key} has no changes since timestamp, skipping");
     765            0 :                 continue;
     766           92 :             }
     767           92 :             let mut do_delete = false;
     768           92 :             if version_to_restore_to == 0 {
     769              :                 // All versions more recent, so the key didn't exist at the specified time point.
     770            0 :                 tracing::trace!(
     771            0 :                     "All {} versions more recent for {key}, deleting",
     772            0 :                     versions.len()
     773            0 :                 );
     774            3 :                 do_delete = true;
     775              :             } else {
     776           89 :                 match &versions[version_to_restore_to - 1] {
     777              :                     VerOrDelete {
     778              :                         kind: VerOrDeleteKind::Version,
     779           89 :                         version_id,
     780              :                         ..
     781              :                     } => {
     782            0 :                         tracing::trace!("Copying old version {version_id} for {key}...");
     783              :                         // Restore the state to the last version by copying
     784           89 :                         let source_id =
     785           89 :                             format!("{}/{key}?versionId={version_id}", self.bucket_name);
     786           89 : 
     787           89 :                         backoff::retry(
     788           89 :                             || async {
     789           89 :                                 self.client
     790           89 :                                     .copy_object()
     791           89 :                                     .bucket(self.bucket_name.clone())
     792           89 :                                     .key(key)
     793           89 :                                     .copy_source(&source_id)
     794           89 :                                     .send()
     795          110 :                                     .await
     796           89 :                                     .map_err(|e| TimeTravelError::Other(e.into()))
     797           89 :                             },
     798           89 :                             is_permanent,
     799           89 :                             warn_threshold,
     800           89 :                             max_retries,
     801           89 :                             "copying object version for time_travel_recover",
     802           89 :                             cancel,
     803           89 :                         )
     804          110 :                         .await
     805           89 :                         .ok_or_else(|| TimeTravelError::Cancelled)
     806           89 :                         .and_then(|x| x)?;
     807           89 :                         tracing::info!(%version_id, %key, "Copied old version in S3");
     808              :                     }
     809              :                     VerOrDelete {
     810              :                         kind: VerOrDeleteKind::DeleteMarker,
     811              :                         ..
     812            0 :                     } => {
                                                    // The key was deleted at `timestamp`, so restoring
                                                    // means deleting it again.
     813            0 :                         do_delete = true;
     814            0 :                     }
     815              :                 }
     816              :             };
     817           92 :             if do_delete {
     818            3 :                 if matches!(last_vd.kind, VerOrDeleteKind::DeleteMarker) {
     819              :                     // Key has since been deleted (but there was some history), no need to do anything
     820            0 :                     tracing::trace!("Key {key} already deleted, skipping.");
     821              :                 } else {
     822            0 :                     tracing::trace!("Deleting {key}...");
     823              : 
     824            0 :                     let oid = ObjectIdentifier::builder()
     825            0 :                         .key(key.to_owned())
     826            0 :                         .build()
     827            0 :                         .map_err(|e| TimeTravelError::Other(anyhow::Error::new(e)))?;
     828            0 :                     self.delete_oids(kind, &[oid])
     829            0 :                         .await
     830            0 :                         .map_err(TimeTravelError::Other)?;
     831              :                 }
     832           89 :             }
     833              :         }
     834            1 :         Ok(())
     835            1 :     }
     836              : }
     837              : 
     838              : /// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`].
     839        29733 : fn start_counting_cancelled_wait(
     840        29733 :     kind: RequestKind,
     841        29733 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
     842        29733 :     scopeguard::guard_on_success(std::time::Instant::now(), move |_| {
     843            0 :         metrics::BUCKET_METRICS.cancelled_waits.get(kind).inc()
     844        29733 :     })
     845        29733 : }
     846              : 
     847              : /// On drop (cancellation) add time to [`metrics::BucketMetrics::req_seconds`].
     848        29730 : fn start_measuring_requests(
     849        29730 :     kind: RequestKind,
     850        29730 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
     851        29730 :     scopeguard::guard_on_success(std::time::Instant::now(), move |started_at| {
     852            3 :         metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     853            3 :             kind,
     854            3 :             AttemptOutcome::Cancelled,
     855            3 :             started_at,
     856            3 :         )
     857        29730 :     })
     858        29730 : }
     859              : 
                        // One entry of the version listing built in `time_travel_recover`: either an
                        // object version or a delete marker.
     860              : // Save RAM and only store the needed data instead of the entire ObjectVersion/DeleteMarkerEntry
     861              : struct VerOrDelete {
                            // Whether this entry is an object version or a delete marker.
     862              :     kind: VerOrDeleteKind,
                            // Modification time of the listed entry; entries of a key are ordered by it.
     863              :     last_modified: DateTime,
                            // Version id of the entry; `time_travel_recover` rejects the value "null".
     864              :     version_id: String,
                            // Object key the entry belongs to.
     865              :     key: String,
     866              : }
     867              : 
                        /// Tells the two kinds of [`VerOrDelete`] entries apart.
     868            0 : #[derive(Debug)]
     869              : enum VerOrDeleteKind {
                            /// Entry created from an `ObjectVersion` (see `VerOrDelete::from_version`).
     870              :     Version,
                            /// Entry created from a `DeleteMarkerEntry` (see `VerOrDelete::from_delete_marker`).
     871              :     DeleteMarker,
     872              : }
     873              : 
     874              : impl VerOrDelete {
     875          303 :     fn with_kind(
     876          303 :         kind: VerOrDeleteKind,
     877          303 :         last_modified: Option<DateTime>,
     878          303 :         version_id: Option<String>,
     879          303 :         key: Option<String>,
     880          303 :     ) -> anyhow::Result<Self> {
     881          303 :         let lvk = (last_modified, version_id, key);
     882          303 :         let (Some(last_modified), Some(version_id), Some(key)) = lvk else {
     883            0 :             anyhow::bail!(
     884            0 :                 "One (or more) of last_modified, key, and id is None. \
     885            0 :             Is versioning enabled in the bucket? last_modified={:?}, version_id={:?}, key={:?}",
     886            0 :                 lvk.0,
     887            0 :                 lvk.1,
     888            0 :                 lvk.2,
     889            0 :             );
     890              :         };
     891          303 :         Ok(Self {
     892          303 :             kind,
     893          303 :             last_modified,
     894          303 :             version_id,
     895          303 :             key,
     896          303 :         })
     897          303 :     }
     898          163 :     fn from_version(v: ObjectVersion) -> anyhow::Result<Self> {
     899          163 :         Self::with_kind(
     900          163 :             VerOrDeleteKind::Version,
     901          163 :             v.last_modified,
     902          163 :             v.version_id,
     903          163 :             v.key,
     904          163 :         )
     905          163 :     }
     906          140 :     fn from_delete_marker(v: DeleteMarkerEntry) -> anyhow::Result<Self> {
     907          140 :         Self::with_kind(
     908          140 :             VerOrDeleteKind::DeleteMarker,
     909          140 :             v.last_modified,
     910          140 :             v.version_id,
     911          140 :             v.key,
     912          140 :         )
     913          140 :     }
     914              : }
     915              : 
     916              : #[cfg(test)]
     917              : mod tests {
     918              :     use camino::Utf8Path;
     919              :     use std::num::NonZeroUsize;
     920              : 
     921              :     use crate::{RemotePath, S3Bucket, S3Config};
     922              : 
     923            2 :     #[test]
     924            2 :     fn relative_path() {
     925            2 :         let all_paths = ["", "some/path", "some/path/"];
     926            2 :         let all_paths: Vec<RemotePath> = all_paths
     927            2 :             .iter()
     928            6 :             .map(|x| RemotePath::new(Utf8Path::new(x)).expect("bad path"))
     929            2 :             .collect();
     930            2 :         let prefixes = [
     931            2 :             None,
     932            2 :             Some(""),
     933            2 :             Some("test/prefix"),
     934            2 :             Some("test/prefix/"),
     935            2 :             Some("/test/prefix/"),
     936            2 :         ];
     937            2 :         let expected_outputs = vec![
     938            2 :             vec!["", "some/path", "some/path"],
     939            2 :             vec!["/", "/some/path", "/some/path"],
     940            2 :             vec![
     941            2 :                 "test/prefix/",
     942            2 :                 "test/prefix/some/path",
     943            2 :                 "test/prefix/some/path",
     944            2 :             ],
     945            2 :             vec![
     946            2 :                 "test/prefix/",
     947            2 :                 "test/prefix/some/path",
     948            2 :                 "test/prefix/some/path",
     949            2 :             ],
     950            2 :             vec![
     951            2 :                 "test/prefix/",
     952            2 :                 "test/prefix/some/path",
     953            2 :                 "test/prefix/some/path",
     954            2 :             ],
     955            2 :         ];
     956              : 
     957           10 :         for (prefix_idx, prefix) in prefixes.iter().enumerate() {
     958           10 :             let config = S3Config {
     959           10 :                 bucket_name: "bucket".to_owned(),
     960           10 :                 bucket_region: "region".to_owned(),
     961           10 :                 prefix_in_bucket: prefix.map(str::to_string),
     962           10 :                 endpoint: None,
     963           10 :                 concurrency_limit: NonZeroUsize::new(100).unwrap(),
     964           10 :                 max_keys_per_list_response: Some(5),
     965           10 :             };
     966           10 :             let storage = S3Bucket::new(&config).expect("remote storage init");
     967           30 :             for (test_path_idx, test_path) in all_paths.iter().enumerate() {
     968           30 :                 let result = storage.relative_path_to_s3_object(test_path);
     969           30 :                 let expected = expected_outputs[prefix_idx][test_path_idx];
     970           30 :                 assert_eq!(result, expected);
     971              :             }
     972              :         }
     973            2 :     }
     974              : }
        

Generated by: LCOV version 2.1-beta