LCOV - code coverage report
Current view: top level - libs/remote_storage/src - s3_bucket.rs (source / functions)
Test: 2b0730d767f560e20b6748f57465922aa8bb805e.info
Test Date: 2024-09-25 14:04:07
Coverage: Lines: 86.0 % (744 of 865 hit)    Functions: 67.8 % (61 of 90 hit)

            Line data    Source code
       1              : //! AWS S3 storage wrapper around the `aws-sdk-s3` library.
       2              : //!
       3              : //! Respects `prefix_in_bucket` property from [`S3Config`],
       4              : //! allowing multiple API users to independently work with the same S3 bucket, if
       5              : //! their bucket prefixes are both specified and different.
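//!
//! For example (hypothetical prefixes, added here purely as an illustration):
//! one user configured with `prefix_in_bucket = Some("pageserver")` and another
//! with `prefix_in_bucket = Some("safekeeper")` can point at the same bucket
//! without ever seeing each other's keys.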
       6              : 
       7              : use std::{
       8              :     borrow::Cow,
       9              :     collections::HashMap,
      10              :     num::NonZeroU32,
      11              :     pin::Pin,
      12              :     sync::Arc,
      13              :     task::{Context, Poll},
      14              :     time::{Duration, SystemTime},
      15              : };
      16              : 
      17              : use anyhow::{anyhow, Context as _};
      18              : use aws_config::{
      19              :     default_provider::credentials::DefaultCredentialsChain,
      20              :     retry::{RetryConfigBuilder, RetryMode},
      21              :     BehaviorVersion,
      22              : };
      23              : use aws_sdk_s3::{
      24              :     config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep},
      25              :     error::SdkError,
      26              :     operation::{get_object::GetObjectError, head_object::HeadObjectError},
      27              :     types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass},
      28              :     Client,
      29              : };
      30              : use aws_smithy_async::rt::sleep::TokioSleep;
      31              : 
      32              : use aws_smithy_types::{body::SdkBody, DateTime};
      33              : use aws_smithy_types::{byte_stream::ByteStream, date_time::ConversionError};
      34              : use bytes::Bytes;
      35              : use futures::stream::Stream;
      36              : use hyper::Body;
      37              : use scopeguard::ScopeGuard;
      38              : use tokio_util::sync::CancellationToken;
      39              : use utils::backoff;
      40              : 
      41              : use super::StorageMetadata;
      42              : use crate::{
      43              :     config::S3Config,
      44              :     error::Cancelled,
      45              :     metrics::{start_counting_cancelled_wait, start_measuring_requests},
      46              :     support::PermitCarrying,
      47              :     ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, ListingObject, RemotePath,
      48              :     RemoteStorage, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE,
      49              :     REMOTE_STORAGE_PREFIX_SEPARATOR,
      50              : };
      51              : 
      52              : use crate::metrics::AttemptOutcome;
      53              : pub(super) use crate::metrics::RequestKind;
      54              : 
      55              : /// AWS S3 storage.
      56              : pub struct S3Bucket {
      57              :     client: Client,
      58              :     bucket_name: String,
      59              :     prefix_in_bucket: Option<String>,
      60              :     max_keys_per_list_response: Option<i32>,
      61              :     upload_storage_class: Option<StorageClass>,
      62              :     concurrency_limiter: ConcurrencyLimiter,
      63              :     // Per-request timeout. Accessible for tests.
      64              :     pub timeout: Duration,
      65              : }
      66              : 
      67              : struct GetObjectRequest {
      68              :     bucket: String,
      69              :     key: String,
      70              :     range: Option<String>,
      71              : }
      72              : impl S3Bucket {
       73              :     /// Creates the S3 storage; errors if an incorrect AWS S3 configuration is provided.
      74           33 :     pub async fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
      75           33 :         tracing::debug!(
      76            0 :             "Creating s3 remote storage for S3 bucket {}",
      77              :             remote_storage_config.bucket_name
      78              :         );
      79              : 
      80           33 :         let region = Region::new(remote_storage_config.bucket_region.clone());
      81           33 :         let region_opt = Some(region.clone());
      82              : 
      83              :         // https://docs.aws.amazon.com/sdkref/latest/guide/standardized-credentials.html
      84              :         // https://docs.rs/aws-config/latest/aws_config/default_provider/credentials/struct.DefaultCredentialsChain.html
      85              :         // Incomplete list of auth methods used by this:
      86              :         // * "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
      87              :         // * "AWS_PROFILE" / `aws sso login --profile <profile>`
      88              :         // * "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
      89              :         // * http (ECS/EKS) container credentials
      90              :         // * imds v2
      91           33 :         let credentials_provider = DefaultCredentialsChain::builder()
      92           33 :             .region(region)
      93           33 :             .build()
      94            0 :             .await;
      95              : 
      96              :         // AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
      97           33 :         let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
      98           33 : 
      99           33 :         let sdk_config_loader: aws_config::ConfigLoader = aws_config::defaults(
     100           33 :             #[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
     101           33 :             BehaviorVersion::v2023_11_09(),
     102           33 :         )
     103           33 :         .region(region_opt)
     104           33 :         .identity_cache(IdentityCache::lazy().build())
     105           33 :         .credentials_provider(credentials_provider)
     106           33 :         .sleep_impl(SharedAsyncSleep::from(sleep_impl));
     107           33 : 
     108           33 :         let sdk_config: aws_config::SdkConfig = std::thread::scope(|s| {
     109           33 :             s.spawn(|| {
     110           33 :                 // TODO: make this function async.
     111           33 :                 tokio::runtime::Builder::new_current_thread()
     112           33 :                     .enable_all()
     113           33 :                     .build()
     114           33 :                     .unwrap()
     115           33 :                     .block_on(sdk_config_loader.load())
     116           33 :             })
     117           33 :             .join()
     118           33 :             .unwrap()
     119           33 :         });
     120           33 : 
     121           33 :         let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&sdk_config);
     122              : 
     123              :         // Technically, the `remote_storage_config.endpoint` field only applies to S3 interactions.
     124              :         // (In case we ever re-use the `sdk_config` for more than just the S3 client in the future)
     125           33 :         if let Some(custom_endpoint) = remote_storage_config.endpoint.clone() {
     126            0 :             s3_config_builder = s3_config_builder
     127            0 :                 .endpoint_url(custom_endpoint)
     128            0 :                 .force_path_style(true);
     129           33 :         }
     130              : 
     131              :         // We do our own retries (see [`backoff::retry`]).  However, for the AWS SDK to enable rate limiting in response to throttling
     132              :         // responses (e.g. 429 on too many ListObjectsv2 requests), we must provide a retry config.  We set it to use at most one
     133              :         // attempt, and enable 'Adaptive' mode, which causes rate limiting to be enabled.
     134           33 :         let mut retry_config = RetryConfigBuilder::new();
     135           33 :         retry_config
     136           33 :             .set_max_attempts(Some(1))
     137           33 :             .set_mode(Some(RetryMode::Adaptive));
     138           33 :         s3_config_builder = s3_config_builder.retry_config(retry_config.build());
     139           33 : 
     140           33 :         let s3_config = s3_config_builder.build();
     141           33 :         let client = aws_sdk_s3::Client::from_conf(s3_config);
     142           33 : 
     143           33 :         let prefix_in_bucket = remote_storage_config
     144           33 :             .prefix_in_bucket
     145           33 :             .as_deref()
     146           33 :             .map(|prefix| {
     147           30 :                 let mut prefix = prefix;
     148           33 :                 while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     149            3 :                     prefix = &prefix[1..]
     150              :                 }
     151              : 
     152           30 :                 let mut prefix = prefix.to_string();
     153           54 :                 while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     154           24 :                     prefix.pop();
     155           24 :                 }
     156           30 :                 prefix
     157           33 :             });
     158           33 : 
     159           33 :         Ok(Self {
     160           33 :             client,
     161           33 :             bucket_name: remote_storage_config.bucket_name.clone(),
     162           33 :             max_keys_per_list_response: remote_storage_config.max_keys_per_list_response,
     163           33 :             prefix_in_bucket,
     164           33 :             concurrency_limiter: ConcurrencyLimiter::new(
     165           33 :                 remote_storage_config.concurrency_limit.get(),
     166           33 :             ),
     167           33 :             upload_storage_class: remote_storage_config.upload_storage_class.clone(),
     168           33 :             timeout,
     169           33 :         })
     170           33 :     }
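
    // Illustrative sketch of the prefix normalization performed in new() above
    // (assuming REMOTE_STORAGE_PREFIX_SEPARATOR is '/', as the rest of this file
    // does; the example prefixes are hypothetical):
    //
    //   "/wal/"         -> "wal"
    //   "wal/archive/"  -> "wal/archive"
    //   "wal"           -> "wal"
    //   None            -> None (no prefix is prepended to object keys)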
     171              : 
     172          168 :     fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
     173          168 :         let relative_path =
     174          168 :             match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
     175          168 :                 Some(stripped) => stripped,
     176              :                 // we rely on AWS to return properly prefixed paths
     177              :                 // for requests with a certain prefix
     178            0 :                 None => panic!(
     179            0 :                     "Key {} does not start with bucket prefix {:?}",
     180            0 :                     key, self.prefix_in_bucket
     181            0 :                 ),
     182              :             };
     183          168 :         RemotePath(
     184          168 :             relative_path
     185          168 :                 .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
     186          168 :                 .collect(),
     187          168 :         )
     188          168 :     }
     189              : 
     190          298 :     pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
     191          298 :         assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
     192          298 :         let path_string = path.get_path().as_str();
     193          298 :         match &self.prefix_in_bucket {
     194          289 :             Some(prefix) => prefix.clone() + "/" + path_string,
     195            9 :             None => path_string.to_string(),
     196              :         }
     197          298 :     }
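
    // Illustrative round trip through the two helpers above (hypothetical values;
    // assumes prefix_in_bucket was normalized to Some("wal") by new()):
    //
    //   relative_path_to_s3_object(&path)  turns the relative path "tenants/t1/000001"
    //                                      into the S3 key "wal/tenants/t1/000001";
    //   s3_object_to_relative_path(key)    strips the "wal" prefix from keys returned
    //                                      by listings and yields the relative path back.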
     198              : 
     199          243 :     async fn permit(
     200          243 :         &self,
     201          243 :         kind: RequestKind,
     202          243 :         cancel: &CancellationToken,
     203          243 :     ) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
     204          243 :         let started_at = start_counting_cancelled_wait(kind);
     205          243 :         let acquire = self.concurrency_limiter.acquire(kind);
     206              : 
     207          243 :         let permit = tokio::select! {
     208          243 :             permit = acquire => permit.expect("semaphore is never closed"),
     209          243 :             _ = cancel.cancelled() => return Err(Cancelled),
     210              :         };
     211              : 
     212          243 :         let started_at = ScopeGuard::into_inner(started_at);
     213          243 :         crate::metrics::BUCKET_METRICS
     214          243 :             .wait_seconds
     215          243 :             .observe_elapsed(kind, started_at);
     216          243 : 
     217          243 :         Ok(permit)
     218          243 :     }
     219              : 
     220           26 :     async fn owned_permit(
     221           26 :         &self,
     222           26 :         kind: RequestKind,
     223           26 :         cancel: &CancellationToken,
     224           26 :     ) -> Result<tokio::sync::OwnedSemaphorePermit, Cancelled> {
     225           26 :         let started_at = start_counting_cancelled_wait(kind);
     226           26 :         let acquire = self.concurrency_limiter.acquire_owned(kind);
     227              : 
     228           26 :         let permit = tokio::select! {
     229           26 :             permit = acquire => permit.expect("semaphore is never closed"),
     230           26 :             _ = cancel.cancelled() => return Err(Cancelled),
     231              :         };
     232              : 
     233           26 :         let started_at = ScopeGuard::into_inner(started_at);
     234           26 :         crate::metrics::BUCKET_METRICS
     235           26 :             .wait_seconds
     236           26 :             .observe_elapsed(kind, started_at);
     237           26 :         Ok(permit)
     238           26 :     }
     239              : 
     240           26 :     async fn download_object(
     241           26 :         &self,
     242           26 :         request: GetObjectRequest,
     243           26 :         cancel: &CancellationToken,
     244           26 :     ) -> Result<Download, DownloadError> {
     245           26 :         let kind = RequestKind::Get;
     246              : 
     247           26 :         let permit = self.owned_permit(kind, cancel).await?;
     248              : 
     249           26 :         let started_at = start_measuring_requests(kind);
     250           26 : 
     251           26 :         let get_object = self
     252           26 :             .client
     253           26 :             .get_object()
     254           26 :             .bucket(request.bucket)
     255           26 :             .key(request.key)
     256           26 :             .set_range(request.range)
     257           26 :             .send();
     258              : 
     259           26 :         let get_object = tokio::select! {
     260           26 :             res = get_object => res,
     261           26 :             _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
     262           26 :             _ = cancel.cancelled() => return Err(DownloadError::Cancelled),
     263              :         };
     264              : 
     265           26 :         let started_at = ScopeGuard::into_inner(started_at);
     266              : 
     267           24 :         let object_output = match get_object {
     268           24 :             Ok(object_output) => object_output,
     269            0 :             Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
     270              :                 // Count this in the AttemptOutcome::Ok bucket, because 404 is not
     271              :                 // an error: we expect to sometimes fetch an object and find it missing,
     272              :                 // e.g. when probing for timeline indices.
     273            0 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     274            0 :                     kind,
     275            0 :                     AttemptOutcome::Ok,
     276            0 :                     started_at,
     277            0 :                 );
     278            0 :                 return Err(DownloadError::NotFound);
     279              :             }
     280            2 :             Err(e) => {
     281            2 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     282            2 :                     kind,
     283            2 :                     AttemptOutcome::Err,
     284            2 :                     started_at,
     285            2 :                 );
     286            2 : 
     287            2 :                 return Err(DownloadError::Other(
     288            2 :                     anyhow::Error::new(e).context("download s3 object"),
     289            2 :                 ));
     290              :             }
     291              :         };
     292              : 
      293              :         // Even if no timeout budget remains, continue anyway; the caller can decide whether to
      294              :         // ignore the resulting timeout and cancellation errors.
     295           24 :         let remaining = self.timeout.saturating_sub(started_at.elapsed());
     296           24 : 
     297           24 :         let metadata = object_output.metadata().cloned().map(StorageMetadata);
     298           24 :         let etag = object_output
     299           24 :             .e_tag
     300           24 :             .ok_or(DownloadError::Other(anyhow::anyhow!("Missing ETag header")))?
     301           24 :             .into();
     302           24 :         let last_modified = object_output
     303           24 :             .last_modified
     304           24 :             .ok_or(DownloadError::Other(anyhow::anyhow!(
     305           24 :                 "Missing LastModified header"
     306           24 :             )))?
     307           24 :             .try_into()
     308           24 :             .map_err(|e: ConversionError| DownloadError::Other(e.into()))?;
     309              : 
     310           24 :         let body = object_output.body;
     311           24 :         let body = ByteStreamAsStream::from(body);
     312           24 :         let body = PermitCarrying::new(permit, body);
     313           24 :         let body = TimedDownload::new(started_at, body);
     314           24 : 
     315           24 :         let cancel_or_timeout = crate::support::cancel_or_timeout(remaining, cancel.clone());
     316           24 :         let body = crate::support::DownloadStream::new(cancel_or_timeout, body);
     317           24 : 
     318           24 :         Ok(Download {
     319           24 :             metadata,
     320           24 :             etag,
     321           24 :             last_modified,
     322           24 :             download_stream: Box::pin(body),
     323           24 :         })
     324           26 :     }
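
    // For reference, the layers wrapped around the response body above, innermost
    // to outermost (the names are the types used in this file):
    //   ByteStream (SDK body)
    //     -> ByteStreamAsStream  adapts it to Stream<Item = io::Result<Bytes>>
    //     -> PermitCarrying      keeps the concurrency permit alive for the body's lifetime
    //     -> TimedDownload       records the attempt outcome in BUCKET_METRICS when dropped
    //     -> DownloadStream      applies the remaining per-request timeout and cancellation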
     325              : 
     326          106 :     async fn delete_oids(
     327          106 :         &self,
     328          106 :         _permit: &tokio::sync::SemaphorePermit<'_>,
     329          106 :         delete_objects: &[ObjectIdentifier],
     330          106 :         cancel: &CancellationToken,
     331          106 :     ) -> anyhow::Result<()> {
     332          106 :         let kind = RequestKind::Delete;
     333          106 :         let mut cancel = std::pin::pin!(cancel.cancelled());
     334              : 
     335          106 :         for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
     336          106 :             let started_at = start_measuring_requests(kind);
     337              : 
     338          106 :             let req = self
     339          106 :                 .client
     340          106 :                 .delete_objects()
     341          106 :                 .bucket(self.bucket_name.clone())
     342          106 :                 .delete(
     343          106 :                     Delete::builder()
     344          106 :                         .set_objects(Some(chunk.to_vec()))
     345          106 :                         .build()
     346          106 :                         .context("build request")?,
     347              :                 )
     348          106 :                 .send();
     349              : 
     350          106 :             let resp = tokio::select! {
     351          106 :                 resp = req => resp,
     352          106 :                 _ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()),
     353          106 :                 _ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
     354              :             };
     355              : 
     356          106 :             let started_at = ScopeGuard::into_inner(started_at);
     357          106 :             crate::metrics::BUCKET_METRICS
     358          106 :                 .req_seconds
     359          106 :                 .observe_elapsed(kind, &resp, started_at);
     360              : 
     361          106 :             let resp = resp.context("request deletion")?;
     362          106 :             crate::metrics::BUCKET_METRICS
     363          106 :                 .deleted_objects_total
     364          106 :                 .inc_by(chunk.len() as u64);
     365              : 
     366          106 :             if let Some(errors) = resp.errors {
     367              :                 // Log a bounded number of the errors within the response:
     368              :                 // these requests can carry 1000 keys so logging each one
     369              :                 // would be too verbose, especially as errors may lead us
     370              :                 // to retry repeatedly.
     371              :                 const LOG_UP_TO_N_ERRORS: usize = 10;
     372            0 :                 for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
     373            0 :                     tracing::warn!(
     374            0 :                         "DeleteObjects key {} failed: {}: {}",
     375            0 :                         e.key.as_ref().map(Cow::from).unwrap_or("".into()),
     376            0 :                         e.code.as_ref().map(Cow::from).unwrap_or("".into()),
     377            0 :                         e.message.as_ref().map(Cow::from).unwrap_or("".into())
     378              :                     );
     379              :                 }
     380              : 
     381            0 :                 return Err(anyhow::anyhow!(
     382            0 :                     "Failed to delete {}/{} objects",
     383            0 :                     errors.len(),
     384            0 :                     chunk.len(),
     385            0 :                 ));
     386          106 :             }
     387              :         }
     388          106 :         Ok(())
     389          106 :     }
     390              : 
     391            0 :     pub fn bucket_name(&self) -> &str {
     392            0 :         &self.bucket_name
     393            0 :     }
     394              : }
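
// Illustrative note on delete_oids() above (assumption: MAX_KEYS_PER_DELETE matches
// the S3 DeleteObjects limit of 1000 keys per request): the key list is split into
// chunks, so e.g. 2500 ObjectIdentifiers become three DeleteObjects calls
// (1000 + 1000 + 500), each one timed and counted separately in BUCKET_METRICS.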
     395              : 
     396              : pin_project_lite::pin_project! {
     397              :     struct ByteStreamAsStream {
     398              :         #[pin]
     399              :         inner: aws_smithy_types::byte_stream::ByteStream
     400              :     }
     401              : }
     402              : 
     403              : impl From<aws_smithy_types::byte_stream::ByteStream> for ByteStreamAsStream {
     404           24 :     fn from(inner: aws_smithy_types::byte_stream::ByteStream) -> Self {
     405           24 :         ByteStreamAsStream { inner }
     406           24 :     }
     407              : }
     408              : 
     409              : impl Stream for ByteStreamAsStream {
     410              :     type Item = std::io::Result<Bytes>;
     411              : 
     412           46 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     413           46 :         // this does the std::io::ErrorKind::Other conversion
     414           46 :         self.project().inner.poll_next(cx).map_err(|x| x.into())
     415           46 :     }
     416              : 
      417              :     // cannot implement size_hint: inner.size_hint() reports the remaining size in bytes, which
      418              :     // makes sense for a byte stream, whereas Stream::size_hint counts remaining items
     419              : }
     420              : 
     421              : pin_project_lite::pin_project! {
     422              :     /// Times and tracks the outcome of the request.
     423              :     struct TimedDownload<S> {
     424              :         started_at: std::time::Instant,
     425              :         outcome: AttemptOutcome,
     426              :         #[pin]
     427              :         inner: S
     428              :     }
     429              : 
     430              :     impl<S> PinnedDrop for TimedDownload<S> {
     431              :         fn drop(mut this: Pin<&mut Self>) {
     432              :             crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
     433              :         }
     434              :     }
     435              : }
     436              : 
     437              : impl<S> TimedDownload<S> {
     438           24 :     fn new(started_at: std::time::Instant, inner: S) -> Self {
     439           24 :         TimedDownload {
     440           24 :             started_at,
     441           24 :             outcome: AttemptOutcome::Cancelled,
     442           24 :             inner,
     443           24 :         }
     444           24 :     }
     445              : }
     446              : 
     447              : impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for TimedDownload<S> {
     448              :     type Item = <S as Stream>::Item;
     449              : 
     450           46 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     451              :         use std::task::ready;
     452              : 
     453           46 :         let this = self.project();
     454              : 
     455           46 :         let res = ready!(this.inner.poll_next(cx));
     456           22 :         match &res {
     457           22 :             Some(Ok(_)) => {}
     458            0 :             Some(Err(_)) => *this.outcome = AttemptOutcome::Err,
     459           18 :             None => *this.outcome = AttemptOutcome::Ok,
     460              :         }
     461              : 
     462           40 :         Poll::Ready(res)
     463           46 :     }
     464              : 
     465            0 :     fn size_hint(&self) -> (usize, Option<usize>) {
     466            0 :         self.inner.size_hint()
     467            0 :     }
     468              : }
     469              : 
     470              : impl RemoteStorage for S3Bucket {
     471           26 :     fn list_streaming(
     472           26 :         &self,
     473           26 :         prefix: Option<&RemotePath>,
     474           26 :         mode: ListingMode,
     475           26 :         max_keys: Option<NonZeroU32>,
     476           26 :         cancel: &CancellationToken,
     477           26 :     ) -> impl Stream<Item = Result<Listing, DownloadError>> {
     478           26 :         let kind = RequestKind::List;
     479           26 :         // s3 sdk wants i32
     480           26 :         let mut max_keys = max_keys.map(|mk| mk.get() as i32);
     481           26 : 
      482           26 :         // use the passed prefix, or fall back to the prefix_in_bucket value if none was given
     483           26 :         let list_prefix = prefix
     484           26 :             .map(|p| self.relative_path_to_s3_object(p))
     485           26 :             .or_else(|| {
     486           20 :                 self.prefix_in_bucket.clone().map(|mut s| {
     487           20 :                     s.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
     488           20 :                     s
     489           20 :                 })
     490           26 :             });
     491           26 : 
     492           26 :         async_stream::stream! {
     493           26 :             let _permit = self.permit(kind, cancel).await?;
     494           26 : 
     495           26 :             let mut continuation_token = None;
     496           26 :             'outer: loop {
     497           26 :                 let started_at = start_measuring_requests(kind);
     498           26 : 
      499           26 :                 // min of two Options, returning Some if one is a value and the other is
      500           26 :                 // None (None is smaller than anything, so a plain min doesn't work).
     501           26 :                 let request_max_keys = self
     502           26 :                     .max_keys_per_list_response
     503           26 :                     .into_iter()
     504           26 :                     .chain(max_keys.into_iter())
     505           26 :                     .min();
     506           26 :                 let mut request = self
     507           26 :                     .client
     508           26 :                     .list_objects_v2()
     509           26 :                     .bucket(self.bucket_name.clone())
     510           26 :                     .set_prefix(list_prefix.clone())
     511           26 :                     .set_continuation_token(continuation_token.clone())
     512           26 :                     .set_max_keys(request_max_keys);
     513           26 : 
     514           26 :                 if let ListingMode::WithDelimiter = mode {
     515           26 :                     request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
     516           26 :                 }
     517           26 : 
     518           26 :                 let request = request.send();
     519           26 : 
     520           26 :                 let response = tokio::select! {
     521           26 :                     res = request => Ok(res),
     522           26 :                     _ = tokio::time::sleep(self.timeout) => Err(DownloadError::Timeout),
     523           26 :                     _ = cancel.cancelled() => Err(DownloadError::Cancelled),
     524           26 :                 }?;
     525           26 : 
     526           26 :                 let response = response
     527           26 :                     .context("Failed to list S3 prefixes")
     528           26 :                     .map_err(DownloadError::Other);
     529           26 : 
     530           26 :                 let started_at = ScopeGuard::into_inner(started_at);
     531           26 : 
     532           26 :                 crate::metrics::BUCKET_METRICS
     533           26 :                     .req_seconds
     534           26 :                     .observe_elapsed(kind, &response, started_at);
     535           26 : 
     536           26 :                 let response = match response {
     537           26 :                     Ok(response) => response,
     538           26 :                     Err(e) => {
     539           26 :                         // The error is potentially retryable, so we must rewind the loop after yielding.
     540           26 :                         yield Err(e);
     541           26 :                         continue 'outer;
     542           26 :                     },
     543           26 :                 };
     544           26 : 
     545           26 :                 let keys = response.contents();
     546           26 :                 let prefixes = response.common_prefixes.as_deref().unwrap_or_default();
     547           26 : 
     548           26 :                 tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
     549           26 :                 let mut result = Listing::default();
     550           26 : 
     551           26 :                 for object in keys {
     552           26 :                     let key = object.key().expect("response does not contain a key");
     553           26 :                     let key = self.s3_object_to_relative_path(key);
     554           26 : 
     555           26 :                     let last_modified = match object.last_modified.map(SystemTime::try_from) {
     556           26 :                         Some(Ok(t)) => t,
     557           26 :                         Some(Err(_)) => {
     558           26 :                             tracing::warn!("Remote storage last_modified {:?} for {} is out of bounds",
     559           26 :                                 object.last_modified, key
     560           26 :                         );
     561           26 :                             SystemTime::now()
     562           26 :                         },
     563           26 :                         None => {
     564           26 :                             SystemTime::now()
     565           26 :                         }
     566           26 :                     };
     567           26 : 
     568           26 :                     let size = object.size.unwrap_or(0) as u64;
     569           26 : 
     570           26 :                     result.keys.push(ListingObject{
     571           26 :                         key,
     572           26 :                         last_modified,
     573           26 :                         size,
     574           26 :                     });
     575           26 :                     if let Some(mut mk) = max_keys {
     576           26 :                         assert!(mk > 0);
     577           26 :                         mk -= 1;
     578           26 :                         if mk == 0 {
     579           26 :                             // limit reached
     580           26 :                             yield Ok(result);
     581           26 :                             break 'outer;
     582           26 :                         }
     583           26 :                         max_keys = Some(mk);
     584           26 :                     }
     585           26 :                 }
     586           26 : 
      587           26 :                 // S3 gives us prefixes like "foo/"; we return them as "foo"
     588           88 :                 result.prefixes.extend(prefixes.iter().filter_map(|o| {
     589           88 :                     Some(
     590           88 :                         self.s3_object_to_relative_path(
     591           88 :                             o.prefix()?
     592           88 :                                 .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR),
     593           26 :                         ),
     594           26 :                     )
     595           88 :                 }));
     596           26 : 
     597           26 :                 yield Ok(result);
     598           26 : 
     599           26 :                 continuation_token = match response.next_continuation_token {
     600           26 :                     Some(new_token) => Some(new_token),
     601           26 :                     None => break,
     602           26 :                 };
     603           26 :             }
     604           26 :         }
     605           26 :     }
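
    // Minimal usage sketch for list_streaming() (illustrative; it would live outside
    // this impl block, e.g. in a test, and assumes `futures::StreamExt` for `.next()`):
    //
    //     let stream = bucket.list_streaming(None, ListingMode::WithDelimiter, None, &cancel);
    //     futures::pin_mut!(stream);
    //     let mut all = Listing::default();
    //     while let Some(batch) = stream.next().await {
    //         let batch = batch?;
    //         all.keys.extend(batch.keys);
    //         all.prefixes.extend(batch.prefixes);
    //     }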
     606              : 
     607            0 :     async fn head_object(
     608            0 :         &self,
     609            0 :         key: &RemotePath,
     610            0 :         cancel: &CancellationToken,
     611            0 :     ) -> Result<ListingObject, DownloadError> {
     612            0 :         let kind = RequestKind::Head;
     613            0 :         let _permit = self.permit(kind, cancel).await?;
     614              : 
     615            0 :         let started_at = start_measuring_requests(kind);
     616            0 : 
     617            0 :         let head_future = self
     618            0 :             .client
     619            0 :             .head_object()
     620            0 :             .bucket(self.bucket_name())
     621            0 :             .key(self.relative_path_to_s3_object(key))
     622            0 :             .send();
     623            0 : 
     624            0 :         let head_future = tokio::time::timeout(self.timeout, head_future);
     625              : 
     626            0 :         let res = tokio::select! {
     627            0 :             res = head_future => res,
     628            0 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     629              :         };
     630              : 
     631            0 :         let res = res.map_err(|_e| DownloadError::Timeout)?;
     632              : 
      633              :         // do not include timeouts as errors in the metrics, but do include cancellations
     634            0 :         let started_at = ScopeGuard::into_inner(started_at);
     635            0 :         crate::metrics::BUCKET_METRICS
     636            0 :             .req_seconds
     637            0 :             .observe_elapsed(kind, &res, started_at);
     638              : 
     639            0 :         let data = match res {
     640            0 :             Ok(object_output) => object_output,
     641            0 :             Err(SdkError::ServiceError(e)) if matches!(e.err(), HeadObjectError::NotFound(_)) => {
     642              :                 // Count this in the AttemptOutcome::Ok bucket, because 404 is not
     643              :                 // an error: we expect to sometimes fetch an object and find it missing,
     644              :                 // e.g. when probing for timeline indices.
     645            0 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     646            0 :                     kind,
     647            0 :                     AttemptOutcome::Ok,
     648            0 :                     started_at,
     649            0 :                 );
     650            0 :                 return Err(DownloadError::NotFound);
     651              :             }
     652            0 :             Err(e) => {
     653            0 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     654            0 :                     kind,
     655            0 :                     AttemptOutcome::Err,
     656            0 :                     started_at,
     657            0 :                 );
     658            0 : 
     659            0 :                 return Err(DownloadError::Other(
     660            0 :                     anyhow::Error::new(e).context("s3 head object"),
     661            0 :                 ));
     662              :             }
     663              :         };
     664              : 
     665            0 :         let (Some(last_modified), Some(size)) = (data.last_modified, data.content_length) else {
     666            0 :             return Err(DownloadError::Other(anyhow!(
     667            0 :                 "head_object doesn't contain last_modified or content_length"
     668            0 :             )))?;
     669              :         };
     670              :         Ok(ListingObject {
     671            0 :             key: key.to_owned(),
     672            0 :             last_modified: SystemTime::try_from(last_modified).map_err(|e| {
     673            0 :                 DownloadError::Other(anyhow!("can't convert time '{last_modified}': {e}"))
     674            0 :             })?,
     675            0 :             size: size as u64,
     676              :         })
     677            0 :     }
     678              : 
     679          107 :     async fn upload(
     680          107 :         &self,
     681          107 :         from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     682          107 :         from_size_bytes: usize,
     683          107 :         to: &RemotePath,
     684          107 :         metadata: Option<StorageMetadata>,
     685          107 :         cancel: &CancellationToken,
     686          107 :     ) -> anyhow::Result<()> {
     687          107 :         let kind = RequestKind::Put;
     688          107 :         let _permit = self.permit(kind, cancel).await?;
     689              : 
     690          107 :         let started_at = start_measuring_requests(kind);
     691          107 : 
     692          107 :         let body = Body::wrap_stream(from);
     693          107 :         let bytes_stream = ByteStream::new(SdkBody::from_body_0_4(body));
     694              : 
     695          107 :         let upload = self
     696          107 :             .client
     697          107 :             .put_object()
     698          107 :             .bucket(self.bucket_name.clone())
     699          107 :             .key(self.relative_path_to_s3_object(to))
     700          107 :             .set_metadata(metadata.map(|m| m.0))
     701          107 :             .set_storage_class(self.upload_storage_class.clone())
     702          107 :             .content_length(from_size_bytes.try_into()?)
     703          107 :             .body(bytes_stream)
     704          107 :             .send();
     705          107 : 
     706          107 :         let upload = tokio::time::timeout(self.timeout, upload);
     707              : 
     708          107 :         let res = tokio::select! {
     709          107 :             res = upload => res,
     710          107 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     711              :         };
     712              : 
     713          107 :         if let Ok(inner) = &res {
     714          107 :             // do not incl. timeouts as errors in metrics but cancellations
     715          107 :             let started_at = ScopeGuard::into_inner(started_at);
     716          107 :             crate::metrics::BUCKET_METRICS
     717          107 :                 .req_seconds
     718          107 :                 .observe_elapsed(kind, inner, started_at);
     719          107 :         }
     720              : 
     721          107 :         match res {
     722          106 :             Ok(Ok(_put)) => Ok(()),
     723            1 :             Ok(Err(sdk)) => Err(sdk.into()),
     724            0 :             Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
     725              :         }
     726          107 :     }
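
    // Minimal usage sketch for upload() (illustrative; the payload, destination and
    // cancellation token are hypothetical, and this would live outside this impl block):
    //
    //     let payload = Bytes::from_static(b"hello");
    //     let len = payload.len();
    //     // one-shot stream with the Stream<Item = std::io::Result<Bytes>> item type
    //     let body = futures::stream::once(futures::future::ready(
    //         Ok::<_, std::io::Error>(payload),
    //     ));
    //     bucket.upload(body, len, &destination, None, &cancel).await?;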
     727              : 
     728            2 :     async fn copy(
     729            2 :         &self,
     730            2 :         from: &RemotePath,
     731            2 :         to: &RemotePath,
     732            2 :         cancel: &CancellationToken,
     733            2 :     ) -> anyhow::Result<()> {
     734            2 :         let kind = RequestKind::Copy;
     735            2 :         let _permit = self.permit(kind, cancel).await?;
     736              : 
     737            2 :         let timeout = tokio::time::sleep(self.timeout);
     738            2 : 
     739            2 :         let started_at = start_measuring_requests(kind);
     740            2 : 
     741            2 :         // we need to specify bucket_name as a prefix
     742            2 :         let copy_source = format!(
     743            2 :             "{}/{}",
     744            2 :             self.bucket_name,
     745            2 :             self.relative_path_to_s3_object(from)
     746            2 :         );
     747            2 : 
     748            2 :         let op = self
     749            2 :             .client
     750            2 :             .copy_object()
     751            2 :             .bucket(self.bucket_name.clone())
     752            2 :             .key(self.relative_path_to_s3_object(to))
     753            2 :             .set_storage_class(self.upload_storage_class.clone())
     754            2 :             .copy_source(copy_source)
     755            2 :             .send();
     756              : 
     757            2 :         let res = tokio::select! {
     758            2 :             res = op => res,
     759            2 :             _ = timeout => return Err(TimeoutOrCancel::Timeout.into()),
     760            2 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     761              :         };
     762              : 
     763            2 :         let started_at = ScopeGuard::into_inner(started_at);
     764            2 :         crate::metrics::BUCKET_METRICS
     765            2 :             .req_seconds
     766            2 :             .observe_elapsed(kind, &res, started_at);
     767            2 : 
     768            2 :         res?;
     769              : 
     770            2 :         Ok(())
     771            2 :     }
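
    // Illustrative copy_source value built above (hypothetical names): for
    // bucket_name = "my-bucket" and a source key that maps to "wal/seg-000001",
    // copy_source becomes "my-bucket/wal/seg-000001".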
     772              : 
     773           16 :     async fn download(
     774           16 :         &self,
     775           16 :         from: &RemotePath,
     776           16 :         cancel: &CancellationToken,
     777           16 :     ) -> Result<Download, DownloadError> {
      778           16 :         // if a prefix is configured, download object `prefix/from`;
      779           16 :         // otherwise download object `from`
     780           16 :         self.download_object(
     781           16 :             GetObjectRequest {
     782           16 :                 bucket: self.bucket_name.clone(),
     783           16 :                 key: self.relative_path_to_s3_object(from),
     784           16 :                 range: None,
     785           16 :             },
     786           16 :             cancel,
     787           16 :         )
     788           28 :         .await
     789           16 :     }
     790              : 
     791           10 :     async fn download_byte_range(
     792           10 :         &self,
     793           10 :         from: &RemotePath,
     794           10 :         start_inclusive: u64,
     795           10 :         end_exclusive: Option<u64>,
     796           10 :         cancel: &CancellationToken,
     797           10 :     ) -> Result<Download, DownloadError> {
     798           10 :         // S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
      799           10 :         // and needs both ends to be inclusive, hence the saturating_sub(1) below
     800           10 :         let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
     801           10 :         let range = Some(match end_inclusive {
     802            6 :             Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"),
     803            4 :             None => format!("bytes={start_inclusive}-"),
     804              :         });
     805              : 
     806           10 :         self.download_object(
     807           10 :             GetObjectRequest {
     808           10 :                 bucket: self.bucket_name.clone(),
     809           10 :                 key: self.relative_path_to_s3_object(from),
     810           10 :                 range,
     811           10 :             },
     812           10 :             cancel,
     813           10 :         )
     814           10 :         .await
     815           10 :     }
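
    // Illustrative mapping from the half-open range taken by this method to the
    // inclusive HTTP Range header it sends (the values are hypothetical):
    //   start_inclusive = 0,   end_exclusive = Some(100) => "bytes=0-99"
    //   start_inclusive = 512, end_exclusive = None      => "bytes=512-"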
     816              : 
     817          102 :     async fn delete_objects<'a>(
     818          102 :         &self,
     819          102 :         paths: &'a [RemotePath],
     820          102 :         cancel: &CancellationToken,
     821          102 :     ) -> anyhow::Result<()> {
     822          102 :         let kind = RequestKind::Delete;
     823          102 :         let permit = self.permit(kind, cancel).await?;
     824          102 :         let mut delete_objects = Vec::with_capacity(paths.len());
     825          212 :         for path in paths {
     826          110 :             let obj_id = ObjectIdentifier::builder()
     827          110 :                 .set_key(Some(self.relative_path_to_s3_object(path)))
     828          110 :                 .build()
     829          110 :                 .context("convert path to oid")?;
     830          110 :             delete_objects.push(obj_id);
     831              :         }
     832              : 
     833          337 :         self.delete_oids(&permit, &delete_objects, cancel).await
     834          102 :     }
     835              : 
     836           90 :     async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
     837           90 :         let paths = std::array::from_ref(path);
     838          284 :         self.delete_objects(paths, cancel).await
     839           90 :     }
     840              : 
     841            6 :     async fn time_travel_recover(
     842            6 :         &self,
     843            6 :         prefix: Option<&RemotePath>,
     844            6 :         timestamp: SystemTime,
     845            6 :         done_if_after: SystemTime,
     846            6 :         cancel: &CancellationToken,
     847            6 :     ) -> Result<(), TimeTravelError> {
     848            6 :         let kind = RequestKind::TimeTravel;
     849            6 :         let permit = self.permit(kind, cancel).await?;
     850              : 
     851            6 :         let timestamp = DateTime::from(timestamp);
     852            6 :         let done_if_after = DateTime::from(done_if_after);
     853            6 : 
     854            6 :         tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
     855              : 
      856              :         // use the passed prefix, or fall back to the prefix_in_bucket value if none was given
     857            6 :         let prefix = prefix
     858            6 :             .map(|p| self.relative_path_to_s3_object(p))
     859            6 :             .or_else(|| self.prefix_in_bucket.clone());
     860            6 : 
     861            6 :         let warn_threshold = 3;
     862            6 :         let max_retries = 10;
     863            6 :         let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
     864              : 
     865            6 :         let mut key_marker = None;
     866            6 :         let mut version_id_marker = None;
     867            6 :         let mut versions_and_deletes = Vec::new();
     868              : 
     869              :         loop {
     870            6 :             let response = backoff::retry(
     871            7 :                 || async {
     872            7 :                     let op = self
     873            7 :                         .client
     874            7 :                         .list_object_versions()
     875            7 :                         .bucket(self.bucket_name.clone())
     876            7 :                         .set_prefix(prefix.clone())
     877            7 :                         .set_key_marker(key_marker.clone())
     878            7 :                         .set_version_id_marker(version_id_marker.clone())
     879            7 :                         .send();
     880            7 : 
     881            7 :                     tokio::select! {
     882            7 :                         res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
     883            7 :                         _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
     884              :                     }
     885           14 :                 },
     886            6 :                 is_permanent,
     887            6 :                 warn_threshold,
     888            6 :                 max_retries,
     889            6 :                 "listing object versions for time_travel_recover",
     890            6 :                 cancel,
     891            6 :             )
     892           49 :             .await
     893            6 :             .ok_or_else(|| TimeTravelError::Cancelled)
     894            6 :             .and_then(|x| x)?;
     895              : 
     896            6 :             tracing::trace!(
     897            0 :                 "  Got List response version_id_marker={:?}, key_marker={:?}",
     898              :                 response.version_id_marker,
     899              :                 response.key_marker
     900              :             );
     901            6 :             let versions = response
     902            6 :                 .versions
     903            6 :                 .unwrap_or_default()
     904            6 :                 .into_iter()
     905            6 :                 .map(VerOrDelete::from_version);
     906            6 :             let deletes = response
     907            6 :                 .delete_markers
     908            6 :                 .unwrap_or_default()
     909            6 :                 .into_iter()
     910            6 :                 .map(VerOrDelete::from_delete_marker);
     911            6 :             itertools::process_results(versions.chain(deletes), |n_vds| {
     912            6 :                 versions_and_deletes.extend(n_vds)
     913            6 :             })
     914            6 :             .map_err(TimeTravelError::Other)?;
     915           12 :             fn none_if_empty(v: Option<String>) -> Option<String> {
     916           12 :                 v.filter(|v| !v.is_empty())
     917           12 :             }
     918            6 :             version_id_marker = none_if_empty(response.next_version_id_marker);
     919            6 :             key_marker = none_if_empty(response.next_key_marker);
     920            6 :             if version_id_marker.is_none() {
     921              :                 // The final response is not supposed to be truncated
     922            6 :                 if response.is_truncated.unwrap_or_default() {
     923            0 :                     return Err(TimeTravelError::Other(anyhow::anyhow!(
     924            0 :                         "Received truncated ListObjectVersions response for prefix={prefix:?}"
     925            0 :                     )));
     926            6 :                 }
     927            6 :                 break;
     928            0 :             }
      929              :             // Limit the number of versions and deletions, mostly so that we don't
     930              :             // keep requesting forever if the list is too long, as we'd put the
     931              :             // list in RAM.
     932              :             // Building a list of 100k entries that reaches the limit roughly takes
     933              :             // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
     934              :             const COMPLEXITY_LIMIT: usize = 100_000;
     935            0 :             if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
     936            0 :                 return Err(TimeTravelError::TooManyVersions);
     937            0 :             }
     938              :         }
     939              : 
     940            6 :         tracing::info!(
     941            0 :             "Built list for time travel with {} versions and deletions",
     942            0 :             versions_and_deletes.len()
     943              :         );
     944              : 
     945              :         // Work on the list of references instead of the objects directly,
     946              :         // otherwise we get lifetime errors in the sort_by_key call below.
     947            6 :         let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
     948            6 : 
     949          124 :         versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified));
     950            6 : 
     951            6 :         let mut vds_for_key = HashMap::<_, Vec<_>>::new();
     952              : 
     953           42 :         for vd in &versions_and_deletes {
     954              :             let VerOrDelete {
     955           36 :                 version_id, key, ..
     956           36 :             } = &vd;
     957           36 :             if version_id == "null" {
     958            0 :                 return Err(TimeTravelError::Other(anyhow!("Received ListVersions response for key={key} with version_id='null', \
     959            0 :                     indicating either disabled versioning, or legacy objects with null version id values")));
     960           36 :             }
     961           36 :             tracing::trace!(
     962            0 :                 "Parsing version key={key} version_id={version_id} kind={:?}",
     963              :                 vd.kind
     964              :             );
     965              : 
     966           36 :             vds_for_key.entry(key).or_default().push(vd);
     967              :         }
     968           24 :         for (key, versions) in vds_for_key {
     969           18 :             let last_vd = versions.last().unwrap();
     970           18 :             if last_vd.last_modified > done_if_after {
     971            0 :                 tracing::trace!("Key {key} has version later than done_if_after, skipping");
     972            0 :                 continue;
     973           18 :             }
     974              :             // the version we want to restore to.
     975           18 :             let version_to_restore_to =
     976           28 :                 match versions.binary_search_by_key(&timestamp, |tpl| tpl.last_modified) {
     977            0 :                     Ok(v) => v,
     978           18 :                     Err(e) => e,
     979              :                 };
     980           18 :             if version_to_restore_to == versions.len() {
     981            6 :                 tracing::trace!("Key {key} has no changes since timestamp, skipping");
     982            6 :                 continue;
     983           12 :             }
     984           12 :             let mut do_delete = false;
     985           12 :             if version_to_restore_to == 0 {
     986              :                 // All versions more recent, so the key didn't exist at the specified time point.
     987            6 :                 tracing::trace!(
     988            0 :                     "All {} versions more recent for {key}, deleting",
     989            0 :                     versions.len()
     990              :                 );
     991            6 :                 do_delete = true;
     992              :             } else {
     993            6 :                 match &versions[version_to_restore_to - 1] {
     994              :                     VerOrDelete {
     995              :                         kind: VerOrDeleteKind::Version,
     996            6 :                         version_id,
     997            6 :                         ..
     998            6 :                     } => {
     999            6 :                         tracing::trace!("Copying old version {version_id} for {key}...");
    1000              :                         // Restore the state to the last version by copying
    1001            6 :                         let source_id =
    1002            6 :                             format!("{}/{key}?versionId={version_id}", self.bucket_name);
    1003            6 : 
    1004            6 :                         backoff::retry(
    1005            6 :                             || async {
    1006            6 :                                 let op = self
    1007            6 :                                     .client
    1008            6 :                                     .copy_object()
    1009            6 :                                     .bucket(self.bucket_name.clone())
    1010            6 :                                     .key(key)
    1011            6 :                                     .set_storage_class(self.upload_storage_class.clone())
    1012            6 :                                     .copy_source(&source_id)
    1013            6 :                                     .send();
    1014            6 : 
    1015            6 :                                 tokio::select! {
    1016            6 :                                     res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
    1017            6 :                                     _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
    1018              :                                 }
    1019           12 :                             },
    1020            6 :                             is_permanent,
    1021            6 :                             warn_threshold,
    1022            6 :                             max_retries,
    1023            6 :                             "copying object version for time_travel_recover",
    1024            6 :                             cancel,
    1025            6 :                         )
    1026           12 :                         .await
    1027            6 :                         .ok_or_else(|| TimeTravelError::Cancelled)
    1028            6 :                         .and_then(|x| x)?;
    1029            6 :                         tracing::info!(%version_id, %key, "Copied old version in S3");
    1030              :                     }
    1031              :                     VerOrDelete {
    1032              :                         kind: VerOrDeleteKind::DeleteMarker,
    1033              :                         ..
    1034            0 :                     } => {
    1035            0 :                         do_delete = true;
    1036            0 :                     }
    1037              :                 }
    1038              :             };
    1039           12 :             if do_delete {
    1040            6 :                 if matches!(last_vd.kind, VerOrDeleteKind::DeleteMarker) {
    1041              :                     // Key has since been deleted (but there was some history), no need to do anything
    1042            2 :                     tracing::trace!("Key {key} already deleted, skipping.");
    1043              :                 } else {
    1044            4 :                     tracing::trace!("Deleting {key}...");
    1045              : 
    1046            4 :                     let oid = ObjectIdentifier::builder()
    1047            4 :                         .key(key.to_owned())
    1048            4 :                         .build()
    1049            4 :                         .map_err(|e| TimeTravelError::Other(e.into()))?;
    1050              : 
    1051            4 :                     self.delete_oids(&permit, &[oid], cancel)
    1052            8 :                         .await
    1053            4 :                         .map_err(|e| {
    1054            0 :                             // delete_oids will use TimeoutOrCancel
    1055            0 :                             if TimeoutOrCancel::caused_by_cancel(&e) {
    1056            0 :                                 TimeTravelError::Cancelled
    1057              :                             } else {
    1058            0 :                                 TimeTravelError::Other(e)
    1059              :                             }
    1060            4 :                         })?;
    1061              :                 }
    1062            6 :             }
    1063              :         }
    1064            6 :         Ok(())
    1065            6 :     }
    1066              : }
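
An illustration of the restore-point selection in time_travel_recover above: for each key the versions are sorted by last_modified, the target timestamp is binary-searched, and the entry just before the insertion point (if any) is the state to copy back. The sketch below is not the crate's code; select_restore_action, Action, and the integer timestamps are invented for illustration, and the real function additionally distinguishes delete markers and checks done_if_after.

    #[derive(Debug, PartialEq)]
    enum Action {
        Skip,                  // nothing changed since the timestamp
        Delete,                // every version is newer: the key did not exist yet
        RestoreVersion(usize), // index of the version to copy back
    }

    // `version_times` holds the last_modified stamps of one key, sorted ascending.
    fn select_restore_action(version_times: &[u64], timestamp: u64) -> Action {
        // Err(i) is the insertion point when no version matches exactly; an exact
        // match is treated like a newer version, mirroring the code above.
        let idx = match version_times.binary_search(&timestamp) {
            Ok(i) | Err(i) => i,
        };
        if idx == version_times.len() {
            Action::Skip
        } else if idx == 0 {
            Action::Delete
        } else {
            Action::RestoreVersion(idx - 1)
        }
    }

    fn main() {
        // Versions written at t=10, 20, 30; recovering to t=25 copies back the t=20 version.
        assert_eq!(select_restore_action(&[10, 20, 30], 25), Action::RestoreVersion(1));
        // Recovering to t=5: the key did not exist yet, so it gets deleted.
        assert_eq!(select_restore_action(&[10, 20, 30], 5), Action::Delete);
        // Recovering to t=40: nothing changed since the timestamp, nothing to do.
        assert_eq!(select_restore_action(&[10, 20, 30], 40), Action::Skip);
    }
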
    1067              : 
    1068              : // Save RAM and only store the needed data instead of the entire ObjectVersion/DeleteMarkerEntry
    1069              : struct VerOrDelete {
    1070              :     kind: VerOrDeleteKind,
    1071              :     last_modified: DateTime,
    1072              :     version_id: String,
    1073              :     key: String,
    1074              : }
    1075              : 
    1076              : #[derive(Debug)]
    1077              : enum VerOrDeleteKind {
    1078              :     Version,
    1079              :     DeleteMarker,
    1080              : }
    1081              : 
    1082              : impl VerOrDelete {
    1083           36 :     fn with_kind(
    1084           36 :         kind: VerOrDeleteKind,
    1085           36 :         last_modified: Option<DateTime>,
    1086           36 :         version_id: Option<String>,
    1087           36 :         key: Option<String>,
    1088           36 :     ) -> anyhow::Result<Self> {
    1089           36 :         let lvk = (last_modified, version_id, key);
    1090           36 :         let (Some(last_modified), Some(version_id), Some(key)) = lvk else {
    1091            0 :             anyhow::bail!(
    1092            0 :                 "One (or more) of last_modified, key, and id is None. \
    1093            0 :             Is versioning enabled in the bucket? last_modified={:?}, version_id={:?}, key={:?}",
    1094            0 :                 lvk.0,
    1095            0 :                 lvk.1,
    1096            0 :                 lvk.2,
    1097            0 :             );
    1098              :         };
    1099           36 :         Ok(Self {
    1100           36 :             kind,
    1101           36 :             last_modified,
    1102           36 :             version_id,
    1103           36 :             key,
    1104           36 :         })
    1105           36 :     }
    1106           28 :     fn from_version(v: ObjectVersion) -> anyhow::Result<Self> {
    1107           28 :         Self::with_kind(
    1108           28 :             VerOrDeleteKind::Version,
    1109           28 :             v.last_modified,
    1110           28 :             v.version_id,
    1111           28 :             v.key,
    1112           28 :         )
    1113           28 :     }
    1114            8 :     fn from_delete_marker(v: DeleteMarkerEntry) -> anyhow::Result<Self> {
    1115            8 :         Self::with_kind(
    1116            8 :             VerOrDeleteKind::DeleteMarker,
    1117            8 :             v.last_modified,
    1118            8 :             v.version_id,
    1119            8 :             v.key,
    1120            8 :         )
    1121            8 :     }
    1122              : }
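
The anyhow::Results produced by from_version and from_delete_marker are merged in time_travel_recover via itertools::process_results, which hands the Ok items to a closure and short-circuits on the first Err. A minimal stand-alone sketch of that pattern, using placeholder u32 items and a String error instead of the real types (requires the itertools crate):

    fn main() {
        // Two streams of fallible conversions, standing in for the version and
        // delete-marker iterators above.
        let versions = vec![Ok(1u32), Ok(2)];
        let delete_markers = vec![Ok(3u32), Err("missing version_id".to_string())];

        let mut merged: Vec<u32> = Vec::new();
        let res = itertools::process_results(
            versions.into_iter().chain(delete_markers),
            // The closure only ever sees unwrapped Ok values.
            |items| merged.extend(items),
        );

        // The first Err stops iteration and is returned; everything seen before it
        // has already been pushed into `merged`.
        assert!(res.is_err());
        assert_eq!(merged, vec![1, 2, 3]);
    }
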
    1123              : 
    1124              : #[cfg(test)]
    1125              : mod tests {
    1126              :     use camino::Utf8Path;
    1127              :     use std::num::NonZeroUsize;
    1128              : 
    1129              :     use crate::{RemotePath, S3Bucket, S3Config};
    1130              : 
    1131              :     #[tokio::test]
    1132            3 :     async fn relative_path() {
    1133            3 :         let all_paths = ["", "some/path", "some/path/"];
    1134            3 :         let all_paths: Vec<RemotePath> = all_paths
    1135            3 :             .iter()
    1136            9 :             .map(|x| RemotePath::new(Utf8Path::new(x)).expect("bad path"))
    1137            3 :             .collect();
    1138            3 :         let prefixes = [
    1139            3 :             None,
    1140            3 :             Some(""),
    1141            3 :             Some("test/prefix"),
    1142            3 :             Some("test/prefix/"),
    1143            3 :             Some("/test/prefix/"),
    1144            3 :         ];
    1145            3 :         let expected_outputs = [
    1146            3 :             vec!["", "some/path", "some/path/"],
    1147            3 :             vec!["/", "/some/path", "/some/path/"],
    1148            3 :             vec![
    1149            3 :                 "test/prefix/",
    1150            3 :                 "test/prefix/some/path",
    1151            3 :                 "test/prefix/some/path/",
    1152            3 :             ],
    1153            3 :             vec![
    1154            3 :                 "test/prefix/",
    1155            3 :                 "test/prefix/some/path",
    1156            3 :                 "test/prefix/some/path/",
    1157            3 :             ],
    1158            3 :             vec![
    1159            3 :                 "test/prefix/",
    1160            3 :                 "test/prefix/some/path",
    1161            3 :                 "test/prefix/some/path/",
    1162            3 :             ],
    1163            3 :         ];
    1164            3 : 
    1165           15 :         for (prefix_idx, prefix) in prefixes.iter().enumerate() {
    1166           15 :             let config = S3Config {
    1167           15 :                 bucket_name: "bucket".to_owned(),
    1168           15 :                 bucket_region: "region".to_owned(),
    1169           15 :                 prefix_in_bucket: prefix.map(str::to_string),
    1170           15 :                 endpoint: None,
    1171           15 :                 concurrency_limit: NonZeroUsize::new(100).unwrap(),
    1172           15 :                 max_keys_per_list_response: Some(5),
    1173           15 :                 upload_storage_class: None,
    1174           15 :             };
    1175           15 :             let storage = S3Bucket::new(&config, std::time::Duration::ZERO)
    1176            3 :                 .await
    1177           15 :                 .expect("remote storage init");
    1178           45 :             for (test_path_idx, test_path) in all_paths.iter().enumerate() {
    1179           45 :                 let result = storage.relative_path_to_s3_object(test_path);
    1180           45 :                 let expected = expected_outputs[prefix_idx][test_path_idx];
    1181           45 :                 assert_eq!(result, expected);
    1182            3 :             }
    1183            3 :         }
    1184            3 :     }
    1185              : }
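
Both remote calls in time_travel_recover above (listing object versions and copying an old version) share one shape: the SDK future is raced against the CancellationToken inside tokio::select!, and the whole closure is handed to backoff::retry. The sketch below shows only that cancellation-plus-retry shape with a hypothetical flaky_request and a plain retry loop; it is not the signature or behaviour of utils::backoff::retry (assumes tokio with rt/macros/time features and tokio-util):

    use std::time::Duration;
    use tokio_util::sync::CancellationToken;

    // Hypothetical request that fails a couple of times before succeeding.
    async fn flaky_request(attempt: u32) -> Result<&'static str, &'static str> {
        if attempt < 2 {
            Err("transient error")
        } else {
            Ok("done")
        }
    }

    // Race one attempt against cancellation, like the tokio::select! blocks above.
    async fn attempt_with_cancel(
        cancel: &CancellationToken,
        attempt: u32,
    ) -> Result<&'static str, &'static str> {
        tokio::select! {
            res = flaky_request(attempt) => res,
            _ = cancel.cancelled() => Err("cancelled"),
        }
    }

    #[tokio::main]
    async fn main() {
        let cancel = CancellationToken::new();
        let mut outcome = Err("not attempted");
        for attempt in 0..4 {
            outcome = attempt_with_cancel(&cancel, attempt).await;
            if outcome.is_ok() {
                break;
            }
            // A real retry helper would add exponential backoff and a warn threshold here.
            tokio::time::sleep(Duration::from_millis(10 * u64::from(attempt + 1))).await;
        }
        assert_eq!(outcome, Ok("done"));
    }
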
        

Generated by: LCOV version 2.1-beta