LCOV - code coverage report
Current view: top level - libs/remote_storage/src - s3_bucket.rs (source / functions)
Test:      7179b4db0d82ca8088cc95c44c4be4232078509c.info
Test Date: 2024-11-21 16:46:58
Coverage:  Lines: 89.7 % (769 of 857 hit)    Functions: 66.7 % (62 of 93 hit)

            Line data    Source code
       1              : //! AWS S3 storage wrapper around the `aws-sdk-s3` library.
       2              : //!
       3              : //! Respects `prefix_in_bucket` property from [`S3Config`],
       4              : //! allowing multiple API users to work with the same S3 bucket independently, as long as
       5              : //! their bucket prefixes are specified and distinct.
       6              : 
       7              : use std::{
       8              :     borrow::Cow,
       9              :     collections::HashMap,
      10              :     num::NonZeroU32,
      11              :     pin::Pin,
      12              :     sync::Arc,
      13              :     task::{Context, Poll},
      14              :     time::{Duration, SystemTime},
      15              : };
      16              : 
      17              : use anyhow::{anyhow, Context as _};
      18              : use aws_config::{
      19              :     default_provider::credentials::DefaultCredentialsChain,
      20              :     retry::{RetryConfigBuilder, RetryMode},
      21              :     BehaviorVersion,
      22              : };
      23              : use aws_sdk_s3::{
      24              :     config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep},
      25              :     error::SdkError,
      26              :     operation::{get_object::GetObjectError, head_object::HeadObjectError},
      27              :     types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass},
      28              :     Client,
      29              : };
      30              : use aws_smithy_async::rt::sleep::TokioSleep;
      31              : use http_body_util::StreamBody;
      32              : use http_types::StatusCode;
      33              : 
      34              : use aws_smithy_types::{body::SdkBody, DateTime};
      35              : use aws_smithy_types::{byte_stream::ByteStream, date_time::ConversionError};
      36              : use bytes::Bytes;
      37              : use futures::stream::Stream;
      38              : use futures_util::StreamExt;
      39              : use hyper::body::Frame;
      40              : use scopeguard::ScopeGuard;
      41              : use tokio_util::sync::CancellationToken;
      42              : use utils::backoff;
      43              : 
      44              : use super::StorageMetadata;
      45              : use crate::{
      46              :     config::S3Config,
      47              :     error::Cancelled,
      48              :     metrics::{start_counting_cancelled_wait, start_measuring_requests},
      49              :     support::PermitCarrying,
      50              :     ConcurrencyLimiter, Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject,
      51              :     RemotePath, RemoteStorage, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE,
      52              :     REMOTE_STORAGE_PREFIX_SEPARATOR,
      53              : };
      54              : 
      55              : use crate::metrics::AttemptOutcome;
      56              : pub(super) use crate::metrics::RequestKind;
      57              : 
      58              : /// AWS S3 storage.
      59              : pub struct S3Bucket {
      60              :     client: Client,
      61              :     bucket_name: String,
      62              :     prefix_in_bucket: Option<String>,
      63              :     max_keys_per_list_response: Option<i32>,
      64              :     upload_storage_class: Option<StorageClass>,
      65              :     concurrency_limiter: ConcurrencyLimiter,
      66              :     // Per-request timeout. Accessible for tests.
      67              :     pub timeout: Duration,
      68              : }
      69              : 
      70              : struct GetObjectRequest {
      71              :     bucket: String,
      72              :     key: String,
      73              :     etag: Option<String>,
      74              :     range: Option<String>,
      75              : }
      76              : impl S3Bucket {
      77              :     /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
      78           41 :     pub async fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
      79           41 :         tracing::debug!(
      80            0 :             "Creating s3 remote storage for S3 bucket {}",
      81              :             remote_storage_config.bucket_name
      82              :         );
      83              : 
      84           41 :         let region = Region::new(remote_storage_config.bucket_region.clone());
      85           41 :         let region_opt = Some(region.clone());
      86              : 
      87              :         // https://docs.aws.amazon.com/sdkref/latest/guide/standardized-credentials.html
      88              :         // https://docs.rs/aws-config/latest/aws_config/default_provider/credentials/struct.DefaultCredentialsChain.html
      89              :         // Incomplete list of auth methods used by this:
      90              :         // * "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
      91              :         // * "AWS_PROFILE" / `aws sso login --profile <profile>`
      92              :         // * "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
      93              :         // * http (ECS/EKS) container credentials
      94              :         // * imds v2
      95           41 :         let credentials_provider = DefaultCredentialsChain::builder()
      96           41 :             .region(region)
      97           41 :             .build()
      98            0 :             .await;
      99              : 
     100              :         // AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
     101           41 :         let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
     102           41 : 
     103           41 :         let sdk_config_loader: aws_config::ConfigLoader = aws_config::defaults(
     104           41 :             #[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
     105           41 :             BehaviorVersion::v2023_11_09(),
     106           41 :         )
     107           41 :         .region(region_opt)
     108           41 :         .identity_cache(IdentityCache::lazy().build())
     109           41 :         .credentials_provider(credentials_provider)
     110           41 :         .sleep_impl(SharedAsyncSleep::from(sleep_impl));
     111           41 : 
     112           41 :         let sdk_config: aws_config::SdkConfig = std::thread::scope(|s| {
     113           41 :             s.spawn(|| {
     114           41 :                 // TODO: make this function async.
     115           41 :                 tokio::runtime::Builder::new_current_thread()
     116           41 :                     .enable_all()
     117           41 :                     .build()
     118           41 :                     .unwrap()
     119           41 :                     .block_on(sdk_config_loader.load())
     120           41 :             })
     121           41 :             .join()
     122           41 :             .unwrap()
     123           41 :         });
     124           41 : 
     125           41 :         let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&sdk_config);
     126              : 
     127              :         // Technically, the `remote_storage_config.endpoint` field only applies to S3 interactions.
     128              :         // (In case we ever re-use the `sdk_config` for more than just the S3 client in the future)
     129           41 :         if let Some(custom_endpoint) = remote_storage_config.endpoint.clone() {
     130            0 :             s3_config_builder = s3_config_builder
     131            0 :                 .endpoint_url(custom_endpoint)
     132            0 :                 .force_path_style(true);
     133           41 :         }
     134              : 
     135              :         // We do our own retries (see [`backoff::retry`]).  However, for the AWS SDK to enable rate limiting in response to throttling
     136              :         // responses (e.g. 429 on too many ListObjectsv2 requests), we must provide a retry config.  We set it to use at most one
     137              :         // attempt, and enable 'Adaptive' mode, which causes rate limiting to be enabled.
     138           41 :         let mut retry_config = RetryConfigBuilder::new();
     139           41 :         retry_config
     140           41 :             .set_max_attempts(Some(1))
     141           41 :             .set_mode(Some(RetryMode::Adaptive));
     142           41 :         s3_config_builder = s3_config_builder.retry_config(retry_config.build());
     143           41 : 
     144           41 :         let s3_config = s3_config_builder.build();
     145           41 :         let client = aws_sdk_s3::Client::from_conf(s3_config);
     146           41 : 
     147           41 :         let prefix_in_bucket = remote_storage_config
     148           41 :             .prefix_in_bucket
     149           41 :             .as_deref()
     150           41 :             .map(|prefix| {
     151           38 :                 let mut prefix = prefix;
     152           41 :                 while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     153            3 :                     prefix = &prefix[1..]
     154              :                 }
     155              : 
     156           38 :                 let mut prefix = prefix.to_string();
     157           70 :                 while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     158           32 :                     prefix.pop();
     159           32 :                 }
     160           38 :                 prefix
     161           41 :             });
     162           41 : 
     163           41 :         Ok(Self {
     164           41 :             client,
     165           41 :             bucket_name: remote_storage_config.bucket_name.clone(),
     166           41 :             max_keys_per_list_response: remote_storage_config.max_keys_per_list_response,
     167           41 :             prefix_in_bucket,
     168           41 :             concurrency_limiter: ConcurrencyLimiter::new(
     169           41 :                 remote_storage_config.concurrency_limit.get(),
     170           41 :             ),
     171           41 :             upload_storage_class: remote_storage_config.upload_storage_class.clone(),
     172           41 :             timeout,
     173           41 :         })
     174           41 :     }
     175              : 
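
A minimal construction sketch (not part of the covered source): it assumes an `S3Config` has been populated elsewhere (its fields live in `crate::config`), and the helper name and 30-second timeout are illustrative.

    async fn make_bucket(config: &S3Config) -> anyhow::Result<S3Bucket> {
        // The second argument becomes the per-request `timeout` stored on S3Bucket.
        S3Bucket::new(config, Duration::from_secs(30)).await
    }
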
     176          519 :     fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
     177          519 :         let relative_path =
     178          519 :             match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
     179          519 :                 Some(stripped) => stripped,
     180              :                 // we rely on AWS to return properly prefixed paths
     181              :                 // for requests with a certain prefix
     182            0 :                 None => panic!(
     183            0 :                     "Key {} does not start with bucket prefix {:?}",
     184            0 :                     key, self.prefix_in_bucket
     185            0 :                 ),
     186              :             };
     187          519 :         RemotePath(
     188          519 :             relative_path
     189          519 :                 .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
     190          519 :                 .collect(),
     191          519 :         )
     192          519 :     }
     193              : 
     194          553 :     pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
     195          553 :         assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
     196          553 :         let path_string = path.get_path().as_str();
     197          553 :         match &self.prefix_in_bucket {
     198          544 :             Some(prefix) => prefix.clone() + "/" + path_string,
     199            9 :             None => path_string.to_string(),
     200              :         }
     201          553 :     }
     202              : 
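
For illustration, a sketch of the round-trip these two helpers implement; it assumes `RemotePath::from_string` is available elsewhere in this crate, and the prefix value in the comments is illustrative.

    fn prefix_roundtrip_sketch(bucket: &S3Bucket) -> anyhow::Result<()> {
        let path = RemotePath::from_string("tenants/t1/timelines")?;
        // With prefix_in_bucket == Some("pageserver/v1"), this yields
        // "pageserver/v1/tenants/t1/timelines".
        let key = bucket.relative_path_to_s3_object(&path);
        // Stripping the prefix again recovers the original relative path.
        let back = bucket.s3_object_to_relative_path(&key);
        anyhow::ensure!(back.get_path().as_str() == path.get_path().as_str());
        Ok(())
    }
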
     203          470 :     async fn permit(
     204          470 :         &self,
     205          470 :         kind: RequestKind,
     206          470 :         cancel: &CancellationToken,
     207          470 :     ) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
     208          470 :         let started_at = start_counting_cancelled_wait(kind);
     209          470 :         let acquire = self.concurrency_limiter.acquire(kind);
     210              : 
     211          470 :         let permit = tokio::select! {
     212          470 :             permit = acquire => permit.expect("semaphore is never closed"),
     213          470 :             _ = cancel.cancelled() => return Err(Cancelled),
     214              :         };
     215              : 
     216          470 :         let started_at = ScopeGuard::into_inner(started_at);
     217          470 :         crate::metrics::BUCKET_METRICS
     218          470 :             .wait_seconds
     219          470 :             .observe_elapsed(kind, started_at);
     220          470 : 
     221          470 :         Ok(permit)
     222          470 :     }
     223              : 
     224           32 :     async fn owned_permit(
     225           32 :         &self,
     226           32 :         kind: RequestKind,
     227           32 :         cancel: &CancellationToken,
     228           32 :     ) -> Result<tokio::sync::OwnedSemaphorePermit, Cancelled> {
     229           32 :         let started_at = start_counting_cancelled_wait(kind);
     230           32 :         let acquire = self.concurrency_limiter.acquire_owned(kind);
     231              : 
     232           32 :         let permit = tokio::select! {
     233           32 :             permit = acquire => permit.expect("semaphore is never closed"),
     234           32 :             _ = cancel.cancelled() => return Err(Cancelled),
     235              :         };
     236              : 
     237           32 :         let started_at = ScopeGuard::into_inner(started_at);
     238           32 :         crate::metrics::BUCKET_METRICS
     239           32 :             .wait_seconds
     240           32 :             .observe_elapsed(kind, started_at);
     241           32 :         Ok(permit)
     242           32 :     }
     243              : 
     244           32 :     async fn download_object(
     245           32 :         &self,
     246           32 :         request: GetObjectRequest,
     247           32 :         cancel: &CancellationToken,
     248           32 :     ) -> Result<Download, DownloadError> {
     249           32 :         let kind = RequestKind::Get;
     250              : 
     251           32 :         let permit = self.owned_permit(kind, cancel).await?;
     252              : 
     253           32 :         let started_at = start_measuring_requests(kind);
     254           32 : 
     255           32 :         let mut builder = self
     256           32 :             .client
     257           32 :             .get_object()
     258           32 :             .bucket(request.bucket)
     259           32 :             .key(request.key)
     260           32 :             .set_range(request.range);
     261              : 
     262           32 :         if let Some(etag) = request.etag {
     263            6 :             builder = builder.if_none_match(etag);
     264           26 :         }
     265              : 
     266           32 :         let get_object = builder.send();
     267              : 
     268           32 :         let get_object = tokio::select! {
     269           32 :             res = get_object => res,
     270           32 :             _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
     271           32 :             _ = cancel.cancelled() => return Err(DownloadError::Cancelled),
     272              :         };
     273              : 
     274           32 :         let started_at = ScopeGuard::into_inner(started_at);
     275              : 
     276           28 :         let object_output = match get_object {
     277           28 :             Ok(object_output) => object_output,
     278            4 :             Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
     279              :                 // Count this in the AttemptOutcome::Ok bucket, because 404 is not
     280              :                 // an error: we expect to sometimes fetch an object and find it missing,
     281              :                 // e.g. when probing for timeline indices.
     282            0 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     283            0 :                     kind,
     284            0 :                     AttemptOutcome::Ok,
     285            0 :                     started_at,
     286            0 :                 );
     287            0 :                 return Err(DownloadError::NotFound);
     288              :             }
     289            4 :             Err(SdkError::ServiceError(e))
     290              :                 // aws_smithy_runtime_api::http::response::StatusCode isn't
     291              :                 // re-exported by any aws crates, so just check the numeric
     292              :                 // status against http_types::StatusCode instead of pulling it.
     293            4 :                 if e.raw().status().as_u16() == StatusCode::NotModified =>
     294            4 :             {
     295            4 :                 // Count an unmodified file as a success.
     296            4 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     297            4 :                     kind,
     298            4 :                     AttemptOutcome::Ok,
     299            4 :                     started_at,
     300            4 :                 );
     301            4 :                 return Err(DownloadError::Unmodified);
     302              :             }
     303            0 :             Err(e) => {
     304            0 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     305            0 :                     kind,
     306            0 :                     AttemptOutcome::Err,
     307            0 :                     started_at,
     308            0 :                 );
     309            0 : 
     310            0 :                 return Err(DownloadError::Other(
     311            0 :                     anyhow::Error::new(e).context("download s3 object"),
     312            0 :                 ));
     313              :             }
     314              :         };
     315              : 
      316              :         // Even if no timeout remains, continue anyway; the caller can decide whether to ignore
      317              :         // timeout and cancellation errors.
     318           28 :         let remaining = self.timeout.saturating_sub(started_at.elapsed());
     319           28 : 
     320           28 :         let metadata = object_output.metadata().cloned().map(StorageMetadata);
     321           28 :         let etag = object_output
     322           28 :             .e_tag
     323           28 :             .ok_or(DownloadError::Other(anyhow::anyhow!("Missing ETag header")))?
     324           28 :             .into();
     325           28 :         let last_modified = object_output
     326           28 :             .last_modified
     327           28 :             .ok_or(DownloadError::Other(anyhow::anyhow!(
     328           28 :                 "Missing LastModified header"
     329           28 :             )))?
     330           28 :             .try_into()
     331           28 :             .map_err(|e: ConversionError| DownloadError::Other(e.into()))?;
     332              : 
     333           28 :         let body = object_output.body;
     334           28 :         let body = ByteStreamAsStream::from(body);
     335           28 :         let body = PermitCarrying::new(permit, body);
     336           28 :         let body = TimedDownload::new(started_at, body);
     337           28 : 
     338           28 :         let cancel_or_timeout = crate::support::cancel_or_timeout(remaining, cancel.clone());
     339           28 :         let body = crate::support::DownloadStream::new(cancel_or_timeout, body);
     340           28 : 
     341           28 :         Ok(Download {
     342           28 :             metadata,
     343           28 :             etag,
     344           28 :             last_modified,
     345           28 :             download_stream: Box::pin(body),
     346           28 :         })
     347           32 :     }
     348              : 
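
As a usage sketch (the helper name is illustrative), the returned `Download` can be drained into memory with the `StreamExt` import already in scope; the remaining timeout and cancellation set up above surface as I/O errors on the stream.

    async fn read_to_vec(mut download: Download) -> std::io::Result<Vec<u8>> {
        let mut buf = Vec::new();
        // Each item is an io::Result<Bytes>; timeout/cancel show up as Err chunks.
        while let Some(chunk) = download.download_stream.next().await {
            buf.extend_from_slice(&chunk?);
        }
        Ok(buf)
    }
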
     349          198 :     async fn delete_oids(
     350          198 :         &self,
     351          198 :         _permit: &tokio::sync::SemaphorePermit<'_>,
     352          198 :         delete_objects: &[ObjectIdentifier],
     353          198 :         cancel: &CancellationToken,
     354          198 :     ) -> anyhow::Result<()> {
     355          198 :         let kind = RequestKind::Delete;
     356          198 :         let mut cancel = std::pin::pin!(cancel.cancelled());
     357              : 
     358          198 :         for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
     359          198 :             let started_at = start_measuring_requests(kind);
     360              : 
     361          198 :             let req = self
     362          198 :                 .client
     363          198 :                 .delete_objects()
     364          198 :                 .bucket(self.bucket_name.clone())
     365          198 :                 .delete(
     366          198 :                     Delete::builder()
     367          198 :                         .set_objects(Some(chunk.to_vec()))
     368          198 :                         .build()
     369          198 :                         .context("build request")?,
     370              :                 )
     371          198 :                 .send();
     372              : 
     373          198 :             let resp = tokio::select! {
     374          198 :                 resp = req => resp,
     375          198 :                 _ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()),
     376          198 :                 _ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
     377              :             };
     378              : 
     379          198 :             let started_at = ScopeGuard::into_inner(started_at);
     380          198 :             crate::metrics::BUCKET_METRICS
     381          198 :                 .req_seconds
     382          198 :                 .observe_elapsed(kind, &resp, started_at);
     383              : 
     384          198 :             let resp = resp.context("request deletion")?;
     385          198 :             crate::metrics::BUCKET_METRICS
     386          198 :                 .deleted_objects_total
     387          198 :                 .inc_by(chunk.len() as u64);
     388              : 
     389          198 :             if let Some(errors) = resp.errors {
     390              :                 // Log a bounded number of the errors within the response:
     391              :                 // these requests can carry 1000 keys so logging each one
     392              :                 // would be too verbose, especially as errors may lead us
     393              :                 // to retry repeatedly.
     394              :                 const LOG_UP_TO_N_ERRORS: usize = 10;
     395            0 :                 for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
     396            0 :                     tracing::warn!(
     397            0 :                         "DeleteObjects key {} failed: {}: {}",
     398            0 :                         e.key.as_ref().map(Cow::from).unwrap_or("".into()),
     399            0 :                         e.code.as_ref().map(Cow::from).unwrap_or("".into()),
     400            0 :                         e.message.as_ref().map(Cow::from).unwrap_or("".into())
     401              :                     );
     402              :                 }
     403              : 
     404            0 :                 return Err(anyhow::anyhow!(
     405            0 :                     "Failed to delete {}/{} objects",
     406            0 :                     errors.len(),
     407            0 :                     chunk.len(),
     408            0 :                 ));
     409          198 :             }
     410              :         }
     411          198 :         Ok(())
     412          198 :     }
     413              : 
     414            6 :     pub fn bucket_name(&self) -> &str {
     415            6 :         &self.bucket_name
     416            6 :     }
     417              : }
     418              : 
     419              : pin_project_lite::pin_project! {
     420              :     struct ByteStreamAsStream {
     421              :         #[pin]
     422              :         inner: aws_smithy_types::byte_stream::ByteStream
     423              :     }
     424              : }
     425              : 
     426              : impl From<aws_smithy_types::byte_stream::ByteStream> for ByteStreamAsStream {
     427           28 :     fn from(inner: aws_smithy_types::byte_stream::ByteStream) -> Self {
     428           28 :         ByteStreamAsStream { inner }
     429           28 :     }
     430              : }
     431              : 
     432              : impl Stream for ByteStreamAsStream {
     433              :     type Item = std::io::Result<Bytes>;
     434              : 
     435           49 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     436           49 :         // this does the std::io::ErrorKind::Other conversion
     437           49 :         self.project().inner.poll_next(cx).map_err(|x| x.into())
     438           49 :     }
     439              : 
      440              :     // We cannot implement size_hint: inner.size_hint() reports the remaining size in bytes, which
      441              :     // is meaningful there, whereas Stream::size_hint counts items, so the two do not correspond.
     442              : }
     443              : 
     444              : pin_project_lite::pin_project! {
     445              :     /// Times and tracks the outcome of the request.
     446              :     struct TimedDownload<S> {
     447              :         started_at: std::time::Instant,
     448              :         outcome: AttemptOutcome,
     449              :         #[pin]
     450              :         inner: S
     451              :     }
     452              : 
     453              :     impl<S> PinnedDrop for TimedDownload<S> {
     454              :         fn drop(mut this: Pin<&mut Self>) {
     455              :             crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
     456              :         }
     457              :     }
     458              : }
     459              : 
     460              : impl<S> TimedDownload<S> {
     461           28 :     fn new(started_at: std::time::Instant, inner: S) -> Self {
     462           28 :         TimedDownload {
     463           28 :             started_at,
     464           28 :             outcome: AttemptOutcome::Cancelled,
     465           28 :             inner,
     466           28 :         }
     467           28 :     }
     468              : }
     469              : 
     470              : impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for TimedDownload<S> {
     471              :     type Item = <S as Stream>::Item;
     472              : 
     473           49 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     474              :         use std::task::ready;
     475              : 
     476           49 :         let this = self.project();
     477              : 
     478           49 :         let res = ready!(this.inner.poll_next(cx));
     479           22 :         match &res {
     480           22 :             Some(Ok(_)) => {}
     481            0 :             Some(Err(_)) => *this.outcome = AttemptOutcome::Err,
     482           18 :             None => *this.outcome = AttemptOutcome::Ok,
     483              :         }
     484              : 
     485           40 :         Poll::Ready(res)
     486           49 :     }
     487              : 
     488            0 :     fn size_hint(&self) -> (usize, Option<usize>) {
     489            0 :         self.inner.size_hint()
     490            0 :     }
     491              : }
     492              : 
     493              : impl RemoteStorage for S3Bucket {
     494           64 :     fn list_streaming(
     495           64 :         &self,
     496           64 :         prefix: Option<&RemotePath>,
     497           64 :         mode: ListingMode,
     498           64 :         max_keys: Option<NonZeroU32>,
     499           64 :         cancel: &CancellationToken,
     500           64 :     ) -> impl Stream<Item = Result<Listing, DownloadError>> {
     501           64 :         let kind = RequestKind::List;
     502           64 :         // s3 sdk wants i32
     503           64 :         let mut max_keys = max_keys.map(|mk| mk.get() as i32);
     504           64 : 
      505           64 :         // Use the passed prefix, or fall back to the prefix_in_bucket value if none is given
     506           64 :         let list_prefix = prefix
     507           64 :             .map(|p| self.relative_path_to_s3_object(p))
     508           64 :             .or_else(|| {
     509           32 :                 self.prefix_in_bucket.clone().map(|mut s| {
     510           32 :                     s.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
     511           32 :                     s
     512           32 :                 })
     513           64 :             });
     514           64 : 
     515           64 :         async_stream::stream! {
     516           64 :             let _permit = self.permit(kind, cancel).await?;
     517           64 : 
     518           64 :             let mut continuation_token = None;
     519           64 :             'outer: loop {
     520           64 :                 let started_at = start_measuring_requests(kind);
     521           64 : 
      522           64 :                 // Min of two Options, returning Some if one is a value and the other is
      523           64 :                 // None (None is smaller than anything, so a plain min doesn't work).
     524           64 :                 let request_max_keys = self
     525           64 :                     .max_keys_per_list_response
     526           64 :                     .into_iter()
     527           64 :                     .chain(max_keys.into_iter())
     528           64 :                     .min();
     529           64 :                 let mut request = self
     530           64 :                     .client
     531           64 :                     .list_objects_v2()
     532           64 :                     .bucket(self.bucket_name.clone())
     533           64 :                     .set_prefix(list_prefix.clone())
     534           64 :                     .set_continuation_token(continuation_token.clone())
     535           64 :                     .set_max_keys(request_max_keys);
     536           64 : 
     537           64 :                 if let ListingMode::WithDelimiter = mode {
     538           64 :                     request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
     539           64 :                 }
     540           64 : 
     541           64 :                 let request = request.send();
     542           64 : 
     543           64 :                 let response = tokio::select! {
     544           64 :                     res = request => Ok(res),
     545           64 :                     _ = tokio::time::sleep(self.timeout) => Err(DownloadError::Timeout),
     546           64 :                     _ = cancel.cancelled() => Err(DownloadError::Cancelled),
     547           64 :                 }?;
     548           64 : 
     549           64 :                 let response = response
     550           64 :                     .context("Failed to list S3 prefixes")
     551           64 :                     .map_err(DownloadError::Other);
     552           64 : 
     553           64 :                 let started_at = ScopeGuard::into_inner(started_at);
     554           64 : 
     555           64 :                 crate::metrics::BUCKET_METRICS
     556           64 :                     .req_seconds
     557           64 :                     .observe_elapsed(kind, &response, started_at);
     558           64 : 
     559           64 :                 let response = match response {
     560           64 :                     Ok(response) => response,
     561           64 :                     Err(e) => {
     562           64 :                         // The error is potentially retryable, so we must rewind the loop after yielding.
     563           64 :                         yield Err(e);
     564           64 :                         continue 'outer;
     565           64 :                     },
     566           64 :                 };
     567           64 : 
     568           64 :                 let keys = response.contents();
     569           64 :                 let prefixes = response.common_prefixes.as_deref().unwrap_or_default();
     570           64 : 
     571           64 :                 tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
     572           64 :                 let mut result = Listing::default();
     573           64 : 
     574           64 :                 for object in keys {
     575           64 :                     let key = object.key().expect("response does not contain a key");
     576           64 :                     let key = self.s3_object_to_relative_path(key);
     577           64 : 
     578           64 :                     let last_modified = match object.last_modified.map(SystemTime::try_from) {
     579           64 :                         Some(Ok(t)) => t,
     580           64 :                         Some(Err(_)) => {
     581           64 :                             tracing::warn!("Remote storage last_modified {:?} for {} is out of bounds",
     582           64 :                                 object.last_modified, key
     583           64 :                         );
     584           64 :                             SystemTime::now()
     585           64 :                         },
     586           64 :                         None => {
     587           64 :                             SystemTime::now()
     588           64 :                         }
     589           64 :                     };
     590           64 : 
     591           64 :                     let size = object.size.unwrap_or(0) as u64;
     592           64 : 
     593           64 :                     result.keys.push(ListingObject{
     594           64 :                         key,
     595           64 :                         last_modified,
     596           64 :                         size,
     597           64 :                     });
     598           64 :                     if let Some(mut mk) = max_keys {
     599           64 :                         assert!(mk > 0);
     600           64 :                         mk -= 1;
     601           64 :                         if mk == 0 {
     602           64 :                             // limit reached
     603           64 :                             yield Ok(result);
     604           64 :                             break 'outer;
     605           64 :                         }
     606           64 :                         max_keys = Some(mk);
     607           64 :                     }
     608           64 :                 }
     609           64 : 
      610           64 :                 // S3 gives us prefixes like "foo/"; we return them like "foo"
     611          104 :                 result.prefixes.extend(prefixes.iter().filter_map(|o| {
     612          104 :                     Some(
     613          104 :                         self.s3_object_to_relative_path(
     614          104 :                             o.prefix()?
     615          104 :                                 .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR),
     616           64 :                         ),
     617           64 :                     )
     618          104 :                 }));
     619           64 : 
     620           64 :                 yield Ok(result);
     621           64 : 
     622           64 :                 continuation_token = match response.next_continuation_token {
     623           64 :                     Some(new_token) => Some(new_token),
     624           64 :                     None => break,
     625           64 :                 };
     626           64 :             }
     627           64 :         }
     628           64 :     }
     629              : 
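
A consumption sketch for the paginated stream above; the helper, the flat key count, and the use of `ListingMode::NoDelimiter` are assumptions about the caller, and `futures::pin_mut!` comes from the `futures` dependency already used by this module.

    async fn count_keys(
        bucket: &S3Bucket,
        cancel: &CancellationToken,
    ) -> Result<usize, DownloadError> {
        // A flat (non-delimited) listing under the configured prefix_in_bucket.
        let stream = bucket.list_streaming(None, ListingMode::NoDelimiter, None, cancel);
        futures::pin_mut!(stream);

        let mut total = 0;
        while let Some(page) = stream.next().await {
            // Each yielded Listing corresponds to one ListObjectsV2 page.
            total += page?.keys.len();
        }
        Ok(total)
    }
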
     630            6 :     async fn head_object(
     631            6 :         &self,
     632            6 :         key: &RemotePath,
     633            6 :         cancel: &CancellationToken,
     634            6 :     ) -> Result<ListingObject, DownloadError> {
     635            6 :         let kind = RequestKind::Head;
     636            6 :         let _permit = self.permit(kind, cancel).await?;
     637              : 
     638            6 :         let started_at = start_measuring_requests(kind);
     639            6 : 
     640            6 :         let head_future = self
     641            6 :             .client
     642            6 :             .head_object()
     643            6 :             .bucket(self.bucket_name())
     644            6 :             .key(self.relative_path_to_s3_object(key))
     645            6 :             .send();
     646            6 : 
     647            6 :         let head_future = tokio::time::timeout(self.timeout, head_future);
     648              : 
     649            6 :         let res = tokio::select! {
     650            6 :             res = head_future => res,
     651            6 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     652              :         };
     653              : 
     654            6 :         let res = res.map_err(|_e| DownloadError::Timeout)?;
     655              : 
      656              :         // Do not include timeouts as errors in the metrics, but do include cancellations.
     657            6 :         let started_at = ScopeGuard::into_inner(started_at);
     658            6 :         crate::metrics::BUCKET_METRICS
     659            6 :             .req_seconds
     660            6 :             .observe_elapsed(kind, &res, started_at);
     661              : 
     662            4 :         let data = match res {
     663            4 :             Ok(object_output) => object_output,
     664            2 :             Err(SdkError::ServiceError(e)) if matches!(e.err(), HeadObjectError::NotFound(_)) => {
     665              :                 // Count this in the AttemptOutcome::Ok bucket, because 404 is not
     666              :                 // an error: we expect to sometimes fetch an object and find it missing,
     667              :                 // e.g. when probing for timeline indices.
     668            2 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     669            2 :                     kind,
     670            2 :                     AttemptOutcome::Ok,
     671            2 :                     started_at,
     672            2 :                 );
     673            2 :                 return Err(DownloadError::NotFound);
     674              :             }
     675            0 :             Err(e) => {
     676            0 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     677            0 :                     kind,
     678            0 :                     AttemptOutcome::Err,
     679            0 :                     started_at,
     680            0 :                 );
     681            0 : 
     682            0 :                 return Err(DownloadError::Other(
     683            0 :                     anyhow::Error::new(e).context("s3 head object"),
     684            0 :                 ));
     685              :             }
     686              :         };
     687              : 
     688            4 :         let (Some(last_modified), Some(size)) = (data.last_modified, data.content_length) else {
     689            0 :             return Err(DownloadError::Other(anyhow!(
     690            0 :                 "head_object doesn't contain last_modified or content_length"
     691            0 :             )))?;
     692              :         };
     693              :         Ok(ListingObject {
     694            4 :             key: key.to_owned(),
     695            4 :             last_modified: SystemTime::try_from(last_modified).map_err(|e| {
     696            0 :                 DownloadError::Other(anyhow!("can't convert time '{last_modified}': {e}"))
     697            4 :             })?,
     698            4 :             size: size as u64,
     699              :         })
     700            6 :     }
     701              : 
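
A small sketch of how a caller might turn this into an existence probe, mapping the NotFound case (already counted as an Ok outcome in the metrics above) to `false`; the helper name is illustrative.

    async fn object_exists(
        bucket: &S3Bucket,
        key: &RemotePath,
        cancel: &CancellationToken,
    ) -> Result<bool, DownloadError> {
        match bucket.head_object(key, cancel).await {
            Ok(_listing_object) => Ok(true),
            Err(DownloadError::NotFound) => Ok(false),
            Err(other) => Err(other),
        }
    }
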
     702          198 :     async fn upload(
     703          198 :         &self,
     704          198 :         from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     705          198 :         from_size_bytes: usize,
     706          198 :         to: &RemotePath,
     707          198 :         metadata: Option<StorageMetadata>,
     708          198 :         cancel: &CancellationToken,
     709          198 :     ) -> anyhow::Result<()> {
     710          198 :         let kind = RequestKind::Put;
     711          198 :         let _permit = self.permit(kind, cancel).await?;
     712              : 
     713          198 :         let started_at = start_measuring_requests(kind);
     714          198 : 
     715          710 :         let body = StreamBody::new(from.map(|x| x.map(Frame::data)));
     716          198 :         let bytes_stream = ByteStream::new(SdkBody::from_body_1_x(body));
     717              : 
     718          198 :         let upload = self
     719          198 :             .client
     720          198 :             .put_object()
     721          198 :             .bucket(self.bucket_name.clone())
     722          198 :             .key(self.relative_path_to_s3_object(to))
     723          198 :             .set_metadata(metadata.map(|m| m.0))
     724          198 :             .set_storage_class(self.upload_storage_class.clone())
     725          198 :             .content_length(from_size_bytes.try_into()?)
     726          198 :             .body(bytes_stream)
     727          198 :             .send();
     728          198 : 
     729          198 :         let upload = tokio::time::timeout(self.timeout, upload);
     730              : 
     731          198 :         let res = tokio::select! {
     732          198 :             res = upload => res,
     733          198 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     734              :         };
     735              : 
     736          198 :         if let Ok(inner) = &res {
      737          198 :             // Do not include timeouts as errors in the metrics, but do include cancellations.
     738          198 :             let started_at = ScopeGuard::into_inner(started_at);
     739          198 :             crate::metrics::BUCKET_METRICS
     740          198 :                 .req_seconds
     741          198 :                 .observe_elapsed(kind, inner, started_at);
     742          198 :         }
     743              : 
     744          198 :         match res {
     745          198 :             Ok(Ok(_put)) => Ok(()),
     746            0 :             Ok(Err(sdk)) => Err(sdk.into()),
     747            0 :             Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
     748              :         }
     749          198 :     }
     750              : 
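
A usage sketch for uploading an in-memory buffer: `upload` wants a `'static` stream of `io::Result<Bytes>` plus the exact byte count, so a caller might wrap the buffer in a one-item stream. The helper and the use of `futures::stream::iter` are assumptions, not part of this file.

    async fn upload_bytes(
        bucket: &S3Bucket,
        data: Vec<u8>,
        to: &RemotePath,
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        let from_size_bytes = data.len();
        // A single-chunk stream; a streaming caller would feed chunks as they are produced.
        let body = futures::stream::iter([Ok::<_, std::io::Error>(Bytes::from(data))]);
        bucket.upload(body, from_size_bytes, to, None, cancel).await
    }
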
     751            2 :     async fn copy(
     752            2 :         &self,
     753            2 :         from: &RemotePath,
     754            2 :         to: &RemotePath,
     755            2 :         cancel: &CancellationToken,
     756            2 :     ) -> anyhow::Result<()> {
     757            2 :         let kind = RequestKind::Copy;
     758            2 :         let _permit = self.permit(kind, cancel).await?;
     759              : 
     760            2 :         let timeout = tokio::time::sleep(self.timeout);
     761            2 : 
     762            2 :         let started_at = start_measuring_requests(kind);
     763            2 : 
     764            2 :         // we need to specify bucket_name as a prefix
     765            2 :         let copy_source = format!(
     766            2 :             "{}/{}",
     767            2 :             self.bucket_name,
     768            2 :             self.relative_path_to_s3_object(from)
     769            2 :         );
     770            2 : 
     771            2 :         let op = self
     772            2 :             .client
     773            2 :             .copy_object()
     774            2 :             .bucket(self.bucket_name.clone())
     775            2 :             .key(self.relative_path_to_s3_object(to))
     776            2 :             .set_storage_class(self.upload_storage_class.clone())
     777            2 :             .copy_source(copy_source)
     778            2 :             .send();
     779              : 
     780            2 :         let res = tokio::select! {
     781            2 :             res = op => res,
     782            2 :             _ = timeout => return Err(TimeoutOrCancel::Timeout.into()),
     783            2 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     784              :         };
     785              : 
     786            2 :         let started_at = ScopeGuard::into_inner(started_at);
     787            2 :         crate::metrics::BUCKET_METRICS
     788            2 :             .req_seconds
     789            2 :             .observe_elapsed(kind, &res, started_at);
     790            2 : 
     791            2 :         res?;
     792              : 
     793            2 :         Ok(())
     794            2 :     }
     795              : 
     796           32 :     async fn download(
     797           32 :         &self,
     798           32 :         from: &RemotePath,
     799           32 :         opts: &DownloadOpts,
     800           32 :         cancel: &CancellationToken,
     801           32 :     ) -> Result<Download, DownloadError> {
     802           32 :         // if prefix is not none then download file `prefix/from`
     803           32 :         // if prefix is none then download file `from`
     804           32 :         self.download_object(
     805           32 :             GetObjectRequest {
     806           32 :                 bucket: self.bucket_name.clone(),
     807           32 :                 key: self.relative_path_to_s3_object(from),
     808           32 :                 etag: opts.etag.as_ref().map(|e| e.to_string()),
     809           32 :                 range: opts.byte_range_header(),
     810           32 :             },
     811           32 :             cancel,
     812           32 :         )
     813           33 :         .await
     814           32 :     }
     815              : 
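
For illustration, a conditional re-download keyed on a previous `Download`'s ETag, which ends up as the If-None-Match header in `download_object` above. This assumes `DownloadOpts` implements `Default` (only its `etag` field is read by this module), and the helper name is illustrative.

    async fn download_if_changed(
        bucket: &S3Bucket,
        path: &RemotePath,
        previous: Download,
        cancel: &CancellationToken,
    ) -> Result<Option<Download>, DownloadError> {
        let opts = DownloadOpts {
            // A matching ETag surfaces as DownloadError::Unmodified (HTTP 304).
            etag: Some(previous.etag),
            ..Default::default()
        };
        match bucket.download(path, &opts, cancel).await {
            Ok(fresh) => Ok(Some(fresh)),
            Err(DownloadError::Unmodified) => Ok(None),
            Err(other) => Err(other),
        }
    }
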
     816          194 :     async fn delete_objects<'a>(
     817          194 :         &self,
     818          194 :         paths: &'a [RemotePath],
     819          194 :         cancel: &CancellationToken,
     820          194 :     ) -> anyhow::Result<()> {
     821          194 :         let kind = RequestKind::Delete;
     822          194 :         let permit = self.permit(kind, cancel).await?;
     823          194 :         let mut delete_objects = Vec::with_capacity(paths.len());
     824          430 :         for path in paths {
     825          236 :             let obj_id = ObjectIdentifier::builder()
     826          236 :                 .set_key(Some(self.relative_path_to_s3_object(path)))
     827          236 :                 .build()
     828          236 :                 .context("convert path to oid")?;
     829          236 :             delete_objects.push(obj_id);
     830              :         }
     831              : 
     832          651 :         self.delete_oids(&permit, &delete_objects, cancel).await
     833          194 :     }
     834              : 
     835          174 :     async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
     836          174 :         let paths = std::array::from_ref(path);
     837          584 :         self.delete_objects(paths, cancel).await
     838          174 :     }
     839              : 
     840            6 :     async fn time_travel_recover(
     841            6 :         &self,
     842            6 :         prefix: Option<&RemotePath>,
     843            6 :         timestamp: SystemTime,
     844            6 :         done_if_after: SystemTime,
     845            6 :         cancel: &CancellationToken,
     846            6 :     ) -> Result<(), TimeTravelError> {
     847            6 :         let kind = RequestKind::TimeTravel;
     848            6 :         let permit = self.permit(kind, cancel).await?;
     849              : 
     850            6 :         let timestamp = DateTime::from(timestamp);
     851            6 :         let done_if_after = DateTime::from(done_if_after);
     852            6 : 
     853            6 :         tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
     854              : 
      855              :         // Use the passed prefix, or fall back to the prefix_in_bucket value if none is given
     856            6 :         let prefix = prefix
     857            6 :             .map(|p| self.relative_path_to_s3_object(p))
     858            6 :             .or_else(|| self.prefix_in_bucket.clone());
     859            6 : 
     860            6 :         let warn_threshold = 3;
     861            6 :         let max_retries = 10;
     862            6 :         let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
     863              : 
     864            6 :         let mut key_marker = None;
     865            6 :         let mut version_id_marker = None;
     866            6 :         let mut versions_and_deletes = Vec::new();
     867              : 
     868              :         loop {
     869            6 :             let response = backoff::retry(
     870            6 :                 || async {
     871            6 :                     let op = self
     872            6 :                         .client
     873            6 :                         .list_object_versions()
     874            6 :                         .bucket(self.bucket_name.clone())
     875            6 :                         .set_prefix(prefix.clone())
     876            6 :                         .set_key_marker(key_marker.clone())
     877            6 :                         .set_version_id_marker(version_id_marker.clone())
     878            6 :                         .send();
     879            6 : 
     880            6 :                     tokio::select! {
     881            6 :                         res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
     882            6 :                         _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
     883              :                     }
     884           12 :                 },
     885            6 :                 is_permanent,
     886            6 :                 warn_threshold,
     887            6 :                 max_retries,
     888            6 :                 "listing object versions for time_travel_recover",
     889            6 :                 cancel,
     890            6 :             )
     891           38 :             .await
     892            6 :             .ok_or_else(|| TimeTravelError::Cancelled)
     893            6 :             .and_then(|x| x)?;
     894              : 
     895            6 :             tracing::trace!(
     896            0 :                 "  Got List response version_id_marker={:?}, key_marker={:?}",
     897              :                 response.version_id_marker,
     898              :                 response.key_marker
     899              :             );
     900            6 :             let versions = response
     901            6 :                 .versions
     902            6 :                 .unwrap_or_default()
     903            6 :                 .into_iter()
     904            6 :                 .map(VerOrDelete::from_version);
     905            6 :             let deletes = response
     906            6 :                 .delete_markers
     907            6 :                 .unwrap_or_default()
     908            6 :                 .into_iter()
     909            6 :                 .map(VerOrDelete::from_delete_marker);
     910            6 :             itertools::process_results(versions.chain(deletes), |n_vds| {
     911            6 :                 versions_and_deletes.extend(n_vds)
     912            6 :             })
     913            6 :             .map_err(TimeTravelError::Other)?;
     914           12 :             fn none_if_empty(v: Option<String>) -> Option<String> {
     915           12 :                 v.filter(|v| !v.is_empty())
     916           12 :             }
     917            6 :             version_id_marker = none_if_empty(response.next_version_id_marker);
     918            6 :             key_marker = none_if_empty(response.next_key_marker);
     919            6 :             if version_id_marker.is_none() {
     920              :                 // The final response is not supposed to be truncated
     921            6 :                 if response.is_truncated.unwrap_or_default() {
     922            0 :                     return Err(TimeTravelError::Other(anyhow::anyhow!(
     923            0 :                         "Received truncated ListObjectVersions response for prefix={prefix:?}"
     924            0 :                     )));
     925            6 :                 }
     926            6 :                 break;
     927            0 :             }
      928              :             // Limit the number of versions and deletions we collect, mostly so that we don't
     929              :             // keep requesting forever if the list is too long, as we'd put the
     930              :             // list in RAM.
     931              :             // Building a list of 100k entries that reaches the limit roughly takes
     932              :             // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
     933              :             const COMPLEXITY_LIMIT: usize = 100_000;
     934            0 :             if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
     935            0 :                 return Err(TimeTravelError::TooManyVersions);
     936            0 :             }
     937              :         }
     938              : 
     939            6 :         tracing::info!(
     940            0 :             "Built list for time travel with {} versions and deletions",
     941            0 :             versions_and_deletes.len()
     942              :         );
     943              : 
     944              :         // Work on the list of references instead of the objects directly,
     945              :         // otherwise we get lifetime errors in the sort_by_key call below.
     946            6 :         let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
     947            6 : 
     948          124 :         versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified));
     949            6 : 
     950            6 :         let mut vds_for_key = HashMap::<_, Vec<_>>::new();
     951              : 
     952           42 :         for vd in &versions_and_deletes {
     953              :             let VerOrDelete {
     954           36 :                 version_id, key, ..
     955           36 :             } = &vd;
     956           36 :             if version_id == "null" {
     957            0 :                 return Err(TimeTravelError::Other(anyhow!("Received ListVersions response for key={key} with version_id='null', \
     958            0 :                     indicating either disabled versioning, or legacy objects with null version id values")));
     959           36 :             }
     960           36 :             tracing::trace!(
     961            0 :                 "Parsing version key={key} version_id={version_id} kind={:?}",
     962              :                 vd.kind
     963              :             );
     964              : 
     965           36 :             vds_for_key.entry(key).or_default().push(vd);
     966              :         }
     967           24 :         for (key, versions) in vds_for_key {
     968           18 :             let last_vd = versions.last().unwrap();
     969           18 :             if last_vd.last_modified > done_if_after {
     970            0 :                 tracing::trace!("Key {key} has version later than done_if_after, skipping");
     971            0 :                 continue;
     972           18 :             }
     973              :             // the version we want to restore to.
     974           18 :             let version_to_restore_to =
     975           36 :                 match versions.binary_search_by_key(&timestamp, |tpl| tpl.last_modified) {
     976            0 :                     Ok(v) => v,
     977           18 :                     Err(e) => e,
     978              :                 };
     979           18 :             if version_to_restore_to == versions.len() {
     980            6 :                 tracing::trace!("Key {key} has no changes since timestamp, skipping");
     981            6 :                 continue;
     982           12 :             }
     983           12 :             let mut do_delete = false;
     984           12 :             if version_to_restore_to == 0 {
     985              :                 // All versions more recent, so the key didn't exist at the specified time point.
     986            6 :                 tracing::trace!(
     987            0 :                     "All {} versions more recent for {key}, deleting",
     988            0 :                     versions.len()
     989              :                 );
     990            6 :                 do_delete = true;
     991              :             } else {
     992            6 :                 match &versions[version_to_restore_to - 1] {
     993              :                     VerOrDelete {
     994              :                         kind: VerOrDeleteKind::Version,
     995            6 :                         version_id,
     996            6 :                         ..
     997            6 :                     } => {
     998            6 :                         tracing::trace!("Copying old version {version_id} for {key}...");
      999              :                         // Restore the key to its state at the timestamp by copying that old version
    1000            6 :                         let source_id =
    1001            6 :                             format!("{}/{key}?versionId={version_id}", self.bucket_name);
    1002            6 : 
    1003            6 :                         backoff::retry(
    1004            6 :                             || async {
    1005            6 :                                 let op = self
    1006            6 :                                     .client
    1007            6 :                                     .copy_object()
    1008            6 :                                     .bucket(self.bucket_name.clone())
    1009            6 :                                     .key(key)
    1010            6 :                                     .set_storage_class(self.upload_storage_class.clone())
    1011            6 :                                     .copy_source(&source_id)
    1012            6 :                                     .send();
    1013            6 : 
    1014            6 :                                 tokio::select! {
    1015            6 :                                     res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
    1016            6 :                                     _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
    1017              :                                 }
    1018           12 :                             },
    1019            6 :                             is_permanent,
    1020            6 :                             warn_threshold,
    1021            6 :                             max_retries,
    1022            6 :                             "copying object version for time_travel_recover",
    1023            6 :                             cancel,
    1024            6 :                         )
    1025           21 :                         .await
    1026            6 :                         .ok_or_else(|| TimeTravelError::Cancelled)
    1027            6 :                         .and_then(|x| x)?;
    1028            6 :                         tracing::info!(%version_id, %key, "Copied old version in S3");
    1029              :                     }
    1030              :                     VerOrDelete {
    1031              :                         kind: VerOrDeleteKind::DeleteMarker,
    1032              :                         ..
    1033            0 :                     } => {
    1034            0 :                         do_delete = true;
    1035            0 :                     }
    1036              :                 }
    1037              :             };
    1038           12 :             if do_delete {
    1039            6 :                 if matches!(last_vd.kind, VerOrDeleteKind::DeleteMarker) {
    1040              :                     // Key has since been deleted (but there was some history), no need to do anything
    1041            2 :                     tracing::trace!("Key {key} already deleted, skipping.");
    1042              :                 } else {
    1043            4 :                     tracing::trace!("Deleting {key}...");
    1044              : 
    1045            4 :                     let oid = ObjectIdentifier::builder()
    1046            4 :                         .key(key.to_owned())
    1047            4 :                         .build()
    1048            4 :                         .map_err(|e| TimeTravelError::Other(e.into()))?;
    1049              : 
    1050            4 :                     self.delete_oids(&permit, &[oid], cancel)
    1051            8 :                         .await
    1052            4 :                         .map_err(|e| {
     1053            0 :                             // delete_oids will use TimeoutOrCancel
    1054            0 :                             if TimeoutOrCancel::caused_by_cancel(&e) {
    1055            0 :                                 TimeTravelError::Cancelled
    1056              :                             } else {
    1057            0 :                                 TimeTravelError::Other(e)
    1058              :                             }
    1059            4 :                         })?;
    1060              :                 }
    1061            6 :             }
    1062              :         }
    1063            6 :         Ok(())
    1064            6 :     }
    1065              : }
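
A minimal sketch (illustrative only, not part of the covered source): the per-key
decision that time_travel_recover makes above, restated over simplified, hypothetical
types (Entry, Action, plan_for_key are invented names). It assumes the entries for a
key are sorted by last_modified, which holds after the sort_by_key call above.

    // Hypothetical restatement of the per-key restore decision; field and type
    // names are invented for illustration.
    struct Entry {
        last_modified: u64, // stand-in for aws_smithy_types::DateTime
        version_id: String,
        is_delete_marker: bool,
    }

    enum Action {
        Skip,                // nothing to do for this key
        Delete,              // restore by deleting the key
        CopyVersion(String), // restore by copying this old version over the current object
    }

    fn plan_for_key(versions: &[Entry], timestamp: u64, done_if_after: u64) -> Action {
        let last = versions.last().expect("at least one entry per key");
        // Someone wrote to the key after `done_if_after`: leave it alone.
        if last.last_modified > done_if_after {
            return Action::Skip;
        }
        // Index of the first entry at or after `timestamp` (mirrors the binary search above).
        let idx = versions.partition_point(|e| e.last_modified < timestamp);
        if idx == versions.len() {
            return Action::Skip; // no changes since `timestamp`
        }
        // Either all entries are newer (the key did not exist at `timestamp`) or the
        // state at `timestamp` was a delete marker: in both cases restore by deleting.
        let restore_by_delete = idx == 0 || versions[idx - 1].is_delete_marker;
        if restore_by_delete {
            if last.is_delete_marker {
                Action::Skip // already deleted, nothing left to do
            } else {
                Action::Delete
            }
        } else {
            Action::CopyVersion(versions[idx - 1].version_id.clone())
        }
    }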
    1066              : 
     1067              : // Save RAM by storing only the fields we need instead of the entire ObjectVersion/DeleteMarkerEntry
    1068              : struct VerOrDelete {
    1069              :     kind: VerOrDeleteKind,
    1070              :     last_modified: DateTime,
    1071              :     version_id: String,
    1072              :     key: String,
    1073              : }
    1074              : 
    1075              : #[derive(Debug)]
    1076              : enum VerOrDeleteKind {
    1077              :     Version,
    1078              :     DeleteMarker,
    1079              : }
    1080              : 
    1081              : impl VerOrDelete {
    1082           36 :     fn with_kind(
    1083           36 :         kind: VerOrDeleteKind,
    1084           36 :         last_modified: Option<DateTime>,
    1085           36 :         version_id: Option<String>,
    1086           36 :         key: Option<String>,
    1087           36 :     ) -> anyhow::Result<Self> {
    1088           36 :         let lvk = (last_modified, version_id, key);
    1089           36 :         let (Some(last_modified), Some(version_id), Some(key)) = lvk else {
    1090            0 :             anyhow::bail!(
     1091            0 :                 "One (or more) of last_modified, version_id, and key is None. \
    1092            0 :             Is versioning enabled in the bucket? last_modified={:?}, version_id={:?}, key={:?}",
    1093            0 :                 lvk.0,
    1094            0 :                 lvk.1,
    1095            0 :                 lvk.2,
    1096            0 :             );
    1097              :         };
    1098           36 :         Ok(Self {
    1099           36 :             kind,
    1100           36 :             last_modified,
    1101           36 :             version_id,
    1102           36 :             key,
    1103           36 :         })
    1104           36 :     }
    1105           28 :     fn from_version(v: ObjectVersion) -> anyhow::Result<Self> {
    1106           28 :         Self::with_kind(
    1107           28 :             VerOrDeleteKind::Version,
    1108           28 :             v.last_modified,
    1109           28 :             v.version_id,
    1110           28 :             v.key,
    1111           28 :         )
    1112           28 :     }
    1113            8 :     fn from_delete_marker(v: DeleteMarkerEntry) -> anyhow::Result<Self> {
    1114            8 :         Self::with_kind(
    1115            8 :             VerOrDeleteKind::DeleteMarker,
    1116            8 :             v.last_modified,
    1117            8 :             v.version_id,
    1118            8 :             v.key,
    1119            8 :         )
    1120            8 :     }
    1121              : }
    1122              : 
    1123              : #[cfg(test)]
    1124              : mod tests {
    1125              :     use camino::Utf8Path;
    1126              :     use std::num::NonZeroUsize;
    1127              : 
    1128              :     use crate::{RemotePath, S3Bucket, S3Config};
    1129              : 
    1130              :     #[tokio::test]
    1131            3 :     async fn relative_path() {
    1132            3 :         let all_paths = ["", "some/path", "some/path/"];
    1133            3 :         let all_paths: Vec<RemotePath> = all_paths
    1134            3 :             .iter()
    1135            9 :             .map(|x| RemotePath::new(Utf8Path::new(x)).expect("bad path"))
    1136            3 :             .collect();
    1137            3 :         let prefixes = [
    1138            3 :             None,
    1139            3 :             Some(""),
    1140            3 :             Some("test/prefix"),
    1141            3 :             Some("test/prefix/"),
    1142            3 :             Some("/test/prefix/"),
    1143            3 :         ];
    1144            3 :         let expected_outputs = [
    1145            3 :             vec!["", "some/path", "some/path/"],
    1146            3 :             vec!["/", "/some/path", "/some/path/"],
    1147            3 :             vec![
    1148            3 :                 "test/prefix/",
    1149            3 :                 "test/prefix/some/path",
    1150            3 :                 "test/prefix/some/path/",
    1151            3 :             ],
    1152            3 :             vec![
    1153            3 :                 "test/prefix/",
    1154            3 :                 "test/prefix/some/path",
    1155            3 :                 "test/prefix/some/path/",
    1156            3 :             ],
    1157            3 :             vec![
    1158            3 :                 "test/prefix/",
    1159            3 :                 "test/prefix/some/path",
    1160            3 :                 "test/prefix/some/path/",
    1161            3 :             ],
    1162            3 :         ];
    1163            3 : 
    1164           15 :         for (prefix_idx, prefix) in prefixes.iter().enumerate() {
    1165           15 :             let config = S3Config {
    1166           15 :                 bucket_name: "bucket".to_owned(),
    1167           15 :                 bucket_region: "region".to_owned(),
    1168           15 :                 prefix_in_bucket: prefix.map(str::to_string),
    1169           15 :                 endpoint: None,
    1170           15 :                 concurrency_limit: NonZeroUsize::new(100).unwrap(),
    1171           15 :                 max_keys_per_list_response: Some(5),
    1172           15 :                 upload_storage_class: None,
    1173           15 :             };
    1174           15 :             let storage = S3Bucket::new(&config, std::time::Duration::ZERO)
    1175            3 :                 .await
    1176           15 :                 .expect("remote storage init");
    1177           45 :             for (test_path_idx, test_path) in all_paths.iter().enumerate() {
    1178           45 :                 let result = storage.relative_path_to_s3_object(test_path);
    1179           45 :                 let expected = expected_outputs[prefix_idx][test_path_idx];
    1180           45 :                 assert_eq!(result, expected);
    1181            3 :             }
    1182            3 :         }
    1183            3 :     }
    1184              : }
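
For reference, the prefix handling that the relative_path test above asserts can be
restated as a standalone sketch. relative_path_to_key is a hypothetical helper, not
the real S3Bucket::relative_path_to_s3_object, and simply reproduces the expected
outputs listed in the test.

    // Hypothetical sketch of the key layout asserted by the test above.
    fn relative_path_to_key(prefix_in_bucket: Option<&str>, path: &str) -> String {
        match prefix_in_bucket {
            // Without a prefix, the relative path is used verbatim as the key.
            None => path.to_string(),
            Some(prefix) => {
                // Leading/trailing separators on the configured prefix are ignored; the
                // key is always "<prefix>/<path>", so an empty path still yields a
                // trailing separator and an empty prefix yields a leading one.
                let prefix = prefix.trim_matches('/');
                format!("{prefix}/{path}")
            }
        }
    }

    #[test]
    fn prefix_layout_sketch() {
        assert_eq!(
            relative_path_to_key(Some("/test/prefix/"), "some/path"),
            "test/prefix/some/path"
        );
        assert_eq!(relative_path_to_key(Some(""), ""), "/");
        assert_eq!(relative_path_to_key(None, "some/path/"), "some/path/");
    }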
        

Generated by: LCOV version 2.1-beta