LCOV - code coverage report
Current view: top level - libs/remote_storage/src - s3_bucket.rs (source / functions)
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info
Test Date: 2025-03-12 00:01:28
Coverage: Lines: 88.9 % (766 of 862 hit) | Functions: 66.0 % (62 of 94 hit)

            Line data    Source code
       1              : //! AWS S3 storage wrapper around the `aws_sdk_s3` crate.
       2              : //!
       3              : //! Respects the `prefix_in_bucket` property from [`S3Config`],
       4              : //! allowing multiple API users to work independently with the same S3 bucket,
       5              : //! as long as their bucket prefixes are specified and distinct.
       6              : 
       7              : use std::borrow::Cow;
       8              : use std::collections::HashMap;
       9              : use std::num::NonZeroU32;
      10              : use std::pin::Pin;
      11              : use std::sync::Arc;
      12              : use std::task::{Context, Poll};
      13              : use std::time::{Duration, SystemTime};
      14              : 
      15              : use anyhow::{Context as _, anyhow};
      16              : use aws_config::BehaviorVersion;
      17              : use aws_config::default_provider::credentials::DefaultCredentialsChain;
      18              : use aws_config::retry::{RetryConfigBuilder, RetryMode};
      19              : use aws_sdk_s3::Client;
      20              : use aws_sdk_s3::config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep};
      21              : use aws_sdk_s3::error::SdkError;
      22              : use aws_sdk_s3::operation::get_object::GetObjectError;
      23              : use aws_sdk_s3::operation::head_object::HeadObjectError;
      24              : use aws_sdk_s3::types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass};
      25              : use aws_smithy_async::rt::sleep::TokioSleep;
      26              : use aws_smithy_types::DateTime;
      27              : use aws_smithy_types::body::SdkBody;
      28              : use aws_smithy_types::byte_stream::ByteStream;
      29              : use aws_smithy_types::date_time::ConversionError;
      30              : use bytes::Bytes;
      31              : use futures::stream::Stream;
      32              : use futures_util::StreamExt;
      33              : use http_body_util::StreamBody;
      34              : use http_types::StatusCode;
      35              : use hyper::body::Frame;
      36              : use scopeguard::ScopeGuard;
      37              : use tokio_util::sync::CancellationToken;
      38              : use utils::backoff;
      39              : 
      40              : use super::StorageMetadata;
      41              : use crate::config::S3Config;
      42              : use crate::error::Cancelled;
      43              : pub(super) use crate::metrics::RequestKind;
      44              : use crate::metrics::{AttemptOutcome, start_counting_cancelled_wait, start_measuring_requests};
      45              : use crate::support::PermitCarrying;
      46              : use crate::{
      47              :     ConcurrencyLimiter, Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject,
      48              :     MAX_KEYS_PER_DELETE_S3, REMOTE_STORAGE_PREFIX_SEPARATOR, RemotePath, RemoteStorage,
      49              :     TimeTravelError, TimeoutOrCancel,
      50              : };
      51              : 
      52              : /// AWS S3 storage.
      53              : pub struct S3Bucket {
      54              :     client: Client,
      55              :     bucket_name: String,
      56              :     prefix_in_bucket: Option<String>,
      57              :     max_keys_per_list_response: Option<i32>,
      58              :     upload_storage_class: Option<StorageClass>,
      59              :     concurrency_limiter: ConcurrencyLimiter,
      60              :     // Per-request timeout. Accessible for tests.
      61              :     pub timeout: Duration,
      62              : }
      63              : 
      64              : struct GetObjectRequest {
      65              :     bucket: String,
      66              :     key: String,
      67              :     etag: Option<String>,
      68              :     range: Option<String>,
      69              : }
      70              : impl S3Bucket {
      71              :     /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
      72           41 :     pub async fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
      73           41 :         tracing::debug!(
      74            0 :             "Creating s3 remote storage for S3 bucket {}",
      75              :             remote_storage_config.bucket_name
      76              :         );
      77              : 
      78           41 :         let region = Region::new(remote_storage_config.bucket_region.clone());
      79           41 :         let region_opt = Some(region.clone());
      80              : 
      81              :         // https://docs.aws.amazon.com/sdkref/latest/guide/standardized-credentials.html
      82              :         // https://docs.rs/aws-config/latest/aws_config/default_provider/credentials/struct.DefaultCredentialsChain.html
      83              :         // Incomplete list of auth methods used by this:
      84              :         // * "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
      85              :         // * "AWS_PROFILE" / `aws sso login --profile <profile>`
      86              :         // * "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
      87              :         // * http (ECS/EKS) container credentials
      88              :         // * imds v2
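                      :         // The chain resolves these roughly in the order listed (environment variables
                      :         // first, IMDS last); the exact precedence is defined by the aws-config crate,
                      :         // not by this wrapper.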
      89           41 :         let credentials_provider = DefaultCredentialsChain::builder()
      90           41 :             .region(region)
      91           41 :             .build()
      92           41 :             .await;
      93              : 
      94              :         // AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
      95           41 :         let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
      96           41 : 
      97           41 :         let sdk_config_loader: aws_config::ConfigLoader = aws_config::defaults(
      98           41 :             #[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
      99           41 :             BehaviorVersion::v2023_11_09(),
     100           41 :         )
     101           41 :         .region(region_opt)
     102           41 :         .identity_cache(IdentityCache::lazy().build())
     103           41 :         .credentials_provider(credentials_provider)
     104           41 :         .sleep_impl(SharedAsyncSleep::from(sleep_impl));
     105           41 : 
     106           41 :         let sdk_config: aws_config::SdkConfig = std::thread::scope(|s| {
     107           41 :             s.spawn(|| {
     108           41 :                 // TODO: make this function async.
     109           41 :                 tokio::runtime::Builder::new_current_thread()
     110           41 :                     .enable_all()
     111           41 :                     .build()
     112           41 :                     .unwrap()
     113           41 :                     .block_on(sdk_config_loader.load())
     114           41 :             })
     115           41 :             .join()
     116           41 :             .unwrap()
     117           41 :         });
     118           41 : 
     119           41 :         let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&sdk_config);
     120              : 
     121              :         // Technically, the `remote_storage_config.endpoint` field only applies to S3 interactions.
     122              :         // (In case we ever re-use the `sdk_config` for more than just the S3 client in the future)
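                      :         // With a custom endpoint (for example a MinIO or localstack instance at an
                      :         // address like `http://127.0.0.1:9000` -- an illustrative value, not taken from
                      :         // this config), path-style addressing is forced so keys resolve as
                      :         // `<endpoint>/<bucket>/<key>` rather than the virtual-hosted
                      :         // `<bucket>.<endpoint>` form that such deployments usually cannot serve.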
     123           41 :         if let Some(custom_endpoint) = remote_storage_config.endpoint.clone() {
     124            0 :             s3_config_builder = s3_config_builder
     125            0 :                 .endpoint_url(custom_endpoint)
     126            0 :                 .force_path_style(true);
     127           15 :         }
     128              : 
     129              :         // We do our own retries (see [`backoff::retry`]).  However, for the AWS SDK to enable rate limiting in response to throttling
     130              :         // responses (e.g. 429 on too many ListObjectsv2 requests), we must provide a retry config.  We set it to use at most one
     131              :         // attempt, and enable 'Adaptive' mode, which causes rate limiting to be enabled.
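                      :         // Retrying of failed requests thus happens at the call sites, along the lines
                      :         // of the `backoff::retry` usage in `time_travel_recover` below, roughly:
                      :         //
                      :         //     backoff::retry(|| async { /* one SDK call */ }, is_permanent,
                      :         //         warn_threshold, max_retries, "operation name", cancel).await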
     132           41 :         let mut retry_config = RetryConfigBuilder::new();
     133           41 :         retry_config
     134           41 :             .set_max_attempts(Some(1))
     135           41 :             .set_mode(Some(RetryMode::Adaptive));
     136           41 :         s3_config_builder = s3_config_builder.retry_config(retry_config.build());
     137           41 : 
     138           41 :         let s3_config = s3_config_builder.build();
     139           41 :         let client = aws_sdk_s3::Client::from_conf(s3_config);
     140           41 : 
     141           41 :         let prefix_in_bucket = remote_storage_config
     142           41 :             .prefix_in_bucket
     143           41 :             .as_deref()
     144           41 :             .map(|prefix| {
     145           38 :                 let mut prefix = prefix;
     146           41 :                 while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     147            3 :                     prefix = &prefix[1..]
     148              :                 }
     149              : 
     150           38 :                 let mut prefix = prefix.to_string();
     151           70 :                 while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     152           32 :                     prefix.pop();
     153           32 :                 }
     154           38 :                 prefix
     155           41 :             });
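                      :         // Illustrative effect of the normalization above (hypothetical prefixes):
                      :         //   "/wal/v1/"  -> "wal/v1"
                      :         //   "tenants//" -> "tenants"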
     156           41 : 
     157           41 :         Ok(Self {
     158           41 :             client,
     159           41 :             bucket_name: remote_storage_config.bucket_name.clone(),
     160           41 :             max_keys_per_list_response: remote_storage_config.max_keys_per_list_response,
     161           41 :             prefix_in_bucket,
     162           41 :             concurrency_limiter: ConcurrencyLimiter::new(
     163           41 :                 remote_storage_config.concurrency_limit.get(),
     164           41 :             ),
     165           41 :             upload_storage_class: remote_storage_config.upload_storage_class.clone(),
     166           41 :             timeout,
     167           41 :         })
     168           41 :     }
     169              : 
     170          519 :     fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
     171          519 :         let relative_path =
     172          519 :             match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
     173          519 :                 Some(stripped) => stripped,
     174              :                 // we rely on AWS to return properly prefixed paths
     175              :                 // for requests with a certain prefix
     176            0 :                 None => panic!(
     177            0 :                     "Key {} does not start with bucket prefix {:?}",
     178            0 :                     key, self.prefix_in_bucket
     179            0 :                 ),
     180              :             };
     181          519 :         RemotePath(
     182          519 :             relative_path
     183          519 :                 .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
     184          519 :                 .collect(),
     185          519 :         )
     186          519 :     }
     187              : 
     188          556 :     pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
     189          556 :         assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
     190          556 :         let path_string = path.get_path().as_str();
     191          556 :         match &self.prefix_in_bucket {
     192          547 :             Some(prefix) => prefix.clone() + "/" + path_string,
     193            9 :             None => path_string.to_string(),
     194              :         }
     195          556 :     }
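                      : 
                      :     // Illustrative round trip (hypothetical values): with prefix_in_bucket = Some("wal/v1"),
                      :     // relative_path_to_s3_object maps the RemotePath "tenants/abc" to the S3 key
                      :     // "wal/v1/tenants/abc", and s3_object_to_relative_path maps such a key back to
                      :     // the relative path "tenants/abc".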
     196              : 
     197          472 :     async fn permit(
     198          472 :         &self,
     199          472 :         kind: RequestKind,
     200          472 :         cancel: &CancellationToken,
     201          472 :     ) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
     202          472 :         let started_at = start_counting_cancelled_wait(kind);
     203          472 :         let acquire = self.concurrency_limiter.acquire(kind);
     204              : 
     205          472 :         let permit = tokio::select! {
     206          472 :             permit = acquire => permit.expect("semaphore is never closed"),
     207          472 :             _ = cancel.cancelled() => return Err(Cancelled),
     208              :         };
     209              : 
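                      : 
                      :         // `started_at` is a scope guard armed by `start_counting_cancelled_wait`;
                      :         // reaching this point means the wait was not cancelled, so the guard is
                      :         // defused (presumably it would otherwise count the wait as cancelled on drop)
                      :         // and the successful wait duration is recorded instead.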
     210          472 :         let started_at = ScopeGuard::into_inner(started_at);
     211          472 :         crate::metrics::BUCKET_METRICS
     212          472 :             .wait_seconds
     213          472 :             .observe_elapsed(kind, started_at);
     214          472 : 
     215          472 :         Ok(permit)
     216            0 :     }
     217              : 
     218           33 :     async fn owned_permit(
     219           33 :         &self,
     220           33 :         kind: RequestKind,
     221           33 :         cancel: &CancellationToken,
     222           33 :     ) -> Result<tokio::sync::OwnedSemaphorePermit, Cancelled> {
     223           33 :         let started_at = start_counting_cancelled_wait(kind);
     224           33 :         let acquire = self.concurrency_limiter.acquire_owned(kind);
     225              : 
     226           33 :         let permit = tokio::select! {
     227           33 :             permit = acquire => permit.expect("semaphore is never closed"),
     228           33 :             _ = cancel.cancelled() => return Err(Cancelled),
     229              :         };
     230              : 
     231           33 :         let started_at = ScopeGuard::into_inner(started_at);
     232           33 :         crate::metrics::BUCKET_METRICS
     233           33 :             .wait_seconds
     234           33 :             .observe_elapsed(kind, started_at);
     235           33 :         Ok(permit)
     236            0 :     }
     237              : 
     238           33 :     async fn download_object(
     239           33 :         &self,
     240           33 :         request: GetObjectRequest,
     241           33 :         cancel: &CancellationToken,
     242           33 :     ) -> Result<Download, DownloadError> {
     243           33 :         let kind = RequestKind::Get;
     244              : 
     245           33 :         let permit = self.owned_permit(kind, cancel).await?;
     246              : 
     247           33 :         let started_at = start_measuring_requests(kind);
     248           33 : 
     249           33 :         let mut builder = self
     250           33 :             .client
     251           33 :             .get_object()
     252           33 :             .bucket(request.bucket)
     253           33 :             .key(request.key)
     254           33 :             .set_range(request.range);
     255              : 
     256           33 :         if let Some(etag) = request.etag {
     257            6 :             builder = builder.if_none_match(etag);
     258            6 :         }
     259              : 
     260           33 :         let get_object = builder.send();
     261              : 
     262           33 :         let get_object = tokio::select! {
     263           33 :             res = get_object => res,
     264           33 :             _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
     265           33 :             _ = cancel.cancelled() => return Err(DownloadError::Cancelled),
     266              :         };
     267              : 
     268           33 :         let started_at = ScopeGuard::into_inner(started_at);
     269              : 
     270           28 :         let object_output = match get_object {
     271           28 :             Ok(object_output) => object_output,
     272            4 :             Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
     273              :                 // Count this in the AttemptOutcome::Ok bucket, because 404 is not
     274              :                 // an error: we expect to sometimes fetch an object and find it missing,
     275              :                 // e.g. when probing for timeline indices.
     276            0 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     277            0 :                     kind,
     278            0 :                     AttemptOutcome::Ok,
     279            0 :                     started_at,
     280            0 :                 );
     281            0 :                 return Err(DownloadError::NotFound);
     282              :             }
     283            4 :             Err(SdkError::ServiceError(e))
     284              :                 // aws_smithy_runtime_api::http::response::StatusCode isn't
     285              :                 // re-exported by any aws crates, so just check the numeric
     286              :                 // status against http_types::StatusCode instead of pulling it.
     287            4 :                 if e.raw().status().as_u16() == StatusCode::NotModified =>
     288            4 :             {
     289            4 :                 // Count an unmodified file as a success.
     290            4 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     291            4 :                     kind,
     292            4 :                     AttemptOutcome::Ok,
     293            4 :                     started_at,
     294            4 :                 );
     295            4 :                 return Err(DownloadError::Unmodified);
     296              :             }
     297            1 :             Err(e) => {
     298            1 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     299            1 :                     kind,
     300            1 :                     AttemptOutcome::Err,
     301            1 :                     started_at,
     302            1 :                 );
     303            1 : 
     304            1 :                 return Err(DownloadError::Other(
     305            1 :                     anyhow::Error::new(e).context("download s3 object"),
     306            1 :                 ));
     307              :             }
     308              :         };
     309              : 
     310              :         // Even if no timeout remains, continue anyway; the caller can decide how to
     311              :         // handle the resulting timeout and cancellation errors.
     312           28 :         let remaining = self.timeout.saturating_sub(started_at.elapsed());
     313           28 : 
     314           28 :         let metadata = object_output.metadata().cloned().map(StorageMetadata);
     315           28 :         let etag = object_output
     316           28 :             .e_tag
     317           28 :             .ok_or(DownloadError::Other(anyhow::anyhow!("Missing ETag header")))?
     318           28 :             .into();
     319           28 :         let last_modified = object_output
     320           28 :             .last_modified
     321           28 :             .ok_or(DownloadError::Other(anyhow::anyhow!(
     322           28 :                 "Missing LastModified header"
     323           28 :             )))?
     324           28 :             .try_into()
     325           28 :             .map_err(|e: ConversionError| DownloadError::Other(e.into()))?;
     326              : 
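                      :         // Layer the response body: ByteStreamAsStream adapts the SDK byte stream to a
                      :         // futures Stream, PermitCarrying holds the concurrency permit for as long as the
                      :         // body is being read, TimedDownload records the request metric when the stream
                      :         // is dropped, and DownloadStream applies the remaining timeout and cancellation
                      :         // while the caller consumes the data.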
     327           28 :         let body = object_output.body;
     328           28 :         let body = ByteStreamAsStream::from(body);
     329           28 :         let body = PermitCarrying::new(permit, body);
     330           28 :         let body = TimedDownload::new(started_at, body);
     331           28 : 
     332           28 :         let cancel_or_timeout = crate::support::cancel_or_timeout(remaining, cancel.clone());
     333           28 :         let body = crate::support::DownloadStream::new(cancel_or_timeout, body);
     334           28 : 
     335           28 :         Ok(Download {
     336           28 :             metadata,
     337           28 :             etag,
     338           28 :             last_modified,
     339           28 :             download_stream: Box::pin(body),
     340           28 :         })
     341            0 :     }
     342              : 
     343          198 :     async fn delete_oids(
     344          198 :         &self,
     345          198 :         _permit: &tokio::sync::SemaphorePermit<'_>,
     346          198 :         delete_objects: &[ObjectIdentifier],
     347          198 :         cancel: &CancellationToken,
     348          198 :     ) -> anyhow::Result<()> {
     349          198 :         let kind = RequestKind::Delete;
     350          198 :         let mut cancel = std::pin::pin!(cancel.cancelled());
     351              : 
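                      :         // DeleteObjects accepts a bounded number of keys per call (1000 on AWS S3),
                      :         // so the request is issued in chunks of at most MAX_KEYS_PER_DELETE_S3 keys.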
     352          198 :         for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE_S3) {
     353          198 :             let started_at = start_measuring_requests(kind);
     354              : 
     355          198 :             let req = self
     356          198 :                 .client
     357          198 :                 .delete_objects()
     358          198 :                 .bucket(self.bucket_name.clone())
     359          198 :                 .delete(
     360          198 :                     Delete::builder()
     361          198 :                         .set_objects(Some(chunk.to_vec()))
     362          198 :                         .build()
     363          198 :                         .context("build request")?,
     364              :                 )
     365          198 :                 .send();
     366              : 
     367          198 :             let resp = tokio::select! {
     368          198 :                 resp = req => resp,
     369          198 :                 _ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()),
     370          198 :                 _ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
     371              :             };
     372              : 
     373          198 :             let started_at = ScopeGuard::into_inner(started_at);
     374          198 :             crate::metrics::BUCKET_METRICS
     375          198 :                 .req_seconds
     376          198 :                 .observe_elapsed(kind, &resp, started_at);
     377              : 
     378          198 :             let resp = resp.context("request deletion")?;
     379          198 :             crate::metrics::BUCKET_METRICS
     380          198 :                 .deleted_objects_total
     381          198 :                 .inc_by(chunk.len() as u64);
     382              : 
     383          198 :             if let Some(errors) = resp.errors {
     384              :                 // Log a bounded number of the errors within the response:
     385              :                 // these requests can carry 1000 keys so logging each one
     386              :                 // would be too verbose, especially as errors may lead us
     387              :                 // to retry repeatedly.
     388              :                 const LOG_UP_TO_N_ERRORS: usize = 10;
     389            0 :                 for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
     390            0 :                     tracing::warn!(
     391            0 :                         "DeleteObjects key {} failed: {}: {}",
     392            0 :                         e.key.as_ref().map(Cow::from).unwrap_or("".into()),
     393            0 :                         e.code.as_ref().map(Cow::from).unwrap_or("".into()),
     394            0 :                         e.message.as_ref().map(Cow::from).unwrap_or("".into())
     395              :                     );
     396              :                 }
     397              : 
     398            0 :                 return Err(anyhow::anyhow!(
     399            0 :                     "Failed to delete {}/{} objects",
     400            0 :                     errors.len(),
     401            0 :                     chunk.len(),
     402            0 :                 ));
     403            0 :             }
     404              :         }
     405          198 :         Ok(())
     406            0 :     }
     407              : 
     408            6 :     pub fn bucket_name(&self) -> &str {
     409            6 :         &self.bucket_name
     410            6 :     }
     411              : }
     412              : 
     413              : pin_project_lite::pin_project! {
     414              :     struct ByteStreamAsStream {
     415              :         #[pin]
     416              :         inner: aws_smithy_types::byte_stream::ByteStream
     417              :     }
     418              : }
     419              : 
     420              : impl From<aws_smithy_types::byte_stream::ByteStream> for ByteStreamAsStream {
     421           28 :     fn from(inner: aws_smithy_types::byte_stream::ByteStream) -> Self {
     422           28 :         ByteStreamAsStream { inner }
     423           28 :     }
     424              : }
     425              : 
     426              : impl Stream for ByteStreamAsStream {
     427              :     type Item = std::io::Result<Bytes>;
     428              : 
     429           48 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     430           48 :         // this does the std::io::ErrorKind::Other conversion
     431           48 :         self.project().inner.poll_next(cx).map_err(|x| x.into())
     432           48 :     }
     433              : 
     434              :     // We cannot implement size_hint: the inner ByteStream reports the remaining size in
     435              :     // bytes, which makes sense for bytes but not for Stream::size_hint, which counts items.
     436              : }
     437              : 
     438              : pin_project_lite::pin_project! {
     439              :     /// Times and tracks the outcome of the request.
     440              :     struct TimedDownload<S> {
     441              :         started_at: std::time::Instant,
     442              :         outcome: AttemptOutcome,
     443              :         #[pin]
     444              :         inner: S
     445              :     }
     446              : 
     447              :     impl<S> PinnedDrop for TimedDownload<S> {
     448              :         fn drop(mut this: Pin<&mut Self>) {
     449              :             crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
     450              :         }
     451              :     }
     452              : }
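                      : 
                      : // The outcome starts out as `Cancelled` (see `new` below) and is only switched to
                      : // `Ok` once the inner stream reports end-of-stream, or to `Err` on the first error;
                      : // a download dropped midway is therefore recorded as cancelled by the PinnedDrop
                      : // impl above.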
     453              : 
     454              : impl<S> TimedDownload<S> {
     455           28 :     fn new(started_at: std::time::Instant, inner: S) -> Self {
     456           28 :         TimedDownload {
     457           28 :             started_at,
     458           28 :             outcome: AttemptOutcome::Cancelled,
     459           28 :             inner,
     460           28 :         }
     461           28 :     }
     462              : }
     463              : 
     464              : impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for TimedDownload<S> {
     465              :     type Item = <S as Stream>::Item;
     466              : 
     467           48 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     468              :         use std::task::ready;
     469              : 
     470           48 :         let this = self.project();
     471              : 
     472           48 :         let res = ready!(this.inner.poll_next(cx));
     473           22 :         match &res {
     474           22 :             Some(Ok(_)) => {}
     475            0 :             Some(Err(_)) => *this.outcome = AttemptOutcome::Err,
     476           18 :             None => *this.outcome = AttemptOutcome::Ok,
     477              :         }
     478              : 
     479           40 :         Poll::Ready(res)
     480            0 :     }
     481              : 
     482            0 :     fn size_hint(&self) -> (usize, Option<usize>) {
     483            0 :         self.inner.size_hint()
     484            0 :     }
     485              : }
     486              : 
     487              : impl RemoteStorage for S3Bucket {
     488           64 :     fn list_streaming(
     489           64 :         &self,
     490           64 :         prefix: Option<&RemotePath>,
     491           64 :         mode: ListingMode,
     492           64 :         max_keys: Option<NonZeroU32>,
     493           64 :         cancel: &CancellationToken,
     494           64 :     ) -> impl Stream<Item = Result<Listing, DownloadError>> {
     495           64 :         let kind = RequestKind::List;
     496           64 :         // s3 sdk wants i32
     497           64 :         let mut max_keys = max_keys.map(|mk| mk.get() as i32);
     498           64 : 
     499           64 :         // get the passed prefix or if it is not set use prefix_in_bucket value
     500           64 :         let list_prefix = prefix
     501           64 :             .map(|p| self.relative_path_to_s3_object(p))
     502           64 :             .or_else(|| {
     503           32 :                 self.prefix_in_bucket.clone().map(|mut s| {
     504           32 :                     s.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
     505           32 :                     s
     506           32 :                 })
     507           64 :             });
     508           64 : 
     509           64 :         async_stream::stream! {
     510           64 :             let _permit = self.permit(kind, cancel).await?;
     511           64 : 
     512           64 :             let mut continuation_token = None;
     513           64 :             'outer: loop {
     514           64 :                 let started_at = start_measuring_requests(kind);
     515           64 : 
     516           64 :                 // Min of two Options, returning Some if one is a value and the other is
     517           64 :                 // None (None is smaller than anything, so a plain min doesn't work).
     518           64 :                 let request_max_keys = self
     519           64 :                     .max_keys_per_list_response
     520           64 :                     .into_iter()
     521           64 :                     .chain(max_keys.into_iter())
     522           64 :                     .min();
     523           64 :                 let mut request = self
     524           64 :                     .client
     525           64 :                     .list_objects_v2()
     526           64 :                     .bucket(self.bucket_name.clone())
     527           64 :                     .set_prefix(list_prefix.clone())
     528           64 :                     .set_continuation_token(continuation_token.clone())
     529           64 :                     .set_max_keys(request_max_keys);
     530           64 : 
     531           64 :                 if let ListingMode::WithDelimiter = mode {
     532           64 :                     request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
     533           64 :                 }
     534           64 : 
     535           64 :                 let request = request.send();
     536           64 : 
     537           64 :                 let response = tokio::select! {
     538           64 :                     res = request => Ok(res),
     539           64 :                     _ = tokio::time::sleep(self.timeout) => Err(DownloadError::Timeout),
     540           64 :                     _ = cancel.cancelled() => Err(DownloadError::Cancelled),
     541           64 :                 }?;
     542           64 : 
     543           64 :                 let response = response
     544           64 :                     .context("Failed to list S3 prefixes")
     545           64 :                     .map_err(DownloadError::Other);
     546           64 : 
     547           64 :                 let started_at = ScopeGuard::into_inner(started_at);
     548           64 : 
     549           64 :                 crate::metrics::BUCKET_METRICS
     550           64 :                     .req_seconds
     551           64 :                     .observe_elapsed(kind, &response, started_at);
     552           64 : 
     553           64 :                 let response = match response {
     554           64 :                     Ok(response) => response,
     555           64 :                     Err(e) => {
     556           64 :                 // The error is potentially retryable, so we restart the loop after yielding it.
     557           64 :                         yield Err(e);
     558           64 :                         continue 'outer;
     559           64 :                     },
     560           64 :                 };
     561           64 : 
     562           64 :                 let keys = response.contents();
     563           64 :                 let prefixes = response.common_prefixes.as_deref().unwrap_or_default();
     564           64 : 
     565           64 :                 tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
     566           64 :                 let mut result = Listing::default();
     567           64 : 
     568           64 :                 for object in keys {
     569           64 :                     let key = object.key().expect("response does not contain a key");
     570           64 :                     let key = self.s3_object_to_relative_path(key);
     571           64 : 
     572           64 :                     let last_modified = match object.last_modified.map(SystemTime::try_from) {
     573           64 :                         Some(Ok(t)) => t,
     574           64 :                         Some(Err(_)) => {
     575           64 :                             tracing::warn!("Remote storage last_modified {:?} for {} is out of bounds",
     576           64 :                                 object.last_modified, key
     577           64 :                         );
     578           64 :                             SystemTime::now()
     579           64 :                         },
     580           64 :                         None => {
     581           64 :                             SystemTime::now()
     582           64 :                         }
     583           64 :                     };
     584           64 : 
     585           64 :                     let size = object.size.unwrap_or(0) as u64;
     586           64 : 
     587           64 :                     result.keys.push(ListingObject{
     588           64 :                         key,
     589           64 :                         last_modified,
     590           64 :                         size,
     591           64 :                     });
     592           64 :                     if let Some(mut mk) = max_keys {
     593           64 :                         assert!(mk > 0);
     594           64 :                         mk -= 1;
     595           64 :                         if mk == 0 {
     596           64 :                             // limit reached
     597           64 :                             yield Ok(result);
     598           64 :                             break 'outer;
     599           64 :                         }
     600           64 :                         max_keys = Some(mk);
     601           64 :                     }
     602           64 :                 }
     603           64 : 
     604           64 :                 // S3 gives us prefixes like "foo/", we return them like "foo"
     605          104 :                 result.prefixes.extend(prefixes.iter().filter_map(|o| {
     606          104 :                     Some(
     607          104 :                         self.s3_object_to_relative_path(
     608          104 :                             o.prefix()?
     609          104 :                                 .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR),
     610           64 :                         ),
     611           64 :                     )
     612           64 :                 }));
     613           64 : 
     614           64 :                 yield Ok(result);
     615           64 : 
     616           64 :                 continuation_token = match response.next_continuation_token {
     617           64 :                     Some(new_token) => Some(new_token),
     618           64 :                     None => break,
     619           64 :                 };
     620           64 :             }
     621           64 :         }
     622           64 :     }
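                      : 
                      :     // A usage sketch (not part of this file), assuming a caller that holds an
                      :     // `S3Bucket` and a `CancellationToken`; `next()` comes from the `futures_util`
                      :     // `StreamExt` import at the top of the file:
                      :     //
                      :     //     let stream = storage.list_streaming(None, ListingMode::WithDelimiter, None, &cancel);
                      :     //     futures_util::pin_mut!(stream);
                      :     //     while let Some(batch) = stream.next().await {
                      :     //         for entry in batch?.keys {
                      :     //             tracing::info!("found {:?} ({} bytes)", entry.key, entry.size);
                      :     //         }
                      :     //     }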
     623              : 
     624            6 :     async fn head_object(
     625            6 :         &self,
     626            6 :         key: &RemotePath,
     627            6 :         cancel: &CancellationToken,
     628            6 :     ) -> Result<ListingObject, DownloadError> {
     629            6 :         let kind = RequestKind::Head;
     630            6 :         let _permit = self.permit(kind, cancel).await?;
     631              : 
     632            6 :         let started_at = start_measuring_requests(kind);
     633            6 : 
     634            6 :         let head_future = self
     635            6 :             .client
     636            6 :             .head_object()
     637            6 :             .bucket(self.bucket_name())
     638            6 :             .key(self.relative_path_to_s3_object(key))
     639            6 :             .send();
     640            6 : 
     641            6 :         let head_future = tokio::time::timeout(self.timeout, head_future);
     642              : 
     643            6 :         let res = tokio::select! {
     644            6 :             res = head_future => res,
     645            6 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     646              :         };
     647              : 
     648            6 :         let res = res.map_err(|_e| DownloadError::Timeout)?;
     649              : 
     650              :         // Do not include timeouts as errors in the metrics, but do include cancellations.
     651            6 :         let started_at = ScopeGuard::into_inner(started_at);
     652            6 :         crate::metrics::BUCKET_METRICS
     653            6 :             .req_seconds
     654            6 :             .observe_elapsed(kind, &res, started_at);
     655              : 
     656            4 :         let data = match res {
     657            4 :             Ok(object_output) => object_output,
     658            2 :             Err(SdkError::ServiceError(e)) if matches!(e.err(), HeadObjectError::NotFound(_)) => {
     659              :                 // Count this in the AttemptOutcome::Ok bucket, because 404 is not
     660              :                 // an error: we expect to sometimes fetch an object and find it missing,
     661              :                 // e.g. when probing for timeline indices.
     662            2 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     663            2 :                     kind,
     664            2 :                     AttemptOutcome::Ok,
     665            2 :                     started_at,
     666            2 :                 );
     667            2 :                 return Err(DownloadError::NotFound);
     668              :             }
     669            0 :             Err(e) => {
     670            0 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     671            0 :                     kind,
     672            0 :                     AttemptOutcome::Err,
     673            0 :                     started_at,
     674            0 :                 );
     675            0 : 
     676            0 :                 return Err(DownloadError::Other(
     677            0 :                     anyhow::Error::new(e).context("s3 head object"),
     678            0 :                 ));
     679              :             }
     680              :         };
     681              : 
     682            4 :         let (Some(last_modified), Some(size)) = (data.last_modified, data.content_length) else {
     683            0 :             return Err(DownloadError::Other(anyhow!(
     684            0 :                 "head_object doesn't contain last_modified or content_length"
     685            0 :             )))?;
     686              :         };
     687              :         Ok(ListingObject {
     688            4 :             key: key.to_owned(),
     689            4 :             last_modified: SystemTime::try_from(last_modified).map_err(|e| {
     690            0 :                 DownloadError::Other(anyhow!("can't convert time '{last_modified}': {e}"))
     691            4 :             })?,
     692            4 :             size: size as u64,
     693              :         })
     694            0 :     }
     695              : 
     696          200 :     async fn upload(
     697          200 :         &self,
     698          200 :         from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     699          200 :         from_size_bytes: usize,
     700          200 :         to: &RemotePath,
     701          200 :         metadata: Option<StorageMetadata>,
     702          200 :         cancel: &CancellationToken,
     703          200 :     ) -> anyhow::Result<()> {
     704          200 :         let kind = RequestKind::Put;
     705          200 :         let _permit = self.permit(kind, cancel).await?;
     706              : 
     707          200 :         let started_at = start_measuring_requests(kind);
     708          200 : 
     709          712 :         let body = StreamBody::new(from.map(|x| x.map(Frame::data)));
     710          200 :         let bytes_stream = ByteStream::new(SdkBody::from_body_1_x(body));
     711              : 
     712          200 :         let upload = self
     713          200 :             .client
     714          200 :             .put_object()
     715          200 :             .bucket(self.bucket_name.clone())
     716          200 :             .key(self.relative_path_to_s3_object(to))
     717          200 :             .set_metadata(metadata.map(|m| m.0))
     718          200 :             .set_storage_class(self.upload_storage_class.clone())
     719          200 :             .content_length(from_size_bytes.try_into()?)
     720          200 :             .body(bytes_stream)
     721          200 :             .send();
     722          200 : 
     723          200 :         let upload = tokio::time::timeout(self.timeout, upload);
     724              : 
     725          200 :         let res = tokio::select! {
     726          200 :             res = upload => res,
     727          200 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     728              :         };
     729              : 
     730          200 :         if let Ok(inner) = &res {
     731          200 :             // Do not include timeouts as errors in the metrics, but do include cancellations.
     732          200 :             let started_at = ScopeGuard::into_inner(started_at);
     733          200 :             crate::metrics::BUCKET_METRICS
     734          200 :                 .req_seconds
     735          200 :                 .observe_elapsed(kind, inner, started_at);
     736          200 :         }
     737              : 
     738          200 :         match res {
     739          198 :             Ok(Ok(_put)) => Ok(()),
     740            2 :             Ok(Err(sdk)) => Err(sdk.into()),
     741            0 :             Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
     742              :         }
     743            0 :     }
     744              : 
     745            2 :     async fn copy(
     746            2 :         &self,
     747            2 :         from: &RemotePath,
     748            2 :         to: &RemotePath,
     749            2 :         cancel: &CancellationToken,
     750            2 :     ) -> anyhow::Result<()> {
     751            2 :         let kind = RequestKind::Copy;
     752            2 :         let _permit = self.permit(kind, cancel).await?;
     753              : 
     754            2 :         let timeout = tokio::time::sleep(self.timeout);
     755            2 : 
     756            2 :         let started_at = start_measuring_requests(kind);
     757            2 : 
     758            2 :         // we need to specify bucket_name as a prefix
     759            2 :         let copy_source = format!(
     760            2 :             "{}/{}",
     761            2 :             self.bucket_name,
     762            2 :             self.relative_path_to_s3_object(from)
     763            2 :         );
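                      :         // e.g. (hypothetical values) bucket "my-bucket" and source key
                      :         // "wal/v1/tenants/abc" give copy_source = "my-bucket/wal/v1/tenants/abc".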
     764            2 : 
     765            2 :         let op = self
     766            2 :             .client
     767            2 :             .copy_object()
     768            2 :             .bucket(self.bucket_name.clone())
     769            2 :             .key(self.relative_path_to_s3_object(to))
     770            2 :             .set_storage_class(self.upload_storage_class.clone())
     771            2 :             .copy_source(copy_source)
     772            2 :             .send();
     773              : 
     774            2 :         let res = tokio::select! {
     775            2 :             res = op => res,
     776            2 :             _ = timeout => return Err(TimeoutOrCancel::Timeout.into()),
     777            2 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     778              :         };
     779              : 
     780            2 :         let started_at = ScopeGuard::into_inner(started_at);
     781            2 :         crate::metrics::BUCKET_METRICS
     782            2 :             .req_seconds
     783            2 :             .observe_elapsed(kind, &res, started_at);
     784            2 : 
     785            2 :         res?;
     786              : 
     787            2 :         Ok(())
     788            0 :     }
     789              : 
     790           33 :     async fn download(
     791           33 :         &self,
     792           33 :         from: &RemotePath,
     793           33 :         opts: &DownloadOpts,
     794           33 :         cancel: &CancellationToken,
     795           33 :     ) -> Result<Download, DownloadError> {
     796           33 :         // If a prefix is configured, download `prefix/from`;
     797           33 :         // otherwise download `from`.
     798           33 :         self.download_object(
     799           33 :             GetObjectRequest {
     800           33 :                 bucket: self.bucket_name.clone(),
     801           33 :                 key: self.relative_path_to_s3_object(from),
     802           33 :                 etag: opts.etag.as_ref().map(|e| e.to_string()),
     803           33 :                 range: opts.byte_range_header(),
     804           33 :             },
     805           33 :             cancel,
     806           33 :         )
     807           33 :         .await
     808            0 :     }
     809              : 
     810          194 :     async fn delete_objects(
     811          194 :         &self,
     812          194 :         paths: &[RemotePath],
     813          194 :         cancel: &CancellationToken,
     814          194 :     ) -> anyhow::Result<()> {
     815          194 :         let kind = RequestKind::Delete;
     816          194 :         let permit = self.permit(kind, cancel).await?;
     817          194 :         let mut delete_objects = Vec::with_capacity(paths.len());
     818          430 :         for path in paths {
     819          236 :             let obj_id = ObjectIdentifier::builder()
     820          236 :                 .set_key(Some(self.relative_path_to_s3_object(path)))
     821          236 :                 .build()
     822          236 :                 .context("convert path to oid")?;
     823          236 :             delete_objects.push(obj_id);
     824              :         }
     825              : 
     826          194 :         self.delete_oids(&permit, &delete_objects, cancel).await
     827            0 :     }
     828              : 
     829            0 :     fn max_keys_per_delete(&self) -> usize {
     830            0 :         MAX_KEYS_PER_DELETE_S3
     831            0 :     }
     832              : 
     833          174 :     async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
     834          174 :         let paths = std::array::from_ref(path);
     835          174 :         self.delete_objects(paths, cancel).await
     836            0 :     }
     837              : 
     838            6 :     async fn time_travel_recover(
     839            6 :         &self,
     840            6 :         prefix: Option<&RemotePath>,
     841            6 :         timestamp: SystemTime,
     842            6 :         done_if_after: SystemTime,
     843            6 :         cancel: &CancellationToken,
     844            6 :     ) -> Result<(), TimeTravelError> {
     845            6 :         let kind = RequestKind::TimeTravel;
     846            6 :         let permit = self.permit(kind, cancel).await?;
     847              : 
     848            6 :         let timestamp = DateTime::from(timestamp);
     849            6 :         let done_if_after = DateTime::from(done_if_after);
     850            6 : 
     851            6 :         tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
     852              : 
     853              :         // Use the passed prefix, or fall back to the prefix_in_bucket value if none was given.
     854            6 :         let prefix = prefix
     855            6 :             .map(|p| self.relative_path_to_s3_object(p))
     856            6 :             .or_else(|| self.prefix_in_bucket.clone());
     857            6 : 
     858            6 :         let warn_threshold = 3;
     859            6 :         let max_retries = 10;
     860            6 :         let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
     861              : 
     862            6 :         let mut key_marker = None;
     863            6 :         let mut version_id_marker = None;
     864            6 :         let mut versions_and_deletes = Vec::new();
     865              : 
     866              :         loop {
     867            6 :             let response = backoff::retry(
     868            7 :                 || async {
     869            7 :                     let op = self
     870            7 :                         .client
     871            7 :                         .list_object_versions()
     872            7 :                         .bucket(self.bucket_name.clone())
     873            7 :                         .set_prefix(prefix.clone())
     874            7 :                         .set_key_marker(key_marker.clone())
     875            7 :                         .set_version_id_marker(version_id_marker.clone())
     876            7 :                         .send();
     877            7 : 
     878            7 :                     tokio::select! {
     879            7 :                         res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
     880            7 :                         _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
     881              :                     }
     882            6 :                 },
     883            6 :                 is_permanent,
     884            6 :                 warn_threshold,
     885            6 :                 max_retries,
     886            6 :                 "listing object versions for time_travel_recover",
     887            6 :                 cancel,
     888            6 :             )
     889            6 :             .await
     890            6 :             .ok_or_else(|| TimeTravelError::Cancelled)
     891            6 :             .and_then(|x| x)?;
     892              : 
     893            6 :             tracing::trace!(
     894            0 :                 "  Got List response version_id_marker={:?}, key_marker={:?}",
     895              :                 response.version_id_marker,
     896              :                 response.key_marker
     897              :             );
     898            6 :             let versions = response
     899            6 :                 .versions
     900            6 :                 .unwrap_or_default()
     901            6 :                 .into_iter()
     902            6 :                 .map(VerOrDelete::from_version);
     903            6 :             let deletes = response
     904            6 :                 .delete_markers
     905            6 :                 .unwrap_or_default()
     906            6 :                 .into_iter()
     907            6 :                 .map(VerOrDelete::from_delete_marker);
     908            6 :             itertools::process_results(versions.chain(deletes), |n_vds| {
     909            6 :                 versions_and_deletes.extend(n_vds)
     910            6 :             })
     911            6 :             .map_err(TimeTravelError::Other)?;
     912           12 :             fn none_if_empty(v: Option<String>) -> Option<String> {
     913           12 :                 v.filter(|v| !v.is_empty())
     914           12 :             }
     915            6 :             version_id_marker = none_if_empty(response.next_version_id_marker);
     916            6 :             key_marker = none_if_empty(response.next_key_marker);
     917            6 :             if version_id_marker.is_none() {
     918              :                 // The final response is not supposed to be truncated
     919            6 :                 if response.is_truncated.unwrap_or_default() {
     920            0 :                     return Err(TimeTravelError::Other(anyhow::anyhow!(
     921            0 :                         "Received truncated ListObjectVersions response for prefix={prefix:?}"
     922            0 :                     )));
     923            6 :                 }
     924            6 :                 break;
     925            0 :             }
     926              :             // Limit the number of versions and deletions, mostly so that we don't
     927              :             // keep requesting forever if the list is too long, as we'd put the
     928              :             // list in RAM.
     929              :             // Building a list of 100k entries that reaches the limit roughly takes
     930              :             // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
     931              :             const COMPLEXITY_LIMIT: usize = 100_000;
     932            0 :             if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
     933            0 :                 return Err(TimeTravelError::TooManyVersions);
     934            0 :             }
     935              :         }
     936              : 
     937            6 :         tracing::info!(
     938            0 :             "Built list for time travel with {} versions and deletions",
     939            0 :             versions_and_deletes.len()
     940              :         );
     941              : 
     942              :         // Work on the list of references instead of the objects directly,
     943              :         // otherwise we get lifetime errors in the sort_by_key call below.
     944            6 :         let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
     945            6 : 
     946          124 :         versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified));
     947            6 : 
     948            6 :         let mut vds_for_key = HashMap::<_, Vec<_>>::new();
     949              : 
     950           42 :         for vd in &versions_and_deletes {
     951              :             let VerOrDelete {
     952           36 :                 version_id, key, ..
     953           36 :             } = &vd;
     954           36 :             if version_id == "null" {
     955            0 :                 return Err(TimeTravelError::Other(anyhow!(
     956            0 :                     "Received ListVersions response for key={key} with version_id='null', \
     957            0 :                     indicating either disabled versioning, or legacy objects with null version id values"
     958            0 :                 )));
     959           36 :             }
     960           36 :             tracing::trace!(
     961            0 :                 "Parsing version key={key} version_id={version_id} kind={:?}",
     962              :                 vd.kind
     963              :             );
     964              : 
     965           36 :             vds_for_key.entry(key).or_default().push(vd);
     966              :         }
     967           24 :         for (key, versions) in vds_for_key {
     968           18 :             let last_vd = versions.last().unwrap();
     969           18 :             if last_vd.last_modified > done_if_after {
     970            0 :                 tracing::trace!("Key {key} has version later than done_if_after, skipping");
     971            0 :                 continue;
     972            0 :             }
     973              :             // the version we want to restore to.
     974           18 :             let version_to_restore_to =
     975           36 :                 match versions.binary_search_by_key(&timestamp, |tpl| tpl.last_modified) {
     976            0 :                     Ok(v) => v,
     977           18 :                     Err(e) => e,
     978              :                 };
     979           18 :             if version_to_restore_to == versions.len() {
     980            6 :                 tracing::trace!("Key {key} has no changes since timestamp, skipping");
     981            6 :                 continue;
     982           12 :             }
     983           12 :             let mut do_delete = false;
     984           12 :             if version_to_restore_to == 0 {
     985              :                 // All versions are more recent, so the key didn't exist at the specified point in time.
     986            6 :                 tracing::trace!(
     987            0 :                     "All {} versions more recent for {key}, deleting",
     988            0 :                     versions.len()
     989              :                 );
     990            6 :                 do_delete = true;
     991              :             } else {
     992            6 :                 match &versions[version_to_restore_to - 1] {
     993              :                     VerOrDelete {
     994              :                         kind: VerOrDeleteKind::Version,
     995            6 :                         version_id,
     996            6 :                         ..
     997            6 :                     } => {
     998            6 :                         tracing::trace!("Copying old version {version_id} for {key}...");
     999              :                         // Restore the state to the last version by copying
    1000            6 :                         let source_id =
    1001            6 :                             format!("{}/{key}?versionId={version_id}", self.bucket_name);
    1002            6 : 
    1003            6 :                         backoff::retry(
    1004            6 :                             || async {
    1005            6 :                                 let op = self
    1006            6 :                                     .client
    1007            6 :                                     .copy_object()
    1008            6 :                                     .bucket(self.bucket_name.clone())
    1009            6 :                                     .key(key)
    1010            6 :                                     .set_storage_class(self.upload_storage_class.clone())
    1011            6 :                                     .copy_source(&source_id)
    1012            6 :                                     .send();
    1013            6 : 
    1014            6 :                                 tokio::select! {
    1015            6 :                                     res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
    1016            6 :                                     _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
    1017              :                                 }
    1018            6 :                             },
    1019            6 :                             is_permanent,
    1020            6 :                             warn_threshold,
    1021            6 :                             max_retries,
    1022            6 :                             "copying object version for time_travel_recover",
    1023            6 :                             cancel,
    1024            6 :                         )
    1025            6 :                         .await
    1026            6 :                         .ok_or_else(|| TimeTravelError::Cancelled)
    1027            6 :                         .and_then(|x| x)?;
    1028            6 :                         tracing::info!(%version_id, %key, "Copied old version in S3");
    1029              :                     }
    1030              :                     VerOrDelete {
    1031              :                         kind: VerOrDeleteKind::DeleteMarker,
    1032              :                         ..
    1033            0 :                     } => {
    1034            0 :                         do_delete = true;
    1035            0 :                     }
    1036              :                 }
    1037              :             };
    1038           12 :             if do_delete {
    1039            6 :                 if matches!(last_vd.kind, VerOrDeleteKind::DeleteMarker) {
    1040              :                     // Key has since been deleted (but there was some history), so there is nothing to do
    1041            2 :                     tracing::trace!("Key {key} already deleted, skipping.");
    1042              :                 } else {
    1043            4 :                     tracing::trace!("Deleting {key}...");
    1044              : 
    1045            4 :                     let oid = ObjectIdentifier::builder()
    1046            4 :                         .key(key.to_owned())
    1047            4 :                         .build()
    1048            4 :                         .map_err(|e| TimeTravelError::Other(e.into()))?;
    1049              : 
    1050            4 :                     self.delete_oids(&permit, &[oid], cancel)
    1051            4 :                         .await
    1052            4 :                         .map_err(|e| {
    1053            0 :                             // delete_oids will use TimeoutOrCancel
    1054            0 :                             if TimeoutOrCancel::caused_by_cancel(&e) {
    1055            0 :                                 TimeTravelError::Cancelled
    1056              :                             } else {
    1057            0 :                                 TimeTravelError::Other(e)
    1058              :                             }
    1059            4 :                         })?;
    1060              :                 }
    1061            0 :             }
    1062              :         }
    1063            6 :         Ok(())
    1064            0 :     }
    1065              : }
    1066              : 
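
// Illustrative sketch, not part of s3_bucket.rs: it spells out the restore-point
// selection performed above. With a key's versions sorted by last_modified, the
// binary search yields an index idx equal to the number of versions recorded
// strictly before the target timestamp (or the position of an exact match), so
// versions[idx - 1] is the version that gets copied back, idx == 0 means the key
// did not exist yet (delete it), and idx == len() means nothing changed since the
// timestamp (skip it). Timestamps are plain u64 seconds here instead of
// aws_smithy_types::DateTime, and restore_index is a hypothetical helper name.
fn restore_index(sorted_last_modified: &[u64], timestamp: u64) -> usize {
    match sorted_last_modified.binary_search(&timestamp) {
        Ok(idx) | Err(idx) => idx,
    }
}

#[cfg(test)]
mod restore_index_example {
    use super::restore_index;

    #[test]
    fn selection() {
        let versions = [100, 200, 300];
        // Before all versions: the key did not exist at the timestamp.
        assert_eq!(restore_index(&versions, 50), 0);
        // Between versions: restore versions[1], the newest one recorded before 250.
        assert_eq!(restore_index(&versions, 250), 2);
        // After all versions: no changes since the timestamp.
        assert_eq!(restore_index(&versions, 400), 3);
    }
}
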
    1067              : // Save RAM by storing only the fields we need instead of the entire ObjectVersion/DeleteMarkerEntry
    1068              : struct VerOrDelete {
    1069              :     kind: VerOrDeleteKind,
    1070              :     last_modified: DateTime,
    1071              :     version_id: String,
    1072              :     key: String,
    1073              : }
    1074              : 
    1075              : #[derive(Debug)]
    1076              : enum VerOrDeleteKind {
    1077              :     Version,
    1078              :     DeleteMarker,
    1079              : }
    1080              : 
    1081              : impl VerOrDelete {
    1082           36 :     fn with_kind(
    1083           36 :         kind: VerOrDeleteKind,
    1084           36 :         last_modified: Option<DateTime>,
    1085           36 :         version_id: Option<String>,
    1086           36 :         key: Option<String>,
    1087           36 :     ) -> anyhow::Result<Self> {
    1088           36 :         let lvk = (last_modified, version_id, key);
    1089           36 :         let (Some(last_modified), Some(version_id), Some(key)) = lvk else {
    1090            0 :             anyhow::bail!(
    1091            0 :                 "One (or more) of last_modified, version_id, and key is None. \
    1092            0 :             Is versioning enabled in the bucket? last_modified={:?}, version_id={:?}, key={:?}",
    1093            0 :                 lvk.0,
    1094            0 :                 lvk.1,
    1095            0 :                 lvk.2,
    1096            0 :             );
    1097              :         };
    1098           36 :         Ok(Self {
    1099           36 :             kind,
    1100           36 :             last_modified,
    1101           36 :             version_id,
    1102           36 :             key,
    1103           36 :         })
    1104           36 :     }
    1105           28 :     fn from_version(v: ObjectVersion) -> anyhow::Result<Self> {
    1106           28 :         Self::with_kind(
    1107           28 :             VerOrDeleteKind::Version,
    1108           28 :             v.last_modified,
    1109           28 :             v.version_id,
    1110           28 :             v.key,
    1111           28 :         )
    1112           28 :     }
    1113            8 :     fn from_delete_marker(v: DeleteMarkerEntry) -> anyhow::Result<Self> {
    1114            8 :         Self::with_kind(
    1115            8 :             VerOrDeleteKind::DeleteMarker,
    1116            8 :             v.last_modified,
    1117            8 :             v.version_id,
    1118            8 :             v.key,
    1119            8 :         )
    1120            8 :     }
    1121              : }
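
// Illustrative sketch, not part of s3_bucket.rs: the listing loop above feeds the
// fallible VerOrDelete conversions through itertools::process_results, which extends
// the accumulator directly from an iterator of Result items and stops at the first
// Err without collecting into an intermediate Vec. The same pattern on plain
// integers (collect_evens is a hypothetical name):
fn collect_evens(inputs: &[i32]) -> anyhow::Result<Vec<i32>> {
    let mut acc = Vec::new();
    let checked = inputs.iter().map(|&n| {
        if n % 2 == 0 {
            Ok(n)
        } else {
            Err(anyhow::anyhow!("odd input: {n}"))
        }
    });
    // process_results hands the closure an iterator over the Ok values; if any item
    // is Err, iteration stops there and that error is returned.
    itertools::process_results(checked, |evens| acc.extend(evens))?;
    Ok(acc)
}
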
    1122              : 
    1123              : #[cfg(test)]
    1124              : mod tests {
    1125              :     use std::num::NonZeroUsize;
    1126              : 
    1127              :     use camino::Utf8Path;
    1128              : 
    1129              :     use crate::{RemotePath, S3Bucket, S3Config};
    1130              : 
    1131              :     #[tokio::test]
    1132            3 :     async fn relative_path() {
    1133            3 :         let all_paths = ["", "some/path", "some/path/"];
    1134            3 :         let all_paths: Vec<RemotePath> = all_paths
    1135            3 :             .iter()
    1136            9 :             .map(|x| RemotePath::new(Utf8Path::new(x)).expect("bad path"))
    1137            3 :             .collect();
    1138            3 :         let prefixes = [
    1139            3 :             None,
    1140            3 :             Some(""),
    1141            3 :             Some("test/prefix"),
    1142            3 :             Some("test/prefix/"),
    1143            3 :             Some("/test/prefix/"),
    1144            3 :         ];
    1145            3 :         let expected_outputs = [
    1146            3 :             vec!["", "some/path", "some/path/"],
    1147            3 :             vec!["/", "/some/path", "/some/path/"],
    1148            3 :             vec![
    1149            3 :                 "test/prefix/",
    1150            3 :                 "test/prefix/some/path",
    1151            3 :                 "test/prefix/some/path/",
    1152            3 :             ],
    1153            3 :             vec![
    1154            3 :                 "test/prefix/",
    1155            3 :                 "test/prefix/some/path",
    1156            3 :                 "test/prefix/some/path/",
    1157            3 :             ],
    1158            3 :             vec![
    1159            3 :                 "test/prefix/",
    1160            3 :                 "test/prefix/some/path",
    1161            3 :                 "test/prefix/some/path/",
    1162            3 :             ],
    1163            3 :         ];
    1164            3 : 
    1165           15 :         for (prefix_idx, prefix) in prefixes.iter().enumerate() {
    1166           15 :             let config = S3Config {
    1167           15 :                 bucket_name: "bucket".to_owned(),
    1168           15 :                 bucket_region: "region".to_owned(),
    1169           15 :                 prefix_in_bucket: prefix.map(str::to_string),
    1170           15 :                 endpoint: None,
    1171           15 :                 concurrency_limit: NonZeroUsize::new(100).unwrap(),
    1172           15 :                 max_keys_per_list_response: Some(5),
    1173           15 :                 upload_storage_class: None,
    1174           15 :             };
    1175           15 :             let storage = S3Bucket::new(&config, std::time::Duration::ZERO)
    1176           15 :                 .await
    1177           15 :                 .expect("remote storage init");
    1178           45 :             for (test_path_idx, test_path) in all_paths.iter().enumerate() {
    1179           45 :                 let result = storage.relative_path_to_s3_object(test_path);
    1180           45 :                 let expected = expected_outputs[prefix_idx][test_path_idx];
    1181           45 :                 assert_eq!(result, expected);
    1182            3 :             }
    1183            3 :         }
    1184            3 :     }
    1185              : }
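
// Hypothetical caller-side sketch, not part of s3_bucket.rs: it assumes
// time_travel_recover is reachable through the RemoteStorage implementation with
// roughly the parameter list used above (prefix, timestamp, done_if_after, cancel);
// the exact signature is declared elsewhere in the crate and may differ. The
// timestamp is the point in time to restore the prefix to, while done_if_after lets
// an interrupted recovery be retried without touching keys whose newest version is
// already later than that bound. Relies on the imports at the top of this file
// (SystemTime, Duration, CancellationToken, RemoteStorage, RemotePath,
// TimeTravelError), and recover_prefix_to_one_hour_ago is an invented name.
async fn recover_prefix_to_one_hour_ago(
    storage: &S3Bucket,
    prefix: &RemotePath,
) -> Result<(), TimeTravelError> {
    let cancel = CancellationToken::new();
    // Restore the prefix to its state one hour ago...
    let timestamp = SystemTime::now() - Duration::from_secs(3600);
    // ...and skip keys that already have versions newer than "now" (e.g. written by
    // a previous, partially completed recovery attempt).
    let done_if_after = SystemTime::now();
    storage
        .time_travel_recover(Some(prefix), timestamp, done_if_after, &cancel)
        .await
}
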
        

Generated by: LCOV version 2.1-beta