LCOV - code coverage report
Current view: top level - libs/remote_storage/src - s3_bucket.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 87.6 % 725 635
Test Date: 2025-07-16 12:29:03 Functions: 72.4 % 87 63

            Line data    Source code
       1              : //! AWS S3 storage wrapper around the AWS SDK for Rust (`aws_sdk_s3`).
       2              : //!
       3              : //! Respects `prefix_in_bucket` property from [`S3Config`],
       4              : //! allowing multiple api users to independently work with the same S3 bucket, if
       5              : //! their bucket prefixes are both specified and different.
       6              : 
       7              : use std::borrow::Cow;
       8              : use std::collections::HashMap;
       9              : use std::num::NonZeroU32;
      10              : use std::pin::Pin;
      11              : use std::sync::Arc;
      12              : use std::task::{Context, Poll};
      13              : use std::time::{Duration, SystemTime};
      14              : 
      15              : use anyhow::{Context as _, anyhow};
      16              : use aws_config::BehaviorVersion;
      17              : use aws_config::default_provider::credentials::DefaultCredentialsChain;
      18              : use aws_config::retry::{RetryConfigBuilder, RetryMode};
      19              : use aws_sdk_s3::Client;
      20              : use aws_sdk_s3::config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep};
      21              : use aws_sdk_s3::error::SdkError;
      22              : use aws_sdk_s3::operation::get_object::GetObjectError;
      23              : use aws_sdk_s3::operation::head_object::HeadObjectError;
      24              : use aws_sdk_s3::types::{Delete, ObjectIdentifier, StorageClass};
      25              : use aws_smithy_async::rt::sleep::TokioSleep;
      26              : use aws_smithy_types::body::SdkBody;
      27              : use aws_smithy_types::byte_stream::ByteStream;
      28              : use aws_smithy_types::date_time::ConversionError;
      29              : use bytes::Bytes;
      30              : use futures::stream::Stream;
      31              : use futures_util::StreamExt;
      32              : use http_body_util::StreamBody;
      33              : use http_types::StatusCode;
      34              : use hyper::body::Frame;
      35              : use scopeguard::ScopeGuard;
      36              : use tokio_util::sync::CancellationToken;
      37              : use utils::backoff;
      38              : 
      39              : use super::StorageMetadata;
      40              : use crate::config::S3Config;
      41              : use crate::error::Cancelled;
      42              : pub(super) use crate::metrics::RequestKind;
      43              : use crate::metrics::{AttemptOutcome, start_counting_cancelled_wait, start_measuring_requests};
      44              : use crate::support::PermitCarrying;
      45              : use crate::{
      46              :     ConcurrencyLimiter, Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject,
      47              :     MAX_KEYS_PER_DELETE_S3, REMOTE_STORAGE_PREFIX_SEPARATOR, RemotePath, RemoteStorage,
      48              :     TimeTravelError, TimeoutOrCancel, Version, VersionId, VersionKind, VersionListing,
      49              : };
      50              : 
/// AWS S3 storage.
///
/// Bundles the AWS SDK client with the bucket name, the optional key prefix and the
/// per-request limits taken from [`S3Config`] in [`S3Bucket::new`].
pub struct S3Bucket {
    /// AWS SDK S3 client built in [`S3Bucket::new`]; requests in this file are issued through it.
    client: Client,
    /// Name of the bucket all requests are directed at (copied from the configuration).
    bucket_name: String,
    /// Optional key prefix. [`S3Bucket::new`] strips leading and trailing separators from the
    /// configured value, so when present it neither starts nor ends with a separator.
    prefix_in_bucket: Option<String>,
    /// Copied from the configuration. Presumably the page size for list requests —
    /// it is not used in the part of the file reviewed here, confirm against the listing code.
    max_keys_per_list_response: Option<i32>,
    /// Copied from the configuration. Presumably the storage class applied to uploads —
    /// it is not used in the part of the file reviewed here, confirm against the upload code.
    upload_storage_class: Option<StorageClass>,
    /// Source of the permits handed out by `permit` and `owned_permit`.
    concurrency_limiter: ConcurrencyLimiter,
    // Per-request timeout. Accessible for tests.
    pub timeout: Duration,
}
      62              : 
/// Parameters of a single `GetObject` call, consumed by `S3Bucket::download_object`.
struct GetObjectRequest {
    /// Bucket to read from.
    bucket: String,
    /// Object key, passed to the request unchanged.
    key: String,
    /// When set, sent as the `If-None-Match` condition; a "not modified" response is then
    /// reported by `download_object` as `DownloadError::Unmodified`.
    etag: Option<String>,
    /// Optional range value, passed to the request unchanged.
    range: Option<String>,
    /// Optional object version to fetch, passed to the request unchanged.
    version_id: Option<String>,
}
      70              : impl S3Bucket {
      71              :     /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
      72           41 :     pub async fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
      73           41 :         tracing::debug!(
      74            0 :             "Creating s3 remote storage for S3 bucket {}",
      75              :             remote_storage_config.bucket_name
      76              :         );
      77              : 
      78           41 :         let region = Region::new(remote_storage_config.bucket_region.clone());
      79           41 :         let region_opt = Some(region.clone());
      80              : 
      81              :         // https://docs.aws.amazon.com/sdkref/latest/guide/standardized-credentials.html
      82              :         // https://docs.rs/aws-config/latest/aws_config/default_provider/credentials/struct.DefaultCredentialsChain.html
      83              :         // Incomplete list of auth methods used by this:
      84              :         // * "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
      85              :         // * "AWS_PROFILE" / `aws sso login --profile <profile>`
      86              :         // * "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
      87              :         // * http (ECS/EKS) container credentials
      88              :         // * imds v2
      89           41 :         let credentials_provider = DefaultCredentialsChain::builder()
      90           41 :             .region(region)
      91           41 :             .build()
      92           41 :             .await;
      93              : 
      94              :         // AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
      95           41 :         let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
      96              : 
      97           41 :         let sdk_config_loader: aws_config::ConfigLoader = aws_config::defaults(
      98              :             #[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
      99           41 :             BehaviorVersion::v2023_11_09(),
     100              :         )
     101           41 :         .region(region_opt)
     102           41 :         .identity_cache(IdentityCache::lazy().build())
     103           41 :         .credentials_provider(credentials_provider)
     104           41 :         .sleep_impl(SharedAsyncSleep::from(sleep_impl));
     105              : 
     106           41 :         let sdk_config: aws_config::SdkConfig = std::thread::scope(|s| {
     107           41 :             s.spawn(|| {
     108              :                 // TODO: make this function async.
     109           41 :                 tokio::runtime::Builder::new_current_thread()
     110           41 :                     .enable_all()
     111           41 :                     .build()
     112           41 :                     .unwrap()
     113           41 :                     .block_on(sdk_config_loader.load())
     114           41 :             })
     115           41 :             .join()
     116           41 :             .unwrap()
     117           41 :         });
     118              : 
     119           41 :         let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&sdk_config);
     120              : 
     121              :         // Technically, the `remote_storage_config.endpoint` field only applies to S3 interactions.
     122              :         // (In case we ever re-use the `sdk_config` for more than just the S3 client in the future)
     123           41 :         if let Some(custom_endpoint) = remote_storage_config.endpoint.clone() {
     124            0 :             s3_config_builder = s3_config_builder
     125            0 :                 .endpoint_url(custom_endpoint)
     126            0 :                 .force_path_style(true);
     127           41 :         }
     128              : 
     129              :         // We do our own retries (see [`backoff::retry`]).  However, for the AWS SDK to enable rate limiting in response to throttling
     130              :         // responses (e.g. 429 on too many ListObjectsv2 requests), we must provide a retry config.  We set it to use at most one
     131              :         // attempt, and enable 'Adaptive' mode, which causes rate limiting to be enabled.
     132           41 :         let mut retry_config = RetryConfigBuilder::new();
     133           41 :         retry_config
     134           41 :             .set_max_attempts(Some(1))
     135           41 :             .set_mode(Some(RetryMode::Adaptive));
     136           41 :         s3_config_builder = s3_config_builder.retry_config(retry_config.build());
     137              : 
     138           41 :         let s3_config = s3_config_builder.build();
     139           41 :         let client = aws_sdk_s3::Client::from_conf(s3_config);
     140              : 
     141           41 :         let prefix_in_bucket = remote_storage_config
     142           41 :             .prefix_in_bucket
     143           41 :             .as_deref()
     144           41 :             .map(|prefix| {
     145           38 :                 let mut prefix = prefix;
     146           41 :                 while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     147            3 :                     prefix = &prefix[1..]
     148              :                 }
     149              : 
     150           38 :                 let mut prefix = prefix.to_string();
     151           70 :                 while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     152           32 :                     prefix.pop();
     153           32 :                 }
     154           38 :                 prefix
     155           38 :             });
     156              : 
     157           41 :         Ok(Self {
     158           41 :             client,
     159           41 :             bucket_name: remote_storage_config.bucket_name.clone(),
     160           41 :             max_keys_per_list_response: remote_storage_config.max_keys_per_list_response,
     161           41 :             prefix_in_bucket,
     162           41 :             concurrency_limiter: ConcurrencyLimiter::new(
     163           41 :                 remote_storage_config.concurrency_limit.get(),
     164           41 :             ),
     165           41 :             upload_storage_class: remote_storage_config.upload_storage_class.clone(),
     166           41 :             timeout,
     167           41 :         })
     168           41 :     }
     169              : 
     170          555 :     fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
     171          555 :         let relative_path =
     172          555 :             match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
     173          555 :                 Some(stripped) => stripped,
     174              :                 // we rely on AWS to return properly prefixed paths
     175              :                 // for requests with a certain prefix
     176            0 :                 None => panic!(
     177            0 :                     "Key {} does not start with bucket prefix {:?}",
     178              :                     key, self.prefix_in_bucket
     179              :                 ),
     180              :             };
     181          555 :         RemotePath(
     182          555 :             relative_path
     183          555 :                 .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
     184          555 :                 .collect(),
     185          555 :         )
     186          555 :     }
     187              : 
     188          572 :     pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
     189          572 :         assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
     190          572 :         let path_string = path.get_path().as_str();
     191          572 :         match &self.prefix_in_bucket {
     192          563 :             Some(prefix) => prefix.clone() + "/" + path_string,
     193            9 :             None => path_string.to_string(),
     194              :         }
     195          572 :     }
     196              : 
     197          470 :     async fn permit(
     198          470 :         &self,
     199          470 :         kind: RequestKind,
     200          470 :         cancel: &CancellationToken,
     201          470 :     ) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
     202          470 :         let started_at = start_counting_cancelled_wait(kind);
     203          470 :         let acquire = self.concurrency_limiter.acquire(kind);
     204              : 
     205          470 :         let permit = tokio::select! {
     206          470 :             permit = acquire => permit.expect("semaphore is never closed"),
     207          470 :             _ = cancel.cancelled() => return Err(Cancelled),
     208              :         };
     209              : 
     210          470 :         let started_at = ScopeGuard::into_inner(started_at);
     211          470 :         crate::metrics::BUCKET_METRICS
     212          470 :             .wait_seconds
     213          470 :             .observe_elapsed(kind, started_at);
     214              : 
     215          470 :         Ok(permit)
     216          470 :     }
     217              : 
     218           33 :     async fn owned_permit(
     219           33 :         &self,
     220           33 :         kind: RequestKind,
     221           33 :         cancel: &CancellationToken,
     222           33 :     ) -> Result<tokio::sync::OwnedSemaphorePermit, Cancelled> {
     223           33 :         let started_at = start_counting_cancelled_wait(kind);
     224           33 :         let acquire = self.concurrency_limiter.acquire_owned(kind);
     225              : 
     226           33 :         let permit = tokio::select! {
     227           33 :             permit = acquire => permit.expect("semaphore is never closed"),
     228           33 :             _ = cancel.cancelled() => return Err(Cancelled),
     229              :         };
     230              : 
     231           33 :         let started_at = ScopeGuard::into_inner(started_at);
     232           33 :         crate::metrics::BUCKET_METRICS
     233           33 :             .wait_seconds
     234           33 :             .observe_elapsed(kind, started_at);
     235           33 :         Ok(permit)
     236           33 :     }
     237              : 
     238           33 :     async fn download_object(
     239           33 :         &self,
     240           33 :         request: GetObjectRequest,
     241           33 :         cancel: &CancellationToken,
     242           33 :     ) -> Result<Download, DownloadError> {
     243           33 :         let kind = RequestKind::Get;
     244              : 
     245           33 :         let permit = self.owned_permit(kind, cancel).await?;
     246              : 
     247           33 :         let started_at = start_measuring_requests(kind);
     248              : 
     249           33 :         let mut builder = self
     250           33 :             .client
     251           33 :             .get_object()
     252           33 :             .bucket(request.bucket)
     253           33 :             .key(request.key)
     254           33 :             .set_version_id(request.version_id)
     255           33 :             .set_range(request.range);
     256              : 
     257           33 :         if let Some(etag) = request.etag {
     258            6 :             builder = builder.if_none_match(etag);
     259           27 :         }
     260              : 
     261           33 :         let get_object = builder.send();
     262              : 
     263           33 :         let get_object = tokio::select! {
     264           33 :             res = get_object => res,
     265           33 :             _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
     266           33 :             _ = cancel.cancelled() => return Err(DownloadError::Cancelled),
     267              :         };
     268              : 
     269           33 :         let started_at = ScopeGuard::into_inner(started_at);
     270              : 
     271           28 :         let object_output = match get_object {
     272           28 :             Ok(object_output) => object_output,
     273            4 :             Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
     274              :                 // Count this in the AttemptOutcome::Ok bucket, because 404 is not
     275              :                 // an error: we expect to sometimes fetch an object and find it missing,
     276              :                 // e.g. when probing for timeline indices.
     277            0 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     278            0 :                     kind,
     279            0 :                     AttemptOutcome::Ok,
     280            0 :                     started_at,
     281              :                 );
     282            0 :                 return Err(DownloadError::NotFound);
     283              :             }
     284            4 :             Err(SdkError::ServiceError(e))
     285              :                 // aws_smithy_runtime_api::http::response::StatusCode isn't
     286              :                 // re-exported by any aws crates, so just check the numeric
     287              :                 // status against http_types::StatusCode instead of pulling it.
     288            4 :                 if e.raw().status().as_u16() == StatusCode::NotModified =>
     289              :             {
     290              :                 // Count an unmodified file as a success.
     291            4 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     292            4 :                     kind,
     293            4 :                     AttemptOutcome::Ok,
     294            4 :                     started_at,
     295              :                 );
     296            4 :                 return Err(DownloadError::Unmodified);
     297              :             }
     298            1 :             Err(e) => {
     299            1 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     300            1 :                     kind,
     301            1 :                     AttemptOutcome::Err,
     302            1 :                     started_at,
     303              :                 );
     304              : 
     305            1 :                 return Err(DownloadError::Other(
     306            1 :                     anyhow::Error::new(e).context("download s3 object"),
     307            1 :                 ));
     308              :             }
     309              :         };
     310              : 
     311              :         // even if we would have no timeout left, continue anyways. the caller can decide to ignore
     312              :         // the errors considering timeouts and cancellation.
     313           28 :         let remaining = self.timeout.saturating_sub(started_at.elapsed());
     314              : 
     315           28 :         let metadata = object_output.metadata().cloned().map(StorageMetadata);
     316           28 :         let etag = object_output
     317           28 :             .e_tag
     318           28 :             .ok_or(DownloadError::Other(anyhow::anyhow!("Missing ETag header")))?
     319           28 :             .into();
     320           28 :         let last_modified = object_output
     321           28 :             .last_modified
     322           28 :             .ok_or(DownloadError::Other(anyhow::anyhow!(
     323           28 :                 "Missing LastModified header"
     324           28 :             )))?
     325           28 :             .try_into()
     326           28 :             .map_err(|e: ConversionError| DownloadError::Other(e.into()))?;
     327              : 
     328           28 :         let body = object_output.body;
     329           28 :         let body = ByteStreamAsStream::from(body);
     330           28 :         let body = PermitCarrying::new(permit, body);
     331           28 :         let body = TimedDownload::new(started_at, body);
     332              : 
     333           28 :         let cancel_or_timeout = crate::support::cancel_or_timeout(remaining, cancel.clone());
     334           28 :         let body = crate::support::DownloadStream::new(cancel_or_timeout, body);
     335              : 
     336           28 :         Ok(Download {
     337           28 :             metadata,
     338           28 :             etag,
     339           28 :             last_modified,
     340           28 :             download_stream: Box::pin(body),
     341           28 :         })
     342           33 :     }
     343              : 
     344          198 :     async fn delete_oids(
     345          198 :         &self,
     346          198 :         _permit: &tokio::sync::SemaphorePermit<'_>,
     347          198 :         delete_objects: &[ObjectIdentifier],
     348          198 :         cancel: &CancellationToken,
     349          198 :     ) -> anyhow::Result<()> {
     350          198 :         let kind = RequestKind::Delete;
     351          198 :         let mut cancel = std::pin::pin!(cancel.cancelled());
     352              : 
     353          198 :         for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE_S3) {
     354          198 :             let started_at = start_measuring_requests(kind);
     355              : 
     356          198 :             let req = self
     357          198 :                 .client
     358          198 :                 .delete_objects()
     359          198 :                 .bucket(self.bucket_name.clone())
     360          198 :                 .delete(
     361          198 :                     Delete::builder()
     362          198 :                         .set_objects(Some(chunk.to_vec()))
     363          198 :                         .build()
     364          198 :                         .context("build request")?,
     365              :                 )
     366          198 :                 .send();
     367              : 
     368          198 :             let resp = tokio::select! {
     369          198 :                 resp = req => resp,
     370          198 :                 _ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()),
     371          198 :                 _ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
     372              :             };
     373              : 
     374          198 :             let started_at = ScopeGuard::into_inner(started_at);
     375          198 :             crate::metrics::BUCKET_METRICS
     376          198 :                 .req_seconds
     377          198 :                 .observe_elapsed(kind, &resp, started_at);
     378              : 
     379          198 :             let resp = resp.context("request deletion")?;
     380          198 :             crate::metrics::BUCKET_METRICS
     381          198 :                 .deleted_objects_total
     382          198 :                 .inc_by(chunk.len() as u64);
     383              : 
     384          198 :             if let Some(errors) = resp.errors {
     385              :                 // Log a bounded number of the errors within the response:
     386              :                 // these requests can carry 1000 keys so logging each one
     387              :                 // would be too verbose, especially as errors may lead us
     388              :                 // to retry repeatedly.
     389              :                 const LOG_UP_TO_N_ERRORS: usize = 10;
     390            0 :                 for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
     391            0 :                     tracing::warn!(
     392            0 :                         "DeleteObjects key {} failed: {}: {}",
     393            0 :                         e.key.as_ref().map(Cow::from).unwrap_or("".into()),
     394            0 :                         e.code.as_ref().map(Cow::from).unwrap_or("".into()),
     395            0 :                         e.message.as_ref().map(Cow::from).unwrap_or("".into())
     396              :                     );
     397              :                 }
     398              : 
     399            0 :                 return Err(anyhow::anyhow!(
     400            0 :                     "Failed to delete {}/{} objects",
     401            0 :                     errors.len(),
     402            0 :                     chunk.len(),
     403            0 :                 ));
     404          198 :             }
     405              :         }
     406          198 :         Ok(())
     407          198 :     }
     408              : 
    /// Lists all object versions and delete markers under `prefix`, following pagination
    /// until the service stops returning a next version-id marker.
    ///
    /// The caller must already hold a concurrency permit (`_permit` is only a witness).
    ///
    /// * `prefix` — path to list under; when `None`, `prefix_in_bucket` is used instead.
    /// * `mode` — `ListingMode::WithDelimiter` adds the separator as request delimiter.
    /// * `max_keys` — optional cap; checked only after a page when more pages remain.
    ///
    /// # Errors
    ///
    /// * `DownloadError::Cancelled` — `cancel` fired, or the retry helper returned `None`.
    /// * `DownloadError::Other` — SDK errors after retries, timestamp conversion failures,
    ///   a truncated final page, or more than `max_keys` entries collected.
    ///
    /// # Panics
    ///
    /// Panics if a returned entry lacks a key or last-modified time, if a version lacks a
    /// version id, or if a key does not start with the bucket prefix.
    async fn list_versions_with_permit(
        &self,
        _permit: &tokio::sync::SemaphorePermit<'_>,
        prefix: Option<&RemotePath>,
        mode: ListingMode,
        max_keys: Option<NonZeroU32>,
        cancel: &CancellationToken,
    ) -> Result<crate::VersionListing, DownloadError> {
        // get the passed prefix or if it is not set use prefix_in_bucket value
        let prefix = prefix
            .map(|p| self.relative_path_to_s3_object(p))
            .or_else(|| self.prefix_in_bucket.clone());

        // Retry policy for each page: every error except cancellation is retried.
        let warn_threshold = 3;
        let max_retries = 10;
        let is_permanent = |e: &_| matches!(e, DownloadError::Cancelled);

        // Pagination cursor (both `None` for the first page) and the accumulated result.
        let mut key_marker = None;
        let mut version_id_marker = None;
        let mut versions_and_deletes = Vec::new();

        loop {
            let response = backoff::retry(
                || async {
                    let mut request = self
                        .client
                        .list_object_versions()
                        .bucket(self.bucket_name.clone())
                        .set_prefix(prefix.clone())
                        .set_key_marker(key_marker.clone())
                        .set_version_id_marker(version_id_marker.clone());

                    if let ListingMode::WithDelimiter = mode {
                        request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
                    }

                    let op = request.send();

                    // NOTE(review): unlike the other requests in this file, there is no
                    // `self.timeout` arm here; only cancellation interrupts the request.
                    tokio::select! {
                        res = op => res.map_err(|e| DownloadError::Other(e.into())),
                        _ = cancel.cancelled() => Err(DownloadError::Cancelled),
                    }
                },
                is_permanent,
                warn_threshold,
                max_retries,
                "listing object versions",
                cancel,
            )
            .await
            // `None` from the retry helper is reported as cancellation.
            .ok_or_else(|| DownloadError::Cancelled)
            .and_then(|x| x)?;

            tracing::trace!(
                "  Got List response version_id_marker={:?}, key_marker={:?}",
                response.version_id_marker,
                response.key_marker
            );
            // Both iterators below are lazy and yield `Result`s; they are only driven by
            // `process_results` further down.
            let versions = response
                .versions
                .unwrap_or_default()
                .into_iter()
                .map(|version| {
                    let key = version.key.expect("response does not contain a key");
                    let key = self.s3_object_to_relative_path(&key);
                    let version_id = VersionId(version.version_id.expect("needing version id"));
                    let last_modified =
                        SystemTime::try_from(version.last_modified.expect("no last_modified"))?;
                    Ok(Version {
                        key,
                        last_modified,
                        kind: crate::VersionKind::Version(version_id),
                    })
                });
            let deletes = response
                .delete_markers
                .unwrap_or_default()
                .into_iter()
                .map(|version| {
                    let key = version.key.expect("response does not contain a key");
                    let key = self.s3_object_to_relative_path(&key);
                    let last_modified =
                        SystemTime::try_from(version.last_modified.expect("no last_modified"))?;
                    Ok(Version {
                        key,
                        last_modified,
                        kind: crate::VersionKind::DeletionMarker,
                    })
                });
            // Append versions first, then delete markers; the first conversion error aborts.
            itertools::process_results(versions.chain(deletes), |n_vds| {
                versions_and_deletes.extend(n_vds)
            })
            .map_err(DownloadError::Other)?;
            // Treat an empty marker string the same as an absent marker.
            fn none_if_empty(v: Option<String>) -> Option<String> {
                v.filter(|v| !v.is_empty())
            }
            version_id_marker = none_if_empty(response.next_version_id_marker);
            key_marker = none_if_empty(response.next_key_marker);
            // Pagination ends when no next version-id marker is returned.
            if version_id_marker.is_none() {
                // The final response is not supposed to be truncated
                if response.is_truncated.unwrap_or_default() {
                    return Err(DownloadError::Other(anyhow::anyhow!(
                        "Received truncated ListObjectVersions response for prefix={prefix:?}"
                    )));
                }
                break;
            }
            // Only reached when another page would be fetched: give up once the cap is hit.
            if let Some(max_keys) = max_keys {
                if versions_and_deletes.len() >= max_keys.get().try_into().unwrap() {
                    return Err(DownloadError::Other(anyhow::anyhow!("too many versions")));
                }
            }
        }
        Ok(VersionListing {
            versions: versions_and_deletes,
        })
    }
     526              : 
     527            6 :     pub fn bucket_name(&self) -> &str {
     528            6 :         &self.bucket_name
     529            6 :     }
     530              : }
     531              : 
     532              : pin_project_lite::pin_project! {
     533              :     struct ByteStreamAsStream {
     534              :         #[pin]
     535              :         inner: aws_smithy_types::byte_stream::ByteStream
     536              :     }
     537              : }
     538              : 
     539              : impl From<aws_smithy_types::byte_stream::ByteStream> for ByteStreamAsStream {
     540           28 :     fn from(inner: aws_smithy_types::byte_stream::ByteStream) -> Self {
     541           28 :         ByteStreamAsStream { inner }
     542           28 :     }
     543              : }
     544              : 
     545              : impl Stream for ByteStreamAsStream {
     546              :     type Item = std::io::Result<Bytes>;
     547              : 
     548           42 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     549              :         // this does the std::io::ErrorKind::Other conversion
     550           42 :         self.project().inner.poll_next(cx).map_err(|x| x.into())
     551           42 :     }
     552              : 
     553              :     // cannot implement size_hint because inner.size_hint is remaining size in bytes, which makes
     554              :     // sense and Stream::size_hint does not really
     555              : }
     556              : 
     557              : pin_project_lite::pin_project! {
     558              :     /// Times and tracks the outcome of the request.
     559              :     struct TimedDownload<S> {
     560              :         started_at: std::time::Instant,
     561              :         outcome: AttemptOutcome,
     562              :         #[pin]
     563              :         inner: S
     564              :     }
     565              : 
     566              :     impl<S> PinnedDrop for TimedDownload<S> {
     567              :         fn drop(mut this: Pin<&mut Self>) {
     568              :             crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
     569              :         }
     570              :     }
     571              : }
     572              : 
     573              : impl<S> TimedDownload<S> {
     574           28 :     fn new(started_at: std::time::Instant, inner: S) -> Self {
     575           28 :         TimedDownload {
     576           28 :             started_at,
     577           28 :             outcome: AttemptOutcome::Cancelled,
     578           28 :             inner,
     579           28 :         }
     580           28 :     }
     581              : }
     582              : 
     583              : impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for TimedDownload<S> {
     584              :     type Item = <S as Stream>::Item;
     585              : 
     586           42 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     587              :         use std::task::ready;
     588              : 
     589           42 :         let this = self.project();
     590              : 
     591           42 :         let res = ready!(this.inner.poll_next(cx));
     592           22 :         match &res {
     593           22 :             Some(Ok(_)) => {}
     594            0 :             Some(Err(_)) => *this.outcome = AttemptOutcome::Err,
     595           18 :             None => *this.outcome = AttemptOutcome::Ok,
     596              :         }
     597              : 
     598           40 :         Poll::Ready(res)
     599           42 :     }
     600              : 
     601            0 :     fn size_hint(&self) -> (usize, Option<usize>) {
     602            0 :         self.inner.size_hint()
     603            0 :     }
     604              : }
     605              : 
     606              : impl RemoteStorage for S3Bucket {
     607           64 :     fn list_streaming(
     608           64 :         &self,
     609           64 :         prefix: Option<&RemotePath>,
     610           64 :         mode: ListingMode,
     611           64 :         max_keys: Option<NonZeroU32>,
     612           64 :         cancel: &CancellationToken,
     613           64 :     ) -> impl Stream<Item = Result<Listing, DownloadError>> {
     614           64 :         let kind = RequestKind::List;
     615              :         // s3 sdk wants i32
     616           64 :         let mut max_keys = max_keys.map(|mk| mk.get() as i32);
     617              : 
     618              :         // get the passed prefix or if it is not set use prefix_in_bucket value
     619           64 :         let list_prefix = prefix
     620           64 :             .map(|p| self.relative_path_to_s3_object(p))
     621           64 :             .or_else(|| {
     622           32 :                 self.prefix_in_bucket.clone().map(|mut s| {
     623           32 :                     s.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
     624           32 :                     s
     625           32 :                 })
     626           32 :             });
     627              : 
     628           64 :         async_stream::stream! {
     629              :             let _permit = self.permit(kind, cancel).await?;
     630              : 
     631              :             let mut continuation_token = None;
     632              :             'outer: loop {
     633              :                 let started_at = start_measuring_requests(kind);
     634              : 
     635              :                 // min of two Options, returning Some if one is value and another is
     636              :                 // None (None is smaller than anything, so plain min doesn't work).
     637              :                 let request_max_keys = self
     638              :                     .max_keys_per_list_response
     639              :                     .into_iter()
     640              :                     .chain(max_keys.into_iter())
     641              :                     .min();
     642              :                 let mut request = self
     643              :                     .client
     644              :                     .list_objects_v2()
     645              :                     .bucket(self.bucket_name.clone())
     646              :                     .set_prefix(list_prefix.clone())
     647              :                     .set_continuation_token(continuation_token.clone())
     648              :                     .set_max_keys(request_max_keys);
     649              : 
     650           64 :                 if let ListingMode::WithDelimiter = mode {
     651              :                     request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
     652              :                 }
     653              : 
     654              :                 let request = request.send();
     655              : 
     656              :                 let response = tokio::select! {
     657              :                     res = request => Ok(res),
     658              :                     _ = tokio::time::sleep(self.timeout) => Err(DownloadError::Timeout),
     659              :                     _ = cancel.cancelled() => Err(DownloadError::Cancelled),
     660              :                 };
     661              : 
     662              :                 if let Err(DownloadError::Timeout) = &response {
     663              :                     yield Err(DownloadError::Timeout);
     664              :                     continue 'outer;
     665              :                 }
     666              : 
     667              :                 let response = response?; // always yield cancellation errors and stop the stream
     668              : 
     669              :                 let response = response
     670              :                     .context("Failed to list S3 prefixes")
     671              :                     .map_err(DownloadError::Other);
     672              : 
     673              :                 let started_at = ScopeGuard::into_inner(started_at);
     674              : 
     675              :                 crate::metrics::BUCKET_METRICS
     676              :                     .req_seconds
     677              :                     .observe_elapsed(kind, &response, started_at);
     678              : 
     679              :                 let response = match response {
     680              :                     Ok(response) => response,
     681              :                     Err(e) => {
     682              :                         // The error is potentially retryable, so we must rewind the loop after yielding.
     683              :                         yield Err(e);
     684              :                         continue 'outer;
     685              :                     },
     686              :                 };
     687              : 
     688              :                 let keys = response.contents();
     689              :                 let prefixes = response.common_prefixes.as_deref().unwrap_or_default();
     690              : 
     691              :                 tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
     692              :                 let mut result = Listing::default();
     693              : 
     694              :                 for object in keys {
     695              :                     let key = object.key().expect("response does not contain a key");
     696              :                     let key = self.s3_object_to_relative_path(key);
     697              : 
     698              :                     let last_modified = match object.last_modified.map(SystemTime::try_from) {
     699              :                         Some(Ok(t)) => t,
     700              :                         Some(Err(_)) => {
     701              :                             tracing::warn!("Remote storage last_modified {:?} for {} is out of bounds",
     702              :                                 object.last_modified, key
     703              :                         );
     704              :                             SystemTime::now()
     705              :                         },
     706              :                         None => {
     707              :                             SystemTime::now()
     708              :                         }
     709              :                     };
     710              : 
     711              :                     let size = object.size.unwrap_or(0) as u64;
     712              : 
     713              :                     result.keys.push(ListingObject{
     714              :                         key,
     715              :                         last_modified,
     716              :                         size,
     717              :                     });
     718           64 :                     if let Some(mut mk) = max_keys {
     719              :                         assert!(mk > 0);
     720              :                         mk -= 1;
     721              :                         if mk == 0 {
     722              :                             // limit reached
     723              :                             yield Ok(result);
     724              :                             break 'outer;
     725              :                         }
     726              :                         max_keys = Some(mk);
     727              :                     }
     728              :                 }
     729              : 
     730              :                 // S3 gives us prefixes like "foo/", we return them like "foo"
     731          104 :                 result.prefixes.extend(prefixes.iter().filter_map(|o| {
     732              :                     Some(
     733          104 :                         self.s3_object_to_relative_path(
     734          104 :                             o.prefix()?
     735          104 :                                 .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR),
     736              :                         ),
     737              :                     )
     738          104 :                 }));
     739              : 
     740              :                 yield Ok(result);
     741              : 
     742              :                 continuation_token = match response.next_continuation_token {
     743              :                     Some(new_token) => Some(new_token),
     744              :                     None => break,
     745              :                 };
     746              :             }
     747              :         }
     748           64 :     }
     749              : 
     750            0 :     async fn list_versions(
     751            0 :         &self,
     752            0 :         prefix: Option<&RemotePath>,
     753            0 :         mode: ListingMode,
     754            0 :         max_keys: Option<NonZeroU32>,
     755            0 :         cancel: &CancellationToken,
     756            0 :     ) -> Result<crate::VersionListing, DownloadError> {
     757            0 :         let kind = RequestKind::ListVersions;
     758            0 :         let permit = self.permit(kind, cancel).await?;
     759            0 :         self.list_versions_with_permit(&permit, prefix, mode, max_keys, cancel)
     760            0 :             .await
     761            0 :     }
     762              : 
     763            6 :     async fn head_object(
     764            6 :         &self,
     765            6 :         key: &RemotePath,
     766            6 :         cancel: &CancellationToken,
     767            6 :     ) -> Result<ListingObject, DownloadError> {
     768            6 :         let kind = RequestKind::Head;
     769            6 :         let _permit = self.permit(kind, cancel).await?;
     770              : 
     771            6 :         let started_at = start_measuring_requests(kind);
     772              : 
     773            6 :         let head_future = self
     774            6 :             .client
     775            6 :             .head_object()
     776            6 :             .bucket(self.bucket_name())
     777            6 :             .key(self.relative_path_to_s3_object(key))
     778            6 :             .send();
     779              : 
     780            6 :         let head_future = tokio::time::timeout(self.timeout, head_future);
     781              : 
     782            6 :         let res = tokio::select! {
     783            6 :             res = head_future => res,
     784            6 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     785              :         };
     786              : 
     787            6 :         let res = res.map_err(|_e| DownloadError::Timeout)?;
     788              : 
     789              :         // do not incl. timeouts as errors in metrics but cancellations
     790            6 :         let started_at = ScopeGuard::into_inner(started_at);
     791            6 :         crate::metrics::BUCKET_METRICS
     792            6 :             .req_seconds
     793            6 :             .observe_elapsed(kind, &res, started_at);
     794              : 
     795            4 :         let data = match res {
     796            4 :             Ok(object_output) => object_output,
     797            2 :             Err(SdkError::ServiceError(e)) if matches!(e.err(), HeadObjectError::NotFound(_)) => {
     798              :                 // Count this in the AttemptOutcome::Ok bucket, because 404 is not
     799              :                 // an error: we expect to sometimes fetch an object and find it missing,
     800              :                 // e.g. when probing for timeline indices.
     801            2 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     802            2 :                     kind,
     803            2 :                     AttemptOutcome::Ok,
     804            2 :                     started_at,
     805              :                 );
     806            2 :                 return Err(DownloadError::NotFound);
     807              :             }
     808            0 :             Err(e) => {
     809            0 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     810            0 :                     kind,
     811            0 :                     AttemptOutcome::Err,
     812            0 :                     started_at,
     813              :                 );
     814              : 
     815            0 :                 return Err(DownloadError::Other(
     816            0 :                     anyhow::Error::new(e).context("s3 head object"),
     817            0 :                 ));
     818              :             }
     819              :         };
     820              : 
     821            4 :         let (Some(last_modified), Some(size)) = (data.last_modified, data.content_length) else {
     822            0 :             return Err(DownloadError::Other(anyhow!(
     823            0 :                 "head_object doesn't contain last_modified or content_length"
     824            0 :             )))?;
     825              :         };
     826              :         Ok(ListingObject {
     827            4 :             key: key.to_owned(),
     828            4 :             last_modified: SystemTime::try_from(last_modified).map_err(|e| {
     829            0 :                 DownloadError::Other(anyhow!("can't convert time '{last_modified}': {e}"))
     830            0 :             })?,
     831            4 :             size: size as u64,
     832              :         })
     833            6 :     }
     834              : 
     835          198 :     async fn upload(
     836          198 :         &self,
     837          198 :         from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     838          198 :         from_size_bytes: usize,
     839          198 :         to: &RemotePath,
     840          198 :         metadata: Option<StorageMetadata>,
     841          198 :         cancel: &CancellationToken,
     842          198 :     ) -> anyhow::Result<()> {
     843          198 :         let kind = RequestKind::Put;
     844          198 :         let _permit = self.permit(kind, cancel).await?;
     845              : 
     846          198 :         let started_at = start_measuring_requests(kind);
     847              : 
     848          710 :         let body = StreamBody::new(from.map(|x| x.map(Frame::data)));
     849          198 :         let bytes_stream = ByteStream::new(SdkBody::from_body_1_x(body));
     850              : 
     851          198 :         let upload = self
     852          198 :             .client
     853          198 :             .put_object()
     854          198 :             .bucket(self.bucket_name.clone())
     855          198 :             .key(self.relative_path_to_s3_object(to))
     856          198 :             .set_metadata(metadata.map(|m| m.0))
     857          198 :             .set_storage_class(self.upload_storage_class.clone())
     858          198 :             .content_length(from_size_bytes.try_into()?)
     859          198 :             .body(bytes_stream)
     860          198 :             .send();
     861              : 
     862          198 :         let upload = tokio::time::timeout(self.timeout, upload);
     863              : 
     864          198 :         let res = tokio::select! {
     865          198 :             res = upload => res,
     866          198 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     867              :         };
     868              : 
     869          198 :         if let Ok(inner) = &res {
     870          198 :             // do not incl. timeouts as errors in metrics but cancellations
     871          198 :             let started_at = ScopeGuard::into_inner(started_at);
     872          198 :             crate::metrics::BUCKET_METRICS
     873          198 :                 .req_seconds
     874          198 :                 .observe_elapsed(kind, inner, started_at);
     875          198 :         }
     876              : 
     877          198 :         match res {
     878          198 :             Ok(Ok(_put)) => Ok(()),
     879            0 :             Ok(Err(sdk)) => Err(sdk.into()),
     880            0 :             Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
     881              :         }
     882            0 :     }
     883              : 
     884            2 :     async fn copy(
     885            2 :         &self,
     886            2 :         from: &RemotePath,
     887            2 :         to: &RemotePath,
     888            2 :         cancel: &CancellationToken,
     889            2 :     ) -> anyhow::Result<()> {
     890            2 :         let kind = RequestKind::Copy;
     891            2 :         let _permit = self.permit(kind, cancel).await?;
     892              : 
     893            2 :         let timeout = tokio::time::sleep(self.timeout);
     894              : 
     895            2 :         let started_at = start_measuring_requests(kind);
     896              : 
     897              :         // we need to specify bucket_name as a prefix
     898            2 :         let copy_source = format!(
     899            2 :             "{}/{}",
     900              :             self.bucket_name,
     901            2 :             self.relative_path_to_s3_object(from)
     902              :         );
     903              : 
     904            2 :         let op = self
     905            2 :             .client
     906            2 :             .copy_object()
     907            2 :             .bucket(self.bucket_name.clone())
     908            2 :             .key(self.relative_path_to_s3_object(to))
     909            2 :             .set_storage_class(self.upload_storage_class.clone())
     910            2 :             .copy_source(copy_source)
     911            2 :             .send();
     912              : 
     913            2 :         let res = tokio::select! {
     914            2 :             res = op => res,
     915            2 :             _ = timeout => return Err(TimeoutOrCancel::Timeout.into()),
     916            2 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     917              :         };
     918              : 
     919            2 :         let started_at = ScopeGuard::into_inner(started_at);
     920            2 :         crate::metrics::BUCKET_METRICS
     921            2 :             .req_seconds
     922            2 :             .observe_elapsed(kind, &res, started_at);
     923              : 
     924            2 :         res?;
     925              : 
     926            2 :         Ok(())
     927            2 :     }
     928              : 
     929           33 :     async fn download(
     930           33 :         &self,
     931           33 :         from: &RemotePath,
     932           33 :         opts: &DownloadOpts,
     933           33 :         cancel: &CancellationToken,
     934           33 :     ) -> Result<Download, DownloadError> {
     935              :         // if prefix is not none then download file `prefix/from`
     936              :         // if prefix is none then download file `from`
     937           33 :         self.download_object(
     938              :             GetObjectRequest {
     939           33 :                 bucket: self.bucket_name.clone(),
     940           33 :                 key: self.relative_path_to_s3_object(from),
     941           33 :                 etag: opts.etag.as_ref().map(|e| e.to_string()),
     942           33 :                 range: opts.byte_range_header(),
     943           33 :                 version_id: opts.version_id.as_ref().map(|v| v.0.to_owned()),
     944              :             },
     945           33 :             cancel,
     946              :         )
     947           33 :         .await
     948           33 :     }
     949              : 
     950          194 :     async fn delete_objects(
     951          194 :         &self,
     952          194 :         paths: &[RemotePath],
     953          194 :         cancel: &CancellationToken,
     954          194 :     ) -> anyhow::Result<()> {
     955          194 :         let kind = RequestKind::Delete;
     956          194 :         let permit = self.permit(kind, cancel).await?;
     957          194 :         let mut delete_objects = Vec::with_capacity(paths.len());
     958          430 :         for path in paths {
     959          236 :             let obj_id = ObjectIdentifier::builder()
     960          236 :                 .set_key(Some(self.relative_path_to_s3_object(path)))
     961          236 :                 .build()
     962          236 :                 .context("convert path to oid")?;
     963          236 :             delete_objects.push(obj_id);
     964              :         }
     965              : 
     966          194 :         self.delete_oids(&permit, &delete_objects, cancel).await
     967          194 :     }
     968              : 
     969            0 :     fn max_keys_per_delete(&self) -> usize {
     970            0 :         MAX_KEYS_PER_DELETE_S3
     971            0 :     }
     972              : 
     973          174 :     async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
     974          174 :         let paths = std::array::from_ref(path);
     975          174 :         self.delete_objects(paths, cancel).await
     976          174 :     }
     977              : 
     978            6 :     async fn time_travel_recover(
     979            6 :         &self,
     980            6 :         prefix: Option<&RemotePath>,
     981            6 :         timestamp: SystemTime,
     982            6 :         done_if_after: SystemTime,
     983            6 :         cancel: &CancellationToken,
     984            6 :         complexity_limit: Option<NonZeroU32>,
     985            6 :     ) -> Result<(), TimeTravelError> {
     986            6 :         let kind = RequestKind::TimeTravel;
     987            6 :         let permit = self.permit(kind, cancel).await?;
     988              : 
     989            6 :         tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
     990              : 
     991            6 :         let mode = ListingMode::NoDelimiter;
     992            6 :         let version_listing = self
     993            6 :             .list_versions_with_permit(&permit, prefix, mode, complexity_limit, cancel)
     994            6 :             .await
     995            6 :             .map_err(|err| match err {
     996            0 :                 DownloadError::Other(e) => TimeTravelError::Other(e),
     997            0 :                 DownloadError::Cancelled => TimeTravelError::Cancelled,
     998            0 :                 other => TimeTravelError::Other(other.into()),
     999            0 :             })?;
    1000            6 :         let versions_and_deletes = version_listing.versions;
    1001              : 
    1002            6 :         tracing::info!(
    1003            0 :             "Built list for time travel with {} versions and deletions",
    1004            0 :             versions_and_deletes.len()
    1005              :         );
    1006              : 
    1007              :         // Work on the list of references instead of the objects directly,
    1008              :         // otherwise we get lifetime errors in the sort_by_key call below.
    1009            6 :         let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
    1010              : 
    1011          124 :         versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified));
    1012              : 
    1013            6 :         let mut vds_for_key = HashMap::<_, Vec<_>>::new();
    1014              : 
    1015           42 :         for vd in &versions_and_deletes {
    1016           36 :             let Version { key, .. } = &vd;
    1017           36 :             let version_id = vd.version_id().map(|v| v.0.as_str());
    1018           36 :             if version_id == Some("null") {
    1019              :                 // TODO: check the behavior of using the SDK on a non-versioned container
    1020            0 :                 return Err(TimeTravelError::Other(anyhow!(
    1021            0 :                     "Received ListVersions response for key={key} with version_id='null', \
    1022            0 :                     indicating either disabled versioning, or legacy objects with null version id values"
    1023            0 :                 )));
    1024           36 :             }
    1025           36 :             tracing::trace!("Parsing version key={key} kind={:?}", vd.kind);
    1026              : 
    1027           36 :             vds_for_key.entry(key).or_default().push(vd);
    1028              :         }
    1029              : 
    1030            6 :         let warn_threshold = 3;
    1031            6 :         let max_retries = 10;
    1032            6 :         let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
    1033              : 
    1034           24 :         for (key, versions) in vds_for_key {
    1035           18 :             let last_vd = versions.last().unwrap();
    1036           18 :             let key = self.relative_path_to_s3_object(key);
    1037           18 :             if last_vd.last_modified > done_if_after {
    1038            0 :                 tracing::trace!("Key {key} has version later than done_if_after, skipping");
    1039            0 :                 continue;
    1040           18 :             }
    1041              :             // the version we want to restore to.
    1042           18 :             let version_to_restore_to =
    1043           18 :                 match versions.binary_search_by_key(&timestamp, |tpl| tpl.last_modified) {
    1044            0 :                     Ok(v) => v,
    1045           18 :                     Err(e) => e,
    1046              :                 };
    1047           18 :             if version_to_restore_to == versions.len() {
    1048            6 :                 tracing::trace!("Key {key} has no changes since timestamp, skipping");
    1049            6 :                 continue;
    1050           12 :             }
    1051           12 :             let mut do_delete = false;
    1052           12 :             if version_to_restore_to == 0 {
    1053              :                 // All versions more recent, so the key didn't exist at the specified time point.
    1054            6 :                 tracing::trace!(
    1055            0 :                     "All {} versions more recent for {key}, deleting",
    1056            0 :                     versions.len()
    1057              :                 );
    1058            6 :                 do_delete = true;
    1059              :             } else {
    1060            6 :                 match &versions[version_to_restore_to - 1] {
    1061              :                     Version {
    1062            6 :                         kind: VersionKind::Version(version_id),
    1063              :                         ..
    1064              :                     } => {
    1065            6 :                         let version_id = &version_id.0;
    1066            6 :                         tracing::trace!("Copying old version {version_id} for {key}...");
    1067              :                         // Restore the state to the last version by copying
    1068            6 :                         let source_id =
    1069            6 :                             format!("{}/{key}?versionId={version_id}", self.bucket_name);
    1070              : 
    1071            6 :                         backoff::retry(
    1072            6 :                             || async {
    1073            6 :                                 let op = self
    1074            6 :                                     .client
    1075            6 :                                     .copy_object()
    1076            6 :                                     .bucket(self.bucket_name.clone())
    1077            6 :                                     .key(&key)
    1078            6 :                                     .set_storage_class(self.upload_storage_class.clone())
    1079            6 :                                     .copy_source(&source_id)
    1080            6 :                                     .send();
    1081              : 
    1082            6 :                                 tokio::select! {
    1083            6 :                                     res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
    1084            6 :                                     _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
    1085              :                                 }
    1086           12 :                             },
    1087            6 :                             is_permanent,
    1088            6 :                             warn_threshold,
    1089            6 :                             max_retries,
    1090            6 :                             "copying object version for time_travel_recover",
    1091            6 :                             cancel,
    1092              :                         )
    1093            6 :                         .await
    1094            6 :                         .ok_or_else(|| TimeTravelError::Cancelled)
    1095            6 :                         .and_then(|x| x)?;
    1096            6 :                         tracing::info!(%version_id, %key, "Copied old version in S3");
    1097              :                     }
    1098              :                     Version {
    1099              :                         kind: VersionKind::DeletionMarker,
    1100              :                         ..
    1101            0 :                     } => {
    1102            0 :                         do_delete = true;
    1103            0 :                     }
    1104              :                 }
    1105              :             };
    1106           12 :             if do_delete {
    1107            6 :                 if matches!(last_vd.kind, VersionKind::DeletionMarker) {
    1108              :                     // Key has since been deleted (but there was some history), no need to do anything
    1109            2 :                     tracing::trace!("Key {key} already deleted, skipping.");
    1110              :                 } else {
    1111            4 :                     tracing::trace!("Deleting {key}...");
    1112              : 
    1113            4 :                     let oid = ObjectIdentifier::builder()
    1114            4 :                         .key(key.to_owned())
    1115            4 :                         .build()
    1116            4 :                         .map_err(|e| TimeTravelError::Other(e.into()))?;
    1117              : 
    1118            4 :                     self.delete_oids(&permit, &[oid], cancel)
    1119            4 :                         .await
    1120            4 :                         .map_err(|e| {
    1121              :                             // delete_oid0 will use TimeoutOrCancel
    1122            0 :                             if TimeoutOrCancel::caused_by_cancel(&e) {
    1123            0 :                                 TimeTravelError::Cancelled
    1124              :                             } else {
    1125            0 :                                 TimeTravelError::Other(e)
    1126              :                             }
    1127            0 :                         })?;
    1128              :                 }
    1129            6 :             }
    1130              :         }
    1131            6 :         Ok(())
    1132            6 :     }
    1133              : }
    1134              : 
    1135              : #[cfg(test)]
    1136              : mod tests {
                            // Unit tests for how `S3Bucket::relative_path_to_s3_object` turns a
                            // `RemotePath` into an S3 object key under the configured
                            // `prefix_in_bucket`.
    1137              :     use std::num::NonZeroUsize;
    1138              : 
    1139              :     use camino::Utf8Path;
    1140              : 
    1141              :     use crate::{RemotePath, S3Bucket, S3Config};
    1142              : 
                            // Checks `relative_path_to_s3_object` for every combination of the five
                            // `prefix_in_bucket` settings and the three remote paths listed below,
                            // comparing each result against a hard-coded table of expected keys.
    1143              :     #[tokio::test]
    1144            3 :     async fn relative_path() {
                                // Input paths: empty, without trailing slash, with trailing slash.
                                // The test panics with "bad path" if any of them is rejected by
                                // `RemotePath::new`.
    1145            3 :         let all_paths = ["", "some/path", "some/path/"];
    1146            3 :         let all_paths: Vec<RemotePath> = all_paths
    1147            3 :             .iter()
    1148            9 :             .map(|x| RemotePath::new(Utf8Path::new(x)).expect("bad path"))
    1149            3 :             .collect();
                                // Prefix settings under test: absent, empty, and three spellings of
                                // "test/prefix" that differ only in leading/trailing slashes.
    1150            3 :         let prefixes = [
    1151            3 :             None,
    1152            3 :             Some(""),
    1153            3 :             Some("test/prefix"),
    1154            3 :             Some("test/prefix/"),
    1155            3 :             Some("/test/prefix/"),
    1156            3 :         ];
                                // Expected keys: one row per entry of `prefixes` (same order), one
                                // column per entry of `all_paths` (same order). Per this table:
                                //   * no prefix leaves the path unchanged;
                                //   * an empty prefix still puts a leading "/" in front of the path;
                                //   * all three spellings of "test/prefix" yield identical keys,
                                //     i.e. a leading or trailing "/" on the prefix makes no difference.
    1157            3 :         let expected_outputs = [
    1158            3 :             vec!["", "some/path", "some/path/"],
    1159            3 :             vec!["/", "/some/path", "/some/path/"],
    1160            3 :             vec![
    1161            3 :                 "test/prefix/",
    1162            3 :                 "test/prefix/some/path",
    1163            3 :                 "test/prefix/some/path/",
    1164            3 :             ],
    1165            3 :             vec![
    1166            3 :                 "test/prefix/",
    1167            3 :                 "test/prefix/some/path",
    1168            3 :                 "test/prefix/some/path/",
    1169            3 :             ],
    1170            3 :             vec![
    1171            3 :                 "test/prefix/",
    1172            3 :                 "test/prefix/some/path",
    1173            3 :                 "test/prefix/some/path/",
    1174            3 :             ],
    1175            3 :         ];
    1176              : 
    1177           15 :         for (prefix_idx, prefix) in prefixes.iter().enumerate() {
                                    // Only `prefix_in_bucket` varies between iterations; every other
                                    // field is a fixed value.
                                    // NOTE(review): the bucket name and region look like placeholders,
                                    // presumably no real bucket is required for this test - confirm
                                    // against `S3Bucket::new`.
    1178           15 :             let config = S3Config {
    1179           15 :                 bucket_name: "bucket".to_owned(),
    1180           15 :                 bucket_region: "region".to_owned(),
    1181           15 :                 prefix_in_bucket: prefix.map(str::to_string),
    1182           15 :                 endpoint: None,
    1183           15 :                 concurrency_limit: NonZeroUsize::new(100).unwrap(),
    1184           15 :                 max_keys_per_list_response: Some(5),
    1185           15 :                 upload_storage_class: None,
    1186           15 :             };
                                    // NOTE(review): the meaning of the `Duration::ZERO` argument is not
                                    // visible in this section (presumably a timeout) - confirm against
                                    // the signature of `S3Bucket::new`.
    1187           15 :             let storage = S3Bucket::new(&config, std::time::Duration::ZERO)
    1188           15 :                 .await
    1189           15 :                 .expect("remote storage init");
    1190           45 :             for (test_path_idx, test_path) in all_paths.iter().enumerate() {
    1191           45 :                 let result = storage.relative_path_to_s3_object(test_path);
                                        // Row = prefix index, column = path index, mirroring the layout
                                        // of `expected_outputs` above.
    1192           45 :                 let expected = expected_outputs[prefix_idx][test_path_idx];
    1193           45 :                 assert_eq!(result, expected);
    1194            3 :             }
    1195            3 :         }
    1196            3 :     }
    1197              : }
        

Generated by: LCOV version 2.1-beta