LCOV - code coverage report
Current view: top level - libs/remote_storage/src - s3_bucket.rs (source / functions)
Test: 07bee600374ccd486c69370d0972d9035964fe68.info
Test Date: 2025-02-20 13:11:02
Coverage:   Lines: 90.8 % (781 of 860)   Functions: 66.0 % (62 of 94)

            Line data    Source code
        1              : //! AWS S3 storage wrapper around the `aws_sdk_s3` client library.
       2              : //!
        3              : //! Respects the `prefix_in_bucket` property from [`S3Config`],
        4              : //! allowing multiple API users to work with the same S3 bucket independently,
        5              : //! as long as each specifies its own, distinct prefix.
       6              : 
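A minimal standalone sketch of the key layout described above: several clients can share one bucket as long as each uses a distinct prefix. The `full_key` helper and the prefix/path values are illustrative only; in this crate the key construction is done by `relative_path_to_s3_object` further down in this file.

    fn full_key(prefix_in_bucket: Option<&str>, relative_path: &str) -> String {
        // Mirrors relative_path_to_s3_object: "<prefix>/<path>" when a prefix is configured.
        match prefix_in_bucket {
            Some(prefix) => format!("{prefix}/{relative_path}"),
            None => relative_path.to_owned(),
        }
    }

    fn main() {
        // Two hypothetical users of the same bucket, kept apart purely by prefix.
        assert_eq!(
            full_key(Some("service_a"), "tenants/0001/data"),
            "service_a/tenants/0001/data"
        );
        assert_eq!(
            full_key(Some("service_b"), "tenants/0001/data"),
            "service_b/tenants/0001/data"
        );
        assert_eq!(full_key(None, "tenants/0001/data"), "tenants/0001/data");
    }
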
       7              : use std::{
       8              :     borrow::Cow,
       9              :     collections::HashMap,
      10              :     num::NonZeroU32,
      11              :     pin::Pin,
      12              :     sync::Arc,
      13              :     task::{Context, Poll},
      14              :     time::{Duration, SystemTime},
      15              : };
      16              : 
      17              : use anyhow::{anyhow, Context as _};
      18              : use aws_config::{
      19              :     default_provider::credentials::DefaultCredentialsChain,
      20              :     retry::{RetryConfigBuilder, RetryMode},
      21              :     BehaviorVersion,
      22              : };
      23              : use aws_sdk_s3::{
      24              :     config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep},
      25              :     error::SdkError,
      26              :     operation::{get_object::GetObjectError, head_object::HeadObjectError},
      27              :     types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass},
      28              :     Client,
      29              : };
      30              : use aws_smithy_async::rt::sleep::TokioSleep;
      31              : use http_body_util::StreamBody;
      32              : use http_types::StatusCode;
      33              : 
      34              : use aws_smithy_types::{body::SdkBody, DateTime};
      35              : use aws_smithy_types::{byte_stream::ByteStream, date_time::ConversionError};
      36              : use bytes::Bytes;
      37              : use futures::stream::Stream;
      38              : use futures_util::StreamExt;
      39              : use hyper::body::Frame;
      40              : use scopeguard::ScopeGuard;
      41              : use tokio_util::sync::CancellationToken;
      42              : use utils::backoff;
      43              : 
      44              : use super::StorageMetadata;
      45              : use crate::{
      46              :     config::S3Config,
      47              :     error::Cancelled,
      48              :     metrics::{start_counting_cancelled_wait, start_measuring_requests},
      49              :     support::PermitCarrying,
      50              :     ConcurrencyLimiter, Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject,
      51              :     RemotePath, RemoteStorage, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE_S3,
      52              :     REMOTE_STORAGE_PREFIX_SEPARATOR,
      53              : };
      54              : 
      55              : use crate::metrics::AttemptOutcome;
      56              : pub(super) use crate::metrics::RequestKind;
      57              : 
      58              : /// AWS S3 storage.
      59              : pub struct S3Bucket {
      60              :     client: Client,
      61              :     bucket_name: String,
      62              :     prefix_in_bucket: Option<String>,
      63              :     max_keys_per_list_response: Option<i32>,
      64              :     upload_storage_class: Option<StorageClass>,
      65              :     concurrency_limiter: ConcurrencyLimiter,
      66              :     // Per-request timeout. Accessible for tests.
      67              :     pub timeout: Duration,
      68              : }
      69              : 
      70              : struct GetObjectRequest {
      71              :     bucket: String,
      72              :     key: String,
      73              :     etag: Option<String>,
      74              :     range: Option<String>,
      75              : }
      76              : impl S3Bucket {
      77              :     /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
      78           41 :     pub async fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
      79           41 :         tracing::debug!(
      80            0 :             "Creating s3 remote storage for S3 bucket {}",
      81              :             remote_storage_config.bucket_name
      82              :         );
      83              : 
      84           41 :         let region = Region::new(remote_storage_config.bucket_region.clone());
      85           41 :         let region_opt = Some(region.clone());
      86              : 
      87              :         // https://docs.aws.amazon.com/sdkref/latest/guide/standardized-credentials.html
      88              :         // https://docs.rs/aws-config/latest/aws_config/default_provider/credentials/struct.DefaultCredentialsChain.html
      89              :         // Incomplete list of auth methods used by this:
      90              :         // * "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
      91              :         // * "AWS_PROFILE" / `aws sso login --profile <profile>`
      92              :         // * "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
      93              :         // * http (ECS/EKS) container credentials
      94              :         // * imds v2
      95           41 :         let credentials_provider = DefaultCredentialsChain::builder()
      96           41 :             .region(region)
      97           41 :             .build()
      98           41 :             .await;
      99              : 
     100              :         // AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
     101           41 :         let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
     102           41 : 
     103           41 :         let sdk_config_loader: aws_config::ConfigLoader = aws_config::defaults(
     104           41 :             #[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
     105           41 :             BehaviorVersion::v2023_11_09(),
     106           41 :         )
     107           41 :         .region(region_opt)
     108           41 :         .identity_cache(IdentityCache::lazy().build())
     109           41 :         .credentials_provider(credentials_provider)
     110           41 :         .sleep_impl(SharedAsyncSleep::from(sleep_impl));
     111           41 : 
     112           41 :         let sdk_config: aws_config::SdkConfig = std::thread::scope(|s| {
     113           41 :             s.spawn(|| {
     114           41 :                 // TODO: make this function async.
     115           41 :                 tokio::runtime::Builder::new_current_thread()
     116           41 :                     .enable_all()
     117           41 :                     .build()
     118           41 :                     .unwrap()
     119           41 :                     .block_on(sdk_config_loader.load())
     120           41 :             })
     121           41 :             .join()
     122           41 :             .unwrap()
     123           41 :         });
     124           41 : 
     125           41 :         let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&sdk_config);
     126              : 
     127              :         // Technically, the `remote_storage_config.endpoint` field only applies to S3 interactions.
     128              :         // (In case we ever re-use the `sdk_config` for more than just the S3 client in the future)
     129           41 :         if let Some(custom_endpoint) = remote_storage_config.endpoint.clone() {
     130            0 :             s3_config_builder = s3_config_builder
     131            0 :                 .endpoint_url(custom_endpoint)
     132            0 :                 .force_path_style(true);
     133           41 :         }
     134              : 
     135              :         // We do our own retries (see [`backoff::retry`]).  However, for the AWS SDK to enable rate limiting in response to throttling
     136              :         // responses (e.g. 429 on too many ListObjectsv2 requests), we must provide a retry config.  We set it to use at most one
      137              :         // attempt, and enable 'Adaptive' mode, which turns on client-side rate limiting (a sketch of this division of labor follows `new`).
     138           41 :         let mut retry_config = RetryConfigBuilder::new();
     139           41 :         retry_config
     140           41 :             .set_max_attempts(Some(1))
     141           41 :             .set_mode(Some(RetryMode::Adaptive));
     142           41 :         s3_config_builder = s3_config_builder.retry_config(retry_config.build());
     143           41 : 
     144           41 :         let s3_config = s3_config_builder.build();
     145           41 :         let client = aws_sdk_s3::Client::from_conf(s3_config);
     146           41 : 
     147           41 :         let prefix_in_bucket = remote_storage_config
     148           41 :             .prefix_in_bucket
     149           41 :             .as_deref()
     150           41 :             .map(|prefix| {
     151           38 :                 let mut prefix = prefix;
     152           41 :                 while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     153            3 :                     prefix = &prefix[1..]
     154              :                 }
     155              : 
     156           38 :                 let mut prefix = prefix.to_string();
     157           70 :                 while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     158           32 :                     prefix.pop();
     159           32 :                 }
     160           38 :                 prefix
     161           41 :             });
     162           41 : 
     163           41 :         Ok(Self {
     164           41 :             client,
     165           41 :             bucket_name: remote_storage_config.bucket_name.clone(),
     166           41 :             max_keys_per_list_response: remote_storage_config.max_keys_per_list_response,
     167           41 :             prefix_in_bucket,
     168           41 :             concurrency_limiter: ConcurrencyLimiter::new(
     169           41 :                 remote_storage_config.concurrency_limit.get(),
     170           41 :             ),
     171           41 :             upload_storage_class: remote_storage_config.upload_storage_class.clone(),
     172           41 :             timeout,
     173           41 :         })
     174           41 :     }
     175              : 
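A hedged sketch of the retry division of labor described by the comment inside `new` above: the SDK client makes a single attempt per call (with adaptive client-side rate limiting), and the application wraps whole operations in `backoff::retry`, following the call pattern that `time_travel_recover` uses later in this file. The `head_with_retries` helper, its threshold values, and its description string are illustrative, not part of this crate.

    async fn head_with_retries(
        storage: &S3Bucket,
        key: &RemotePath,
        cancel: &CancellationToken,
    ) -> Result<ListingObject, DownloadError> {
        backoff::retry(
            || storage.head_object(key, cancel),
            // Cancellation is permanent; other errors are retried with backoff.
            |e| matches!(e, DownloadError::Cancelled),
            3,  // warn_threshold
            10, // max_retries
            "HEAD with application-level retries",
            cancel,
        )
        .await
        // `retry` yields None when cancelled before any attempt completed.
        .ok_or(DownloadError::Cancelled)
        .and_then(|x| x)
    }
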
     176          519 :     fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
     177          519 :         let relative_path =
     178          519 :             match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
     179          519 :                 Some(stripped) => stripped,
     180              :                 // we rely on AWS to return properly prefixed paths
     181              :                 // for requests with a certain prefix
     182            0 :                 None => panic!(
     183            0 :                     "Key {} does not start with bucket prefix {:?}",
     184            0 :                     key, self.prefix_in_bucket
     185            0 :                 ),
     186              :             };
     187          519 :         RemotePath(
     188          519 :             relative_path
     189          519 :                 .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
     190          519 :                 .collect(),
     191          519 :         )
     192          519 :     }
     193              : 
     194          555 :     pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
     195          555 :         assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
     196          555 :         let path_string = path.get_path().as_str();
     197          555 :         match &self.prefix_in_bucket {
     198          546 :             Some(prefix) => prefix.clone() + "/" + path_string,
     199            9 :             None => path_string.to_string(),
     200              :         }
     201          555 :     }
     202              : 
     203          471 :     async fn permit(
     204          471 :         &self,
     205          471 :         kind: RequestKind,
     206          471 :         cancel: &CancellationToken,
     207          471 :     ) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
     208          471 :         let started_at = start_counting_cancelled_wait(kind);
     209          471 :         let acquire = self.concurrency_limiter.acquire(kind);
     210              : 
     211          471 :         let permit = tokio::select! {
     212          471 :             permit = acquire => permit.expect("semaphore is never closed"),
     213          471 :             _ = cancel.cancelled() => return Err(Cancelled),
     214              :         };
     215              : 
     216          471 :         let started_at = ScopeGuard::into_inner(started_at);
     217          471 :         crate::metrics::BUCKET_METRICS
     218          471 :             .wait_seconds
     219          471 :             .observe_elapsed(kind, started_at);
     220          471 : 
     221          471 :         Ok(permit)
     222          471 :     }
     223              : 
     224           33 :     async fn owned_permit(
     225           33 :         &self,
     226           33 :         kind: RequestKind,
     227           33 :         cancel: &CancellationToken,
     228           33 :     ) -> Result<tokio::sync::OwnedSemaphorePermit, Cancelled> {
     229           33 :         let started_at = start_counting_cancelled_wait(kind);
     230           33 :         let acquire = self.concurrency_limiter.acquire_owned(kind);
     231              : 
     232           33 :         let permit = tokio::select! {
     233           33 :             permit = acquire => permit.expect("semaphore is never closed"),
     234           33 :             _ = cancel.cancelled() => return Err(Cancelled),
     235              :         };
     236              : 
     237           33 :         let started_at = ScopeGuard::into_inner(started_at);
     238           33 :         crate::metrics::BUCKET_METRICS
     239           33 :             .wait_seconds
     240           33 :             .observe_elapsed(kind, started_at);
     241           33 :         Ok(permit)
     242           33 :     }
     243              : 
     244           33 :     async fn download_object(
     245           33 :         &self,
     246           33 :         request: GetObjectRequest,
     247           33 :         cancel: &CancellationToken,
     248           33 :     ) -> Result<Download, DownloadError> {
     249           33 :         let kind = RequestKind::Get;
     250              : 
     251           33 :         let permit = self.owned_permit(kind, cancel).await?;
     252              : 
     253           33 :         let started_at = start_measuring_requests(kind);
     254           33 : 
     255           33 :         let mut builder = self
     256           33 :             .client
     257           33 :             .get_object()
     258           33 :             .bucket(request.bucket)
     259           33 :             .key(request.key)
     260           33 :             .set_range(request.range);
     261              : 
     262           33 :         if let Some(etag) = request.etag {
     263            6 :             builder = builder.if_none_match(etag);
     264           27 :         }
     265              : 
     266           33 :         let get_object = builder.send();
     267              : 
     268           33 :         let get_object = tokio::select! {
     269           33 :             res = get_object => res,
     270           33 :             _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
     271           33 :             _ = cancel.cancelled() => return Err(DownloadError::Cancelled),
     272              :         };
     273              : 
     274           33 :         let started_at = ScopeGuard::into_inner(started_at);
     275              : 
     276           28 :         let object_output = match get_object {
     277           28 :             Ok(object_output) => object_output,
     278            4 :             Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
     279              :                 // Count this in the AttemptOutcome::Ok bucket, because 404 is not
     280              :                 // an error: we expect to sometimes fetch an object and find it missing,
      281              :                 // e.g. when probing for timeline indices (a caller-side sketch follows this function).
     282            0 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     283            0 :                     kind,
     284            0 :                     AttemptOutcome::Ok,
     285            0 :                     started_at,
     286            0 :                 );
     287            0 :                 return Err(DownloadError::NotFound);
     288              :             }
     289            4 :             Err(SdkError::ServiceError(e))
     290              :                 // aws_smithy_runtime_api::http::response::StatusCode isn't
     291              :                 // re-exported by any aws crates, so just check the numeric
     292              :                 // status against http_types::StatusCode instead of pulling it.
     293            4 :                 if e.raw().status().as_u16() == StatusCode::NotModified =>
     294            4 :             {
     295            4 :                 // Count an unmodified file as a success.
     296            4 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     297            4 :                     kind,
     298            4 :                     AttemptOutcome::Ok,
     299            4 :                     started_at,
     300            4 :                 );
     301            4 :                 return Err(DownloadError::Unmodified);
     302              :             }
     303            1 :             Err(e) => {
     304            1 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     305            1 :                     kind,
     306            1 :                     AttemptOutcome::Err,
     307            1 :                     started_at,
     308            1 :                 );
     309            1 : 
     310            1 :                 return Err(DownloadError::Other(
     311            1 :                     anyhow::Error::new(e).context("download s3 object"),
     312            1 :                 ));
     313              :             }
     314              :         };
     315              : 
      316              :         // Even if no timeout budget remains, continue anyway: the caller can decide how to
      317              :         // handle (or ignore) timeout and cancellation errors on the returned stream.
     318           28 :         let remaining = self.timeout.saturating_sub(started_at.elapsed());
     319           28 : 
     320           28 :         let metadata = object_output.metadata().cloned().map(StorageMetadata);
     321           28 :         let etag = object_output
     322           28 :             .e_tag
     323           28 :             .ok_or(DownloadError::Other(anyhow::anyhow!("Missing ETag header")))?
     324           28 :             .into();
     325           28 :         let last_modified = object_output
     326           28 :             .last_modified
     327           28 :             .ok_or(DownloadError::Other(anyhow::anyhow!(
     328           28 :                 "Missing LastModified header"
     329           28 :             )))?
     330           28 :             .try_into()
     331           28 :             .map_err(|e: ConversionError| DownloadError::Other(e.into()))?;
     332              : 
     333           28 :         let body = object_output.body;
     334           28 :         let body = ByteStreamAsStream::from(body);
     335           28 :         let body = PermitCarrying::new(permit, body);
     336           28 :         let body = TimedDownload::new(started_at, body);
     337           28 : 
     338           28 :         let cancel_or_timeout = crate::support::cancel_or_timeout(remaining, cancel.clone());
     339           28 :         let body = crate::support::DownloadStream::new(cancel_or_timeout, body);
     340           28 : 
     341           28 :         Ok(Download {
     342           28 :             metadata,
     343           28 :             etag,
     344           28 :             last_modified,
     345           28 :             download_stream: Box::pin(body),
     346           28 :         })
     347           33 :     }
     348              : 
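A minimal caller-side sketch of how the 404/304 mapping above is typically consumed, assuming the crate-local types in this file are in scope; the `download_if_present` helper is illustrative and not part of this crate. A missing object surfaces as `DownloadError::NotFound` (and is counted as a successful attempt in the metrics), while a matching `DownloadOpts::etag` surfaces as `DownloadError::Unmodified`.

    async fn download_if_present(
        storage: &S3Bucket,
        path: &RemotePath,
        opts: &DownloadOpts,
        cancel: &CancellationToken,
    ) -> Result<Option<Download>, DownloadError> {
        match storage.download(path, opts, cancel).await {
            Ok(download) => Ok(Some(download)),
            // 404: the object simply isn't there, which is an expected answer when probing.
            Err(DownloadError::NotFound) => Ok(None),
            // Everything else (Unmodified, Timeout, Cancelled, Other) is left to the caller.
            Err(other) => Err(other),
        }
    }
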
     349          198 :     async fn delete_oids(
     350          198 :         &self,
     351          198 :         _permit: &tokio::sync::SemaphorePermit<'_>,
     352          198 :         delete_objects: &[ObjectIdentifier],
     353          198 :         cancel: &CancellationToken,
     354          198 :     ) -> anyhow::Result<()> {
     355          198 :         let kind = RequestKind::Delete;
     356          198 :         let mut cancel = std::pin::pin!(cancel.cancelled());
     357              : 
     358          198 :         for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE_S3) {
     359          198 :             let started_at = start_measuring_requests(kind);
     360              : 
     361          198 :             let req = self
     362          198 :                 .client
     363          198 :                 .delete_objects()
     364          198 :                 .bucket(self.bucket_name.clone())
     365          198 :                 .delete(
     366          198 :                     Delete::builder()
     367          198 :                         .set_objects(Some(chunk.to_vec()))
     368          198 :                         .build()
     369          198 :                         .context("build request")?,
     370              :                 )
     371          198 :                 .send();
     372              : 
     373          198 :             let resp = tokio::select! {
     374          198 :                 resp = req => resp,
     375          198 :                 _ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()),
     376          198 :                 _ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
     377              :             };
     378              : 
     379          198 :             let started_at = ScopeGuard::into_inner(started_at);
     380          198 :             crate::metrics::BUCKET_METRICS
     381          198 :                 .req_seconds
     382          198 :                 .observe_elapsed(kind, &resp, started_at);
     383              : 
     384          198 :             let resp = resp.context("request deletion")?;
     385          198 :             crate::metrics::BUCKET_METRICS
     386          198 :                 .deleted_objects_total
     387          198 :                 .inc_by(chunk.len() as u64);
     388              : 
     389          198 :             if let Some(errors) = resp.errors {
     390              :                 // Log a bounded number of the errors within the response:
     391              :                 // these requests can carry 1000 keys so logging each one
     392              :                 // would be too verbose, especially as errors may lead us
     393              :                 // to retry repeatedly.
     394              :                 const LOG_UP_TO_N_ERRORS: usize = 10;
     395            0 :                 for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
     396            0 :                     tracing::warn!(
     397            0 :                         "DeleteObjects key {} failed: {}: {}",
     398            0 :                         e.key.as_ref().map(Cow::from).unwrap_or("".into()),
     399            0 :                         e.code.as_ref().map(Cow::from).unwrap_or("".into()),
     400            0 :                         e.message.as_ref().map(Cow::from).unwrap_or("".into())
     401              :                     );
     402              :                 }
     403              : 
     404            0 :                 return Err(anyhow::anyhow!(
     405            0 :                     "Failed to delete {}/{} objects",
     406            0 :                     errors.len(),
     407            0 :                     chunk.len(),
     408            0 :                 ));
     409          198 :             }
     410              :         }
     411          198 :         Ok(())
     412          198 :     }
     413              : 
     414            6 :     pub fn bucket_name(&self) -> &str {
     415            6 :         &self.bucket_name
     416            6 :     }
     417              : }
     418              : 
     419              : pin_project_lite::pin_project! {
     420              :     struct ByteStreamAsStream {
     421              :         #[pin]
     422              :         inner: aws_smithy_types::byte_stream::ByteStream
     423              :     }
     424              : }
     425              : 
     426              : impl From<aws_smithy_types::byte_stream::ByteStream> for ByteStreamAsStream {
     427           28 :     fn from(inner: aws_smithy_types::byte_stream::ByteStream) -> Self {
     428           28 :         ByteStreamAsStream { inner }
     429           28 :     }
     430              : }
     431              : 
     432              : impl Stream for ByteStreamAsStream {
     433              :     type Item = std::io::Result<Bytes>;
     434              : 
     435           47 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     436           47 :         // this does the std::io::ErrorKind::Other conversion
     437           47 :         self.project().inner.poll_next(cx).map_err(|x| x.into())
     438           47 :     }
     439              : 
     440              :     // cannot implement size_hint because inner.size_hint is remaining size in bytes, which makes
     441              :     // sense and Stream::size_hint does not really
     442              : }
     443              : 
     444              : pin_project_lite::pin_project! {
     445              :     /// Times and tracks the outcome of the request.
     446              :     struct TimedDownload<S> {
     447              :         started_at: std::time::Instant,
     448              :         outcome: AttemptOutcome,
     449              :         #[pin]
     450              :         inner: S
     451              :     }
     452              : 
     453              :     impl<S> PinnedDrop for TimedDownload<S> {
     454              :         fn drop(mut this: Pin<&mut Self>) {
     455              :             crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
     456              :         }
     457              :     }
     458              : }
     459              : 
     460              : impl<S> TimedDownload<S> {
     461           28 :     fn new(started_at: std::time::Instant, inner: S) -> Self {
     462           28 :         TimedDownload {
     463           28 :             started_at,
     464           28 :             outcome: AttemptOutcome::Cancelled,
     465           28 :             inner,
     466           28 :         }
     467           28 :     }
     468              : }
     469              : 
     470              : impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for TimedDownload<S> {
     471              :     type Item = <S as Stream>::Item;
     472              : 
     473           47 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     474              :         use std::task::ready;
     475              : 
     476           47 :         let this = self.project();
     477              : 
     478           47 :         let res = ready!(this.inner.poll_next(cx));
     479           22 :         match &res {
     480           22 :             Some(Ok(_)) => {}
     481            0 :             Some(Err(_)) => *this.outcome = AttemptOutcome::Err,
     482           18 :             None => *this.outcome = AttemptOutcome::Ok,
     483              :         }
     484              : 
     485           40 :         Poll::Ready(res)
     486           47 :     }
     487              : 
     488            0 :     fn size_hint(&self) -> (usize, Option<usize>) {
     489            0 :         self.inner.size_hint()
     490            0 :     }
     491              : }
     492              : 
     493              : impl RemoteStorage for S3Bucket {
     494           64 :     fn list_streaming(
     495           64 :         &self,
     496           64 :         prefix: Option<&RemotePath>,
     497           64 :         mode: ListingMode,
     498           64 :         max_keys: Option<NonZeroU32>,
     499           64 :         cancel: &CancellationToken,
     500           64 :     ) -> impl Stream<Item = Result<Listing, DownloadError>> {
     501           64 :         let kind = RequestKind::List;
     502           64 :         // s3 sdk wants i32
     503           64 :         let mut max_keys = max_keys.map(|mk| mk.get() as i32);
     504           64 : 
      505           64 :         // Use the caller-provided prefix, or fall back to prefix_in_bucket if none was passed.
     506           64 :         let list_prefix = prefix
     507           64 :             .map(|p| self.relative_path_to_s3_object(p))
     508           64 :             .or_else(|| {
     509           32 :                 self.prefix_in_bucket.clone().map(|mut s| {
     510           32 :                     s.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
     511           32 :                     s
     512           32 :                 })
     513           64 :             });
     514           64 : 
     515           64 :         async_stream::stream! {
     516           64 :             let _permit = self.permit(kind, cancel).await?;
     517           64 : 
     518           64 :             let mut continuation_token = None;
     519           64 :             'outer: loop {
     520           64 :                 let started_at = start_measuring_requests(kind);
     521           64 : 
      522           64 :                 // Min of the two Options, returning the Some value when the other is None
      523           64 :                 // (plain Option::min won't do, since None sorts below Some; a standalone sketch follows this function).
     524           64 :                 let request_max_keys = self
     525           64 :                     .max_keys_per_list_response
     526           64 :                     .into_iter()
     527           64 :                     .chain(max_keys.into_iter())
     528           64 :                     .min();
     529           64 :                 let mut request = self
     530           64 :                     .client
     531           64 :                     .list_objects_v2()
     532           64 :                     .bucket(self.bucket_name.clone())
     533           64 :                     .set_prefix(list_prefix.clone())
     534           64 :                     .set_continuation_token(continuation_token.clone())
     535           64 :                     .set_max_keys(request_max_keys);
     536           64 : 
     537           64 :                 if let ListingMode::WithDelimiter = mode {
     538           64 :                     request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
     539           64 :                 }
     540           64 : 
     541           64 :                 let request = request.send();
     542           64 : 
     543           64 :                 let response = tokio::select! {
     544           64 :                     res = request => Ok(res),
     545           64 :                     _ = tokio::time::sleep(self.timeout) => Err(DownloadError::Timeout),
     546           64 :                     _ = cancel.cancelled() => Err(DownloadError::Cancelled),
     547           64 :                 }?;
     548           64 : 
     549           64 :                 let response = response
     550           64 :                     .context("Failed to list S3 prefixes")
     551           64 :                     .map_err(DownloadError::Other);
     552           64 : 
     553           64 :                 let started_at = ScopeGuard::into_inner(started_at);
     554           64 : 
     555           64 :                 crate::metrics::BUCKET_METRICS
     556           64 :                     .req_seconds
     557           64 :                     .observe_elapsed(kind, &response, started_at);
     558           64 : 
     559           64 :                 let response = match response {
     560           64 :                     Ok(response) => response,
     561           64 :                     Err(e) => {
     562           64 :                         // The error is potentially retryable, so we must rewind the loop after yielding.
     563           64 :                         yield Err(e);
     564           64 :                         continue 'outer;
     565           64 :                     },
     566           64 :                 };
     567           64 : 
     568           64 :                 let keys = response.contents();
     569           64 :                 let prefixes = response.common_prefixes.as_deref().unwrap_or_default();
     570           64 : 
     571           64 :                 tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
     572           64 :                 let mut result = Listing::default();
     573           64 : 
     574           64 :                 for object in keys {
     575           64 :                     let key = object.key().expect("response does not contain a key");
     576           64 :                     let key = self.s3_object_to_relative_path(key);
     577           64 : 
     578           64 :                     let last_modified = match object.last_modified.map(SystemTime::try_from) {
     579           64 :                         Some(Ok(t)) => t,
     580           64 :                         Some(Err(_)) => {
     581           64 :                             tracing::warn!("Remote storage last_modified {:?} for {} is out of bounds",
     582           64 :                                 object.last_modified, key
     583           64 :                         );
     584           64 :                             SystemTime::now()
     585           64 :                         },
     586           64 :                         None => {
     587           64 :                             SystemTime::now()
     588           64 :                         }
     589           64 :                     };
     590           64 : 
     591           64 :                     let size = object.size.unwrap_or(0) as u64;
     592           64 : 
     593           64 :                     result.keys.push(ListingObject{
     594           64 :                         key,
     595           64 :                         last_modified,
     596           64 :                         size,
     597           64 :                     });
     598           64 :                     if let Some(mut mk) = max_keys {
     599           64 :                         assert!(mk > 0);
     600           64 :                         mk -= 1;
     601           64 :                         if mk == 0 {
     602           64 :                             // limit reached
     603           64 :                             yield Ok(result);
     604           64 :                             break 'outer;
     605           64 :                         }
     606           64 :                         max_keys = Some(mk);
     607           64 :                     }
     608           64 :                 }
     609           64 : 
     610           64 :                 // S3 gives us prefixes like "foo/", we return them like "foo"
     611          104 :                 result.prefixes.extend(prefixes.iter().filter_map(|o| {
     612          104 :                     Some(
     613          104 :                         self.s3_object_to_relative_path(
     614          104 :                             o.prefix()?
     615          104 :                                 .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR),
     616           64 :                         ),
     617           64 :                     )
     618          104 :                 }));
     619           64 : 
     620           64 :                 yield Ok(result);
     621           64 : 
     622           64 :                 continuation_token = match response.next_continuation_token {
     623           64 :                     Some(new_token) => Some(new_token),
     624           64 :                     None => break,
     625           64 :                 };
     626           64 :             }
     627           64 :         }
     628           64 :     }
     629              : 
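A standalone sketch of the "min of two Options" trick used above: `Option::into_iter` yields zero or one items, so chaining the two options and taking `min()` returns the smaller of two `Some`s, the single `Some` when the other side is `None`, and `None` only when both sides are `None`. The `min_opt` name is illustrative.

    fn min_opt(a: Option<i32>, b: Option<i32>) -> Option<i32> {
        // Same shape as the request_max_keys computation in list_streaming.
        a.into_iter().chain(b).min()
    }

    fn main() {
        assert_eq!(min_opt(Some(3), Some(5)), Some(3));
        assert_eq!(min_opt(None, Some(5)), Some(5));
        assert_eq!(min_opt(Some(3), None), Some(3));
        assert_eq!(min_opt(None, None), None);
        // For contrast, Option's own ordering treats None as the smallest value:
        assert_eq!(std::cmp::min(None, Some(5)), None::<i32>);
    }
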
     630            6 :     async fn head_object(
     631            6 :         &self,
     632            6 :         key: &RemotePath,
     633            6 :         cancel: &CancellationToken,
     634            6 :     ) -> Result<ListingObject, DownloadError> {
     635            6 :         let kind = RequestKind::Head;
     636            6 :         let _permit = self.permit(kind, cancel).await?;
     637              : 
     638            6 :         let started_at = start_measuring_requests(kind);
     639            6 : 
     640            6 :         let head_future = self
     641            6 :             .client
     642            6 :             .head_object()
     643            6 :             .bucket(self.bucket_name())
     644            6 :             .key(self.relative_path_to_s3_object(key))
     645            6 :             .send();
     646            6 : 
     647            6 :         let head_future = tokio::time::timeout(self.timeout, head_future);
     648              : 
     649            6 :         let res = tokio::select! {
     650            6 :             res = head_future => res,
     651            6 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     652              :         };
     653              : 
     654            6 :         let res = res.map_err(|_e| DownloadError::Timeout)?;
     655              : 
      656              :         // Do not include timeouts as errors in the metrics, but do include cancellations.
     657            6 :         let started_at = ScopeGuard::into_inner(started_at);
     658            6 :         crate::metrics::BUCKET_METRICS
     659            6 :             .req_seconds
     660            6 :             .observe_elapsed(kind, &res, started_at);
     661              : 
     662            4 :         let data = match res {
     663            4 :             Ok(object_output) => object_output,
     664            2 :             Err(SdkError::ServiceError(e)) if matches!(e.err(), HeadObjectError::NotFound(_)) => {
     665              :                 // Count this in the AttemptOutcome::Ok bucket, because 404 is not
     666              :                 // an error: we expect to sometimes fetch an object and find it missing,
     667              :                 // e.g. when probing for timeline indices.
     668            2 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     669            2 :                     kind,
     670            2 :                     AttemptOutcome::Ok,
     671            2 :                     started_at,
     672            2 :                 );
     673            2 :                 return Err(DownloadError::NotFound);
     674              :             }
     675            0 :             Err(e) => {
     676            0 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     677            0 :                     kind,
     678            0 :                     AttemptOutcome::Err,
     679            0 :                     started_at,
     680            0 :                 );
     681            0 : 
     682            0 :                 return Err(DownloadError::Other(
     683            0 :                     anyhow::Error::new(e).context("s3 head object"),
     684            0 :                 ));
     685              :             }
     686              :         };
     687              : 
     688            4 :         let (Some(last_modified), Some(size)) = (data.last_modified, data.content_length) else {
     689            0 :             return Err(DownloadError::Other(anyhow!(
     690            0 :                 "head_object doesn't contain last_modified or content_length"
     691            0 :             )))?;
     692              :         };
     693              :         Ok(ListingObject {
     694            4 :             key: key.to_owned(),
     695            4 :             last_modified: SystemTime::try_from(last_modified).map_err(|e| {
     696            0 :                 DownloadError::Other(anyhow!("can't convert time '{last_modified}': {e}"))
     697            4 :             })?,
     698            4 :             size: size as u64,
     699              :         })
     700            6 :     }
     701              : 
     702          199 :     async fn upload(
     703          199 :         &self,
     704          199 :         from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     705          199 :         from_size_bytes: usize,
     706          199 :         to: &RemotePath,
     707          199 :         metadata: Option<StorageMetadata>,
     708          199 :         cancel: &CancellationToken,
     709          199 :     ) -> anyhow::Result<()> {
     710          199 :         let kind = RequestKind::Put;
     711          199 :         let _permit = self.permit(kind, cancel).await?;
     712              : 
     713          199 :         let started_at = start_measuring_requests(kind);
     714          199 : 
     715          711 :         let body = StreamBody::new(from.map(|x| x.map(Frame::data)));
     716          199 :         let bytes_stream = ByteStream::new(SdkBody::from_body_1_x(body));
     717              : 
     718          199 :         let upload = self
     719          199 :             .client
     720          199 :             .put_object()
     721          199 :             .bucket(self.bucket_name.clone())
     722          199 :             .key(self.relative_path_to_s3_object(to))
     723          199 :             .set_metadata(metadata.map(|m| m.0))
     724          199 :             .set_storage_class(self.upload_storage_class.clone())
     725          199 :             .content_length(from_size_bytes.try_into()?)
     726          199 :             .body(bytes_stream)
     727          199 :             .send();
     728          199 : 
     729          199 :         let upload = tokio::time::timeout(self.timeout, upload);
     730              : 
     731          199 :         let res = tokio::select! {
     732          199 :             res = upload => res,
     733          199 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     734              :         };
     735              : 
     736          199 :         if let Ok(inner) = &res {
      737          199 :             // Do not include timeouts as errors in the metrics, but do include cancellations.
     738          199 :             let started_at = ScopeGuard::into_inner(started_at);
     739          199 :             crate::metrics::BUCKET_METRICS
     740          199 :                 .req_seconds
     741          199 :                 .observe_elapsed(kind, inner, started_at);
     742          199 :         }
     743              : 
     744          199 :         match res {
     745          198 :             Ok(Ok(_put)) => Ok(()),
     746            1 :             Ok(Err(sdk)) => Err(sdk.into()),
     747            0 :             Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
     748              :         }
     749          199 :     }
     750              : 
     751            2 :     async fn copy(
     752            2 :         &self,
     753            2 :         from: &RemotePath,
     754            2 :         to: &RemotePath,
     755            2 :         cancel: &CancellationToken,
     756            2 :     ) -> anyhow::Result<()> {
     757            2 :         let kind = RequestKind::Copy;
     758            2 :         let _permit = self.permit(kind, cancel).await?;
     759              : 
     760            2 :         let timeout = tokio::time::sleep(self.timeout);
     761            2 : 
     762            2 :         let started_at = start_measuring_requests(kind);
     763            2 : 
     764            2 :         // we need to specify bucket_name as a prefix
     765            2 :         let copy_source = format!(
     766            2 :             "{}/{}",
     767            2 :             self.bucket_name,
     768            2 :             self.relative_path_to_s3_object(from)
     769            2 :         );
     770            2 : 
     771            2 :         let op = self
     772            2 :             .client
     773            2 :             .copy_object()
     774            2 :             .bucket(self.bucket_name.clone())
     775            2 :             .key(self.relative_path_to_s3_object(to))
     776            2 :             .set_storage_class(self.upload_storage_class.clone())
     777            2 :             .copy_source(copy_source)
     778            2 :             .send();
     779              : 
     780            2 :         let res = tokio::select! {
     781            2 :             res = op => res,
     782            2 :             _ = timeout => return Err(TimeoutOrCancel::Timeout.into()),
     783            2 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     784              :         };
     785              : 
     786            2 :         let started_at = ScopeGuard::into_inner(started_at);
     787            2 :         crate::metrics::BUCKET_METRICS
     788            2 :             .req_seconds
     789            2 :             .observe_elapsed(kind, &res, started_at);
     790            2 : 
     791            2 :         res?;
     792              : 
     793            2 :         Ok(())
     794            2 :     }
     795              : 
     796           33 :     async fn download(
     797           33 :         &self,
     798           33 :         from: &RemotePath,
     799           33 :         opts: &DownloadOpts,
     800           33 :         cancel: &CancellationToken,
     801           33 :     ) -> Result<Download, DownloadError> {
      802           33 :         // If a prefix is configured, this downloads the object at `prefix/from`;
      803           33 :         // otherwise it downloads the object at `from`.
     804           33 :         self.download_object(
     805           33 :             GetObjectRequest {
     806           33 :                 bucket: self.bucket_name.clone(),
     807           33 :                 key: self.relative_path_to_s3_object(from),
     808           33 :                 etag: opts.etag.as_ref().map(|e| e.to_string()),
     809           33 :                 range: opts.byte_range_header(),
     810           33 :             },
     811           33 :             cancel,
     812           33 :         )
     813           33 :         .await
     814           33 :     }
     815              : 
     816          194 :     async fn delete_objects(
     817          194 :         &self,
     818          194 :         paths: &[RemotePath],
     819          194 :         cancel: &CancellationToken,
     820          194 :     ) -> anyhow::Result<()> {
     821          194 :         let kind = RequestKind::Delete;
     822          194 :         let permit = self.permit(kind, cancel).await?;
     823          194 :         let mut delete_objects = Vec::with_capacity(paths.len());
     824          430 :         for path in paths {
     825          236 :             let obj_id = ObjectIdentifier::builder()
     826          236 :                 .set_key(Some(self.relative_path_to_s3_object(path)))
     827          236 :                 .build()
     828          236 :                 .context("convert path to oid")?;
     829          236 :             delete_objects.push(obj_id);
     830              :         }
     831              : 
     832          194 :         self.delete_oids(&permit, &delete_objects, cancel).await
     833          194 :     }
     834              : 
     835            0 :     fn max_keys_per_delete(&self) -> usize {
     836            0 :         MAX_KEYS_PER_DELETE_S3
     837            0 :     }
     838              : 
     839          174 :     async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
     840          174 :         let paths = std::array::from_ref(path);
     841          174 :         self.delete_objects(paths, cancel).await
     842          174 :     }
     843              : 
     844            6 :     async fn time_travel_recover(
     845            6 :         &self,
     846            6 :         prefix: Option<&RemotePath>,
     847            6 :         timestamp: SystemTime,
     848            6 :         done_if_after: SystemTime,
     849            6 :         cancel: &CancellationToken,
     850            6 :     ) -> Result<(), TimeTravelError> {
     851            6 :         let kind = RequestKind::TimeTravel;
     852            6 :         let permit = self.permit(kind, cancel).await?;
     853              : 
     854            6 :         let timestamp = DateTime::from(timestamp);
     855            6 :         let done_if_after = DateTime::from(done_if_after);
     856            6 : 
     857            6 :         tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
     858              : 
      859              :         // Use the caller-provided prefix, or fall back to prefix_in_bucket if none was passed.
     860            6 :         let prefix = prefix
     861            6 :             .map(|p| self.relative_path_to_s3_object(p))
     862            6 :             .or_else(|| self.prefix_in_bucket.clone());
     863            6 : 
     864            6 :         let warn_threshold = 3;
     865            6 :         let max_retries = 10;
     866            6 :         let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
     867              : 
     868            6 :         let mut key_marker = None;
     869            6 :         let mut version_id_marker = None;
     870            6 :         let mut versions_and_deletes = Vec::new();
     871              : 
     872              :         loop {
     873            6 :             let response = backoff::retry(
     874            6 :                 || async {
     875            6 :                     let op = self
     876            6 :                         .client
     877            6 :                         .list_object_versions()
     878            6 :                         .bucket(self.bucket_name.clone())
     879            6 :                         .set_prefix(prefix.clone())
     880            6 :                         .set_key_marker(key_marker.clone())
     881            6 :                         .set_version_id_marker(version_id_marker.clone())
     882            6 :                         .send();
     883            6 : 
     884            6 :                     tokio::select! {
     885            6 :                         res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
     886            6 :                         _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
     887              :                     }
     888           12 :                 },
     889            6 :                 is_permanent,
     890            6 :                 warn_threshold,
     891            6 :                 max_retries,
     892            6 :                 "listing object versions for time_travel_recover",
     893            6 :                 cancel,
     894            6 :             )
     895            6 :             .await
     896            6 :             .ok_or_else(|| TimeTravelError::Cancelled)
     897            6 :             .and_then(|x| x)?;
     898              : 
     899            6 :             tracing::trace!(
     900            0 :                 "  Got List response version_id_marker={:?}, key_marker={:?}",
     901              :                 response.version_id_marker,
     902              :                 response.key_marker
     903              :             );
     904            6 :             let versions = response
     905            6 :                 .versions
     906            6 :                 .unwrap_or_default()
     907            6 :                 .into_iter()
     908            6 :                 .map(VerOrDelete::from_version);
     909            6 :             let deletes = response
     910            6 :                 .delete_markers
     911            6 :                 .unwrap_or_default()
     912            6 :                 .into_iter()
     913            6 :                 .map(VerOrDelete::from_delete_marker);
     914            6 :             itertools::process_results(versions.chain(deletes), |n_vds| {
     915            6 :                 versions_and_deletes.extend(n_vds)
     916            6 :             })
     917            6 :             .map_err(TimeTravelError::Other)?;
     918           12 :             fn none_if_empty(v: Option<String>) -> Option<String> {
     919           12 :                 v.filter(|v| !v.is_empty())
     920           12 :             }
     921            6 :             version_id_marker = none_if_empty(response.next_version_id_marker);
     922            6 :             key_marker = none_if_empty(response.next_key_marker);
     923            6 :             if version_id_marker.is_none() {
     924              :                 // The final response is not supposed to be truncated
     925            6 :                 if response.is_truncated.unwrap_or_default() {
     926            0 :                     return Err(TimeTravelError::Other(anyhow::anyhow!(
     927            0 :                         "Received truncated ListObjectVersions response for prefix={prefix:?}"
     928            0 :                     )));
     929            6 :                 }
     930            6 :                 break;
     931            0 :             }
      932              :             // Limit the number of versions and deletions, mostly so that we don't
     933              :             // keep requesting forever if the list is too long, as we'd put the
     934              :             // list in RAM.
     935              :             // Building a list of 100k entries that reaches the limit roughly takes
     936              :             // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
     937              :             const COMPLEXITY_LIMIT: usize = 100_000;
     938            0 :             if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
     939            0 :                 return Err(TimeTravelError::TooManyVersions);
     940            0 :             }
     941              :         }
     942              : 
     943            6 :         tracing::info!(
     944            0 :             "Built list for time travel with {} versions and deletions",
     945            0 :             versions_and_deletes.len()
     946              :         );
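                       :         // Sorted by (key, last_modified), each key's history is contiguous and
                       :         // in chronological order, which the per-key binary search below relies on.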
     947              : 
     948              :         // Work on the list of references instead of the objects directly,
     949              :         // otherwise we get lifetime errors in the sort_by_key call below.
     950            6 :         let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
     951            6 : 
     952          124 :         versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified));
     953            6 : 
     954            6 :         let mut vds_for_key = HashMap::<_, Vec<_>>::new();
     955              : 
     956           42 :         for vd in &versions_and_deletes {
     957              :             let VerOrDelete {
     958           36 :                 version_id, key, ..
     959           36 :             } = &vd;
     960           36 :             if version_id == "null" {
     961            0 :                 return Err(TimeTravelError::Other(anyhow!("Received ListVersions response for key={key} with version_id='null', \
     962            0 :                     indicating either disabled versioning, or legacy objects with null version id values")));
     963           36 :             }
     964           36 :             tracing::trace!(
     965            0 :                 "Parsing version key={key} version_id={version_id} kind={:?}",
     966              :                 vd.kind
     967              :             );
     968              : 
     969           36 :             vds_for_key.entry(key).or_default().push(vd);
     970              :         }
     971           24 :         for (key, versions) in vds_for_key {
     972           18 :             let last_vd = versions.last().unwrap();
     973           18 :             if last_vd.last_modified > done_if_after {
     974            0 :                 tracing::trace!("Key {key} has version later than done_if_after, skipping");
     975            0 :                 continue;
     976           18 :             }
      977              :             // Index of the first version at or after `timestamp`; we restore the one just before it.
     978           18 :             let version_to_restore_to =
     979           36 :                 match versions.binary_search_by_key(&timestamp, |tpl| tpl.last_modified) {
     980            0 :                     Ok(v) => v,
     981           18 :                     Err(e) => e,
     982              :                 };
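                       :             // Ok(i): versions[i].last_modified equals `timestamp` exactly;
                       :             // Err(i): the insertion point, i.e. the index of the first version
                       :             // strictly newer than `timestamp`.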
     983           18 :             if version_to_restore_to == versions.len() {
     984            6 :                 tracing::trace!("Key {key} has no changes since timestamp, skipping");
     985            6 :                 continue;
     986           12 :             }
     987           12 :             let mut do_delete = false;
     988           12 :             if version_to_restore_to == 0 {
     989              :                 // All versions more recent, so the key didn't exist at the specified time point.
     990            6 :                 tracing::trace!(
     991            0 :                     "All {} versions more recent for {key}, deleting",
     992            0 :                     versions.len()
     993              :                 );
     994            6 :                 do_delete = true;
     995              :             } else {
     996            6 :                 match &versions[version_to_restore_to - 1] {
     997              :                     VerOrDelete {
     998              :                         kind: VerOrDeleteKind::Version,
     999            6 :                         version_id,
    1000            6 :                         ..
    1001            6 :                     } => {
    1002            6 :                         tracing::trace!("Copying old version {version_id} for {key}...");
     1003              :                         // Restore the state to that version by copying it over the current key
    1004            6 :                         let source_id =
    1005            6 :                             format!("{}/{key}?versionId={version_id}", self.bucket_name);
    1006            6 : 
    1007            6 :                         backoff::retry(
    1008            6 :                             || async {
    1009            6 :                                 let op = self
    1010            6 :                                     .client
    1011            6 :                                     .copy_object()
    1012            6 :                                     .bucket(self.bucket_name.clone())
    1013            6 :                                     .key(key)
    1014            6 :                                     .set_storage_class(self.upload_storage_class.clone())
    1015            6 :                                     .copy_source(&source_id)
    1016            6 :                                     .send();
    1017            6 : 
    1018            6 :                                 tokio::select! {
    1019            6 :                                     res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
    1020            6 :                                     _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
    1021              :                                 }
    1022           12 :                             },
    1023            6 :                             is_permanent,
    1024            6 :                             warn_threshold,
    1025            6 :                             max_retries,
    1026            6 :                             "copying object version for time_travel_recover",
    1027            6 :                             cancel,
    1028            6 :                         )
    1029            6 :                         .await
    1030            6 :                         .ok_or_else(|| TimeTravelError::Cancelled)
    1031            6 :                         .and_then(|x| x)?;
    1032            6 :                         tracing::info!(%version_id, %key, "Copied old version in S3");
    1033              :                     }
    1034              :                     VerOrDelete {
    1035              :                         kind: VerOrDeleteKind::DeleteMarker,
    1036              :                         ..
    1037            0 :                     } => {
    1038            0 :                         do_delete = true;
    1039            0 :                     }
    1040              :                 }
    1041              :             };
    1042           12 :             if do_delete {
    1043            6 :                 if matches!(last_vd.kind, VerOrDeleteKind::DeleteMarker) {
    1044              :                     // Key has since been deleted (but there was some history), no need to do anything
    1045            2 :                     tracing::trace!("Key {key} already deleted, skipping.");
    1046              :                 } else {
    1047            4 :                     tracing::trace!("Deleting {key}...");
    1048              : 
    1049            4 :                     let oid = ObjectIdentifier::builder()
    1050            4 :                         .key(key.to_owned())
    1051            4 :                         .build()
    1052            4 :                         .map_err(|e| TimeTravelError::Other(e.into()))?;
    1053              : 
    1054            4 :                     self.delete_oids(&permit, &[oid], cancel)
    1055            4 :                         .await
    1056            4 :                         .map_err(|e| {
     1057            0 :                             // delete_oids will use TimeoutOrCancel
    1058            0 :                             if TimeoutOrCancel::caused_by_cancel(&e) {
    1059            0 :                                 TimeTravelError::Cancelled
    1060              :                             } else {
    1061            0 :                                 TimeTravelError::Other(e)
    1062              :                             }
    1063            4 :                         })?;
    1064              :                 }
    1065            6 :             }
    1066              :         }
    1067            6 :         Ok(())
    1068            6 :     }
    1069              : }
    1070              : 
     1071              : // To save RAM, store only the fields we need instead of the entire ObjectVersion/DeleteMarkerEntry
    1072              : struct VerOrDelete {
    1073              :     kind: VerOrDeleteKind,
    1074              :     last_modified: DateTime,
    1075              :     version_id: String,
    1076              :     key: String,
    1077              : }
    1078              : 
    1079              : #[derive(Debug)]
    1080              : enum VerOrDeleteKind {
    1081              :     Version,
    1082              :     DeleteMarker,
    1083              : }
    1084              : 
    1085              : impl VerOrDelete {
    1086           36 :     fn with_kind(
    1087           36 :         kind: VerOrDeleteKind,
    1088           36 :         last_modified: Option<DateTime>,
    1089           36 :         version_id: Option<String>,
    1090           36 :         key: Option<String>,
    1091           36 :     ) -> anyhow::Result<Self> {
    1092           36 :         let lvk = (last_modified, version_id, key);
    1093           36 :         let (Some(last_modified), Some(version_id), Some(key)) = lvk else {
    1094            0 :             anyhow::bail!(
     1095            0 :                 "One (or more) of last_modified, version_id, and key is None. \
    1096            0 :             Is versioning enabled in the bucket? last_modified={:?}, version_id={:?}, key={:?}",
    1097            0 :                 lvk.0,
    1098            0 :                 lvk.1,
    1099            0 :                 lvk.2,
    1100            0 :             );
    1101              :         };
    1102           36 :         Ok(Self {
    1103           36 :             kind,
    1104           36 :             last_modified,
    1105           36 :             version_id,
    1106           36 :             key,
    1107           36 :         })
    1108           36 :     }
    1109           28 :     fn from_version(v: ObjectVersion) -> anyhow::Result<Self> {
    1110           28 :         Self::with_kind(
    1111           28 :             VerOrDeleteKind::Version,
    1112           28 :             v.last_modified,
    1113           28 :             v.version_id,
    1114           28 :             v.key,
    1115           28 :         )
    1116           28 :     }
    1117            8 :     fn from_delete_marker(v: DeleteMarkerEntry) -> anyhow::Result<Self> {
    1118            8 :         Self::with_kind(
    1119            8 :             VerOrDeleteKind::DeleteMarker,
    1120            8 :             v.last_modified,
    1121            8 :             v.version_id,
    1122            8 :             v.key,
    1123            8 :         )
    1124            8 :     }
    1125              : }
    1126              : 
    1127              : #[cfg(test)]
    1128              : mod tests {
    1129              :     use camino::Utf8Path;
    1130              :     use std::num::NonZeroUsize;
    1131              : 
    1132              :     use crate::{RemotePath, S3Bucket, S3Config};
    1133              : 
    1134              :     #[tokio::test]
    1135            3 :     async fn relative_path() {
    1136            3 :         let all_paths = ["", "some/path", "some/path/"];
    1137            3 :         let all_paths: Vec<RemotePath> = all_paths
    1138            3 :             .iter()
    1139            9 :             .map(|x| RemotePath::new(Utf8Path::new(x)).expect("bad path"))
    1140            3 :             .collect();
    1141            3 :         let prefixes = [
    1142            3 :             None,
    1143            3 :             Some(""),
    1144            3 :             Some("test/prefix"),
    1145            3 :             Some("test/prefix/"),
    1146            3 :             Some("/test/prefix/"),
    1147            3 :         ];
    1148            3 :         let expected_outputs = [
    1149            3 :             vec!["", "some/path", "some/path/"],
    1150            3 :             vec!["/", "/some/path", "/some/path/"],
    1151            3 :             vec![
    1152            3 :                 "test/prefix/",
    1153            3 :                 "test/prefix/some/path",
    1154            3 :                 "test/prefix/some/path/",
    1155            3 :             ],
    1156            3 :             vec![
    1157            3 :                 "test/prefix/",
    1158            3 :                 "test/prefix/some/path",
    1159            3 :                 "test/prefix/some/path/",
    1160            3 :             ],
    1161            3 :             vec![
    1162            3 :                 "test/prefix/",
    1163            3 :                 "test/prefix/some/path",
    1164            3 :                 "test/prefix/some/path/",
    1165            3 :             ],
    1166            3 :         ];
    1167            3 : 
    1168           15 :         for (prefix_idx, prefix) in prefixes.iter().enumerate() {
    1169           15 :             let config = S3Config {
    1170           15 :                 bucket_name: "bucket".to_owned(),
    1171           15 :                 bucket_region: "region".to_owned(),
    1172           15 :                 prefix_in_bucket: prefix.map(str::to_string),
    1173           15 :                 endpoint: None,
    1174           15 :                 concurrency_limit: NonZeroUsize::new(100).unwrap(),
    1175           15 :                 max_keys_per_list_response: Some(5),
    1176           15 :                 upload_storage_class: None,
    1177           15 :             };
    1178           15 :             let storage = S3Bucket::new(&config, std::time::Duration::ZERO)
    1179           15 :                 .await
    1180           15 :                 .expect("remote storage init");
    1181           45 :             for (test_path_idx, test_path) in all_paths.iter().enumerate() {
    1182           45 :                 let result = storage.relative_path_to_s3_object(test_path);
    1183           45 :                 let expected = expected_outputs[prefix_idx][test_path_idx];
    1184           45 :                 assert_eq!(result, expected);
    1185            3 :             }
    1186            3 :         }
    1187            3 :     }
    1188              : }
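
A minimal, self-contained sketch (not part of the measured source) of the restore-point
selection used by time_travel_recover above. Plain u64 values stand in for
`aws_smithy_types::DateTime`, and the function and variable names (restore_index, mtimes)
are illustrative only, not part of the crate's API.

    // Mirrors the binary_search_by_key handling above: Ok(i) is an exact
    // match on `timestamp`, Err(i) is the insertion point; either way the
    // entry at index i - 1 (if any) is the newest version strictly older
    // than `timestamp`.
    fn restore_index(sorted_mtimes: &[u64], timestamp: u64) -> usize {
        match sorted_mtimes.binary_search(&timestamp) {
            Ok(i) => i,
            Err(i) => i,
        }
    }

    fn main() {
        let mtimes = [10, 20, 30]; // last_modified values for one key, ascending
        assert_eq!(restore_index(&mtimes, 5), 0);  // 0: key did not exist yet -> delete
        assert_eq!(restore_index(&mtimes, 25), 2); // restore mtimes[2 - 1], the version at 20
        assert_eq!(restore_index(&mtimes, 99), 3); // == len(): no changes since timestamp -> skip
    }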
        

Generated by: LCOV version 2.1-beta