LCOV - code coverage report
Current view: top level - libs/remote_storage/src - s3_bucket.rs (source / functions) Coverage Total Hit
Test: aca806cab4756d7eb6a304846130f4a73a5d5393.info Lines: 88.4 % 890 787
Test Date: 2025-04-24 20:31:15 Functions: 67.3 % 98 66

            Line data    Source code
       1              : //! AWS S3 storage wrapper around the AWS SDK for Rust (`aws_sdk_s3` crate).
       2              : //!
       3              : //! Respects `prefix_in_bucket` property from [`S3Config`],
       4              : //! allowing multiple api users to independently work with the same S3 bucket, if
       5              : //! their bucket prefixes are both specified and different.
       6              : 
       7              : use std::borrow::Cow;
       8              : use std::collections::HashMap;
       9              : use std::num::NonZeroU32;
      10              : use std::pin::Pin;
      11              : use std::sync::Arc;
      12              : use std::task::{Context, Poll};
      13              : use std::time::{Duration, SystemTime};
      14              : 
      15              : use anyhow::{Context as _, anyhow};
      16              : use aws_config::BehaviorVersion;
      17              : use aws_config::default_provider::credentials::DefaultCredentialsChain;
      18              : use aws_config::retry::{RetryConfigBuilder, RetryMode};
      19              : use aws_sdk_s3::Client;
      20              : use aws_sdk_s3::config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep};
      21              : use aws_sdk_s3::error::SdkError;
      22              : use aws_sdk_s3::operation::get_object::GetObjectError;
      23              : use aws_sdk_s3::operation::head_object::HeadObjectError;
      24              : use aws_sdk_s3::types::{Delete, ObjectIdentifier, StorageClass};
      25              : use aws_smithy_async::rt::sleep::TokioSleep;
      26              : use aws_smithy_types::body::SdkBody;
      27              : use aws_smithy_types::byte_stream::ByteStream;
      28              : use aws_smithy_types::date_time::ConversionError;
      29              : use bytes::Bytes;
      30              : use futures::stream::Stream;
      31              : use futures_util::StreamExt;
      32              : use http_body_util::StreamBody;
      33              : use http_types::StatusCode;
      34              : use hyper::body::Frame;
      35              : use scopeguard::ScopeGuard;
      36              : use tokio_util::sync::CancellationToken;
      37              : use utils::backoff;
      38              : 
      39              : use super::StorageMetadata;
      40              : use crate::config::S3Config;
      41              : use crate::error::Cancelled;
      42              : pub(super) use crate::metrics::RequestKind;
      43              : use crate::metrics::{AttemptOutcome, start_counting_cancelled_wait, start_measuring_requests};
      44              : use crate::support::PermitCarrying;
      45              : use crate::{
      46              :     ConcurrencyLimiter, Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject,
      47              :     MAX_KEYS_PER_DELETE_S3, REMOTE_STORAGE_PREFIX_SEPARATOR, RemotePath, RemoteStorage,
      48              :     TimeTravelError, TimeoutOrCancel, Version, VersionId, VersionKind, VersionListing,
      49              : };
      50              : 
      51              : /// AWS S3 storage.
                        ///
                        /// Bundles the SDK client with the per-bucket settings that `S3Bucket::new`
                        /// copies out of [`S3Config`].
      52              : pub struct S3Bucket {
                            // SDK client through which the requests in this file are issued.
      53              :     client: Client,
                            // Name of the bucket the requests are sent against.
      54              :     bucket_name: String,
                            // Optional key prefix; `new` strips leading and trailing separators from it.
      55              :     prefix_in_bucket: Option<String>,
                            // Copied verbatim from the config in `new`.
      56              :     max_keys_per_list_response: Option<i32>,
                            // Copied from the config in `new`; presumably applied to uploads -- confirm
                            // at the use site.
      57              :     upload_storage_class: Option<StorageClass>,
                            // Source of the permits acquired in `permit` / `owned_permit`.
      58              :     concurrency_limiter: ConcurrencyLimiter,
      59              :     // Per-request timeout. Accessible for tests.
      60              :     pub timeout: Duration,
      61              : }
      62              : 
                        /// Parameters of a single `GetObject` call, consumed by `download_object`.
      63              : struct GetObjectRequest {
      64              :     bucket: String,
      65              :     key: String,
                            // When set, passed to the request builder's `if_none_match`.
      66              :     etag: Option<String>,
                            // Passed to the request builder's `set_range`.
      67              :     range: Option<String>,
                            // Passed to the request builder's `set_version_id`.
      68              :     version_id: Option<String>,
      69              : }
      70              : impl S3Bucket {
      71              :     /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
                            ///
                            /// Builds the SDK client for `remote_storage_config.bucket_region` using the
                            /// default credentials chain, applies the optional custom endpoint (with
                            /// path-style addressing forced), and strips leading and trailing separators
                            /// from `prefix_in_bucket`. `timeout` is stored as the per-request timeout.
                            ///
                            /// NOTE(review): the body below contains no `?` or `Err` return, so as written
                            /// it always returns `Ok`.
                            ///
                            /// # Panics
                            ///
                            /// Panics if the helper runtime cannot be built or if the helper thread that
                            /// loads the SDK configuration panics (both are `unwrap`ped).
      72           41 :     pub async fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
      73           41 :         tracing::debug!(
      74            0 :             "Creating s3 remote storage for S3 bucket {}",
      75              :             remote_storage_config.bucket_name
      76              :         );
      77              : 
      78           41 :         let region = Region::new(remote_storage_config.bucket_region.clone());
      79           41 :         let region_opt = Some(region.clone());
      80              : 
      81              :         // https://docs.aws.amazon.com/sdkref/latest/guide/standardized-credentials.html
      82              :         // https://docs.rs/aws-config/latest/aws_config/default_provider/credentials/struct.DefaultCredentialsChain.html
      83              :         // Incomplete list of auth methods used by this:
      84              :         // * "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
      85              :         // * "AWS_PROFILE" / `aws sso login --profile <profile>`
      86              :         // * "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
      87              :         // * http (ECS/EKS) container credentials
      88              :         // * imds v2
      89           41 :         let credentials_provider = DefaultCredentialsChain::builder()
      90           41 :             .region(region)
      91           41 :             .build()
      92           41 :             .await;
      93              : 
      94              :         // AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
      95           41 :         let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
      96           41 : 
      97           41 :         let sdk_config_loader: aws_config::ConfigLoader = aws_config::defaults(
      98           41 :             #[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
      99           41 :             BehaviorVersion::v2023_11_09(),
     100           41 :         )
     101           41 :         .region(region_opt)
     102           41 :         .identity_cache(IdentityCache::lazy().build())
     103           41 :         .credentials_provider(credentials_provider)
     104           41 :         .sleep_impl(SharedAsyncSleep::from(sleep_impl));
     105           41 : 
                                // The loader future is driven to completion on a scoped helper thread that
                                // owns a fresh current-thread runtime; the calling thread blocks in
                                // `join()` until it finishes.
                                // NOTE(review): this function is already `async`, yet the TODO below still
                                // asks for that; awaiting `load()` directly looks possible -- confirm.
     106           41 :         let sdk_config: aws_config::SdkConfig = std::thread::scope(|s| {
     107           41 :             s.spawn(|| {
     108           41 :                 // TODO: make this function async.
     109           41 :                 tokio::runtime::Builder::new_current_thread()
     110           41 :                     .enable_all()
     111           41 :                     .build()
     112           41 :                     .unwrap()
     113           41 :                     .block_on(sdk_config_loader.load())
     114           41 :             })
     115           41 :             .join()
     116           41 :             .unwrap()
     117           41 :         });
     118           41 : 
     119           41 :         let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&sdk_config);
     120              : 
     121              :         // Technically, the `remote_storage_config.endpoint` field only applies to S3 interactions.
     122              :         // (In case we ever re-use the `sdk_config` for more than just the S3 client in the future)
     123           41 :         if let Some(custom_endpoint) = remote_storage_config.endpoint.clone() {
     124            0 :             s3_config_builder = s3_config_builder
     125            0 :                 .endpoint_url(custom_endpoint)
     126            0 :                 .force_path_style(true);
     127           41 :         }
     128              : 
     129              :         // We do our own retries (see [`backoff::retry`]).  However, for the AWS SDK to enable rate limiting in response to throttling
     130              :         // responses (e.g. 429 on too many ListObjectsv2 requests), we must provide a retry config.  We set it to use at most one
     131              :         // attempt, and enable 'Adaptive' mode, which causes rate limiting to be enabled.
     132           41 :         let mut retry_config = RetryConfigBuilder::new();
     133           41 :         retry_config
     134           41 :             .set_max_attempts(Some(1))
     135           41 :             .set_mode(Some(RetryMode::Adaptive));
     136           41 :         s3_config_builder = s3_config_builder.retry_config(retry_config.build());
     137           41 : 
     138           41 :         let s3_config = s3_config_builder.build();
     139           41 :         let client = aws_sdk_s3::Client::from_conf(s3_config);
     140           41 : 
                                // Normalize the configured prefix: drop every leading and every trailing
                                // separator. A prefix made only of separators becomes `Some("")`.
     141           41 :         let prefix_in_bucket = remote_storage_config
     142           41 :             .prefix_in_bucket
     143           41 :             .as_deref()
     144           41 :             .map(|prefix| {
     145           38 :                 let mut prefix = prefix;
     146           41 :                 while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
                                        // NOTE(review): slicing at byte 1 assumes the separator is a
                                        // single-byte character.
     147            3 :                     prefix = &prefix[1..]
     148              :                 }
     149              : 
     150           38 :                 let mut prefix = prefix.to_string();
     151           70 :                 while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     152           32 :                     prefix.pop();
     153           32 :                 }
     154           38 :                 prefix
     155           41 :             });
     156           41 : 
     157           41 :         Ok(Self {
     158           41 :             client,
     159           41 :             bucket_name: remote_storage_config.bucket_name.clone(),
     160           41 :             max_keys_per_list_response: remote_storage_config.max_keys_per_list_response,
     161           41 :             prefix_in_bucket,
     162           41 :             concurrency_limiter: ConcurrencyLimiter::new(
     163           41 :                 remote_storage_config.concurrency_limit.get(),
     164           41 :             ),
     165           41 :             upload_storage_class: remote_storage_config.upload_storage_class.clone(),
     166           41 :             timeout,
     167           41 :         })
     168           41 :     }
     169              : 
                            /// Converts an S3 object key into a [`RemotePath`].
                            ///
                            /// Strips `prefix_in_bucket` (or the empty string when no prefix is set) from
                            /// the front of `key`, then splits the remainder on
                            /// `REMOTE_STORAGE_PREFIX_SEPARATOR` and collects the pieces into the path.
                            ///
                            /// # Panics
                            ///
                            /// Panics if `key` does not start with the configured prefix.
     170          554 :     fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
     171          554 :         let relative_path =
     172          554 :             match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
     173          554 :                 Some(stripped) => stripped,
     174              :                 // we rely on AWS to return properly prefixed paths
     175              :                 // for requests with a certain prefix
     176            0 :                 None => panic!(
     177            0 :                     "Key {} does not start with bucket prefix {:?}",
     178            0 :                     key, self.prefix_in_bucket
     179            0 :                 ),
     180              :             };
     181          554 :         RemotePath(
     182          554 :             relative_path
     183          554 :                 .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
     184          554 :                 .collect(),
     185          554 :         )
     186          554 :     }
     187              : 
                            /// Builds the S3 object key for `path`.
                            ///
                            /// With a bucket prefix configured the key is `<prefix>/<path>`; without one
                            /// it is the path string unchanged.
                            ///
                            /// # Panics
                            ///
                            /// Panics if `std::path::MAIN_SEPARATOR` differs from
                            /// `REMOTE_STORAGE_PREFIX_SEPARATOR`.
     188          571 :     pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
     189          571 :         assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
     190          571 :         let path_string = path.get_path().as_str();
     191          571 :         match &self.prefix_in_bucket {
                                    // NOTE(review): the joiner is a literal "/" rather than
                                    // `REMOTE_STORAGE_PREFIX_SEPARATOR`.
     192          562 :             Some(prefix) => prefix.clone() + "/" + path_string,
     193            9 :             None => path_string.to_string(),
     194              :         }
     195          571 :     }
     196              : 
                            /// Waits for a concurrency permit for a request of the given `kind`.
                            ///
                            /// The returned permit borrows from `self`. On success the time spent waiting
                            /// is recorded in `BUCKET_METRICS.wait_seconds`.
                            ///
                            /// # Errors
                            ///
                            /// Returns `Cancelled` if `cancel` fires before a permit is obtained.
     197          470 :     async fn permit(
     198          470 :         &self,
     199          470 :         kind: RequestKind,
     200          470 :         cancel: &CancellationToken,
     201          470 :     ) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
                                // `started_at` is a scope guard; on the early return below it is dropped
                                // with its action intact (presumably counting a cancelled wait -- confirm
                                // in `crate::metrics`).
     202          470 :         let started_at = start_counting_cancelled_wait(kind);
     203          470 :         let acquire = self.concurrency_limiter.acquire(kind);
     204              : 
     205          470 :         let permit = tokio::select! {
     206          470 :             permit = acquire => permit.expect("semaphore is never closed"),
     207          470 :             _ = cancel.cancelled() => return Err(Cancelled),
     208              :         };
     209              : 
                                // Take the start value out of the guard without running its drop action.
     210          470 :         let started_at = ScopeGuard::into_inner(started_at);
     211          470 :         crate::metrics::BUCKET_METRICS
     212          470 :             .wait_seconds
     213          470 :             .observe_elapsed(kind, started_at);
     214          470 : 
     215          470 :         Ok(permit)
     216          470 :     }
     217              : 
                            /// Waits for an owned concurrency permit for a request of the given `kind`.
                            ///
                            /// Unlike a borrowed permit, the returned `OwnedSemaphorePermit` has no
                            /// lifetime tied to `self`; `download_object` moves it into the body stream
                            /// it returns. On success the wait time is recorded in
                            /// `BUCKET_METRICS.wait_seconds`.
                            ///
                            /// # Errors
                            ///
                            /// Returns `Cancelled` if `cancel` fires before a permit is obtained.
     218           32 :     async fn owned_permit(
     219           32 :         &self,
     220           32 :         kind: RequestKind,
     221           32 :         cancel: &CancellationToken,
     222           32 :     ) -> Result<tokio::sync::OwnedSemaphorePermit, Cancelled> {
     223           32 :         let started_at = start_counting_cancelled_wait(kind);
     224           32 :         let acquire = self.concurrency_limiter.acquire_owned(kind);
     225              : 
     226           32 :         let permit = tokio::select! {
     227           32 :             permit = acquire => permit.expect("semaphore is never closed"),
     228           32 :             _ = cancel.cancelled() => return Err(Cancelled),
     229              :         };
     230              : 
                                // Take the start value out of the guard without running its drop action.
     231           32 :         let started_at = ScopeGuard::into_inner(started_at);
     232           32 :         crate::metrics::BUCKET_METRICS
     233           32 :             .wait_seconds
     234           32 :             .observe_elapsed(kind, started_at);
     235           32 :         Ok(permit)
     236           32 :     }
     237              : 
                            /// Issues one `GetObject` request and returns the object as a [`Download`].
                            ///
                            /// The response body is wrapped so that the concurrency permit travels with
                            /// the stream, the download is timed, and the stream is bounded by `cancel`
                            /// and by whatever is left of `self.timeout` after the headers arrived.
                            ///
                            /// # Errors
                            ///
                            /// * `DownloadError::Timeout` if no response arrives within `self.timeout`.
                            /// * `DownloadError::Cancelled` if `cancel` fires while waiting for the
                            ///   response (a cancelled permit wait is converted through `?`).
                            /// * `DownloadError::NotFound` on a `NoSuchKey` service error.
                            /// * `DownloadError::Unmodified` on a service error whose status is
                            ///   `NotModified`.
                            /// * `DownloadError::Other` for any other SDK error, for a missing `ETag` or
                            ///   `LastModified`, or if `LastModified` cannot be converted.
     238           32 :     async fn download_object(
     239           32 :         &self,
     240           32 :         request: GetObjectRequest,
     241           32 :         cancel: &CancellationToken,
     242           32 :     ) -> Result<Download, DownloadError> {
     243           32 :         let kind = RequestKind::Get;
     244              : 
     245           32 :         let permit = self.owned_permit(kind, cancel).await?;
     246              : 
     247           32 :         let started_at = start_measuring_requests(kind);
     248           32 : 
     249           32 :         let mut builder = self
     250           32 :             .client
     251           32 :             .get_object()
     252           32 :             .bucket(request.bucket)
     253           32 :             .key(request.key)
     254           32 :             .set_version_id(request.version_id)
     255           32 :             .set_range(request.range);
     256              : 
     257           32 :         if let Some(etag) = request.etag {
     258            6 :             builder = builder.if_none_match(etag);
     259           26 :         }
     260              : 
     261           32 :         let get_object = builder.send();
     262              : 
                                // Race the request against the per-request timeout and cancellation.
     263           32 :         let get_object = tokio::select! {
     264           32 :             res = get_object => res,
     265           32 :             _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
     266           32 :             _ = cancel.cancelled() => return Err(DownloadError::Cancelled),
     267              :         };
     268              : 
                                // From here on the outcome is recorded explicitly, so take the start
                                // value out of the guard without running its drop action.
     269           32 :         let started_at = ScopeGuard::into_inner(started_at);
     270              : 
     271           28 :         let object_output = match get_object {
     272           28 :             Ok(object_output) => object_output,
     273            4 :             Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
     274              :                 // Count this in the AttemptOutcome::Ok bucket, because 404 is not
     275              :                 // an error: we expect to sometimes fetch an object and find it missing,
     276              :                 // e.g. when probing for timeline indices.
     277            0 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     278            0 :                     kind,
     279            0 :                     AttemptOutcome::Ok,
     280            0 :                     started_at,
     281            0 :                 );
     282            0 :                 return Err(DownloadError::NotFound);
     283              :             }
     284            4 :             Err(SdkError::ServiceError(e))
     285              :                 // aws_smithy_runtime_api::http::response::StatusCode isn't
     286              :                 // re-exported by any aws crates, so just check the numeric
     287              :                 // status against http_types::StatusCode instead of pulling it.
     288            4 :                 if e.raw().status().as_u16() == StatusCode::NotModified =>
     289            4 :             {
     290            4 :                 // Count an unmodified file as a success.
     291            4 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     292            4 :                     kind,
     293            4 :                     AttemptOutcome::Ok,
     294            4 :                     started_at,
     295            4 :                 );
     296            4 :                 return Err(DownloadError::Unmodified);
     297              :             }
     298            0 :             Err(e) => {
     299            0 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     300            0 :                     kind,
     301            0 :                     AttemptOutcome::Err,
     302            0 :                     started_at,
     303            0 :                 );
     304            0 : 
     305            0 :                 return Err(DownloadError::Other(
     306            0 :                     anyhow::Error::new(e).context("download s3 object"),
     307            0 :                 ));
     308              :             }
     309              :         };
     310              : 
     311              :         // even if we would have no timeout left, continue anyways. the caller can decide to ignore
     312              :         // the errors considering timeouts and cancellation.
     313           28 :         let remaining = self.timeout.saturating_sub(started_at.elapsed());
     314           28 : 
     315           28 :         let metadata = object_output.metadata().cloned().map(StorageMetadata);
                                // NOTE(review): `ok_or` constructs the `anyhow` error even when the value
                                // is present; `ok_or_else` would defer that work to the failure path.
     316           28 :         let etag = object_output
     317           28 :             .e_tag
     318           28 :             .ok_or(DownloadError::Other(anyhow::anyhow!("Missing ETag header")))?
     319           28 :             .into();
     320           28 :         let last_modified = object_output
     321           28 :             .last_modified
     322           28 :             .ok_or(DownloadError::Other(anyhow::anyhow!(
     323           28 :                 "Missing LastModified header"
     324           28 :             )))?
     325           28 :             .try_into()
     326           28 :             .map_err(|e: ConversionError| DownloadError::Other(e.into()))?;
     327              : 
                                // Layer the body: raw byte stream -> carries the permit -> timed download.
     328           28 :         let body = object_output.body;
     329           28 :         let body = ByteStreamAsStream::from(body);
     330           28 :         let body = PermitCarrying::new(permit, body);
     331           28 :         let body = TimedDownload::new(started_at, body);
     332           28 : 
     333           28 :         let cancel_or_timeout = crate::support::cancel_or_timeout(remaining, cancel.clone());
     334           28 :         let body = crate::support::DownloadStream::new(cancel_or_timeout, body);
     335           28 : 
     336           28 :         Ok(Download {
     337           28 :             metadata,
     338           28 :             etag,
     339           28 :             last_modified,
     340           28 :             download_stream: Box::pin(body),
     341           28 :         })
     342           32 :     }
     343              : 
                            /// Deletes `delete_objects`, sending one `DeleteObjects` request per chunk of
                            /// at most `MAX_KEYS_PER_DELETE_S3` identifiers.
                            ///
                            /// `_permit` is not used in the body; presumably it exists so that callers
                            /// must already hold a concurrency permit -- confirm with the call sites.
                            ///
                            /// # Errors
                            ///
                            /// Stops at the first failing chunk and returns an error when:
                            /// * the request for a chunk does not finish within `self.timeout`
                            ///   (`TimeoutOrCancel::Timeout`) or `cancel` fires (`TimeoutOrCancel::Cancel`);
                            /// * building or sending the request fails;
                            /// * the response carries an `errors` list, of which up to
                            ///   `LOG_UP_TO_N_ERRORS` entries are logged at warn level.
     344          198 :     async fn delete_oids(
     345          198 :         &self,
     346          198 :         _permit: &tokio::sync::SemaphorePermit<'_>,
     347          198 :         delete_objects: &[ObjectIdentifier],
     348          198 :         cancel: &CancellationToken,
     349          198 :     ) -> anyhow::Result<()> {
     350          198 :         let kind = RequestKind::Delete;
                                // Pinned once so the same cancellation future can be polled in every
                                // iteration of the loop below.
     351          198 :         let mut cancel = std::pin::pin!(cancel.cancelled());
     352              : 
     353          198 :         for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE_S3) {
     354          198 :             let started_at = start_measuring_requests(kind);
     355              : 
     356          198 :             let req = self
     357          198 :                 .client
     358          198 :                 .delete_objects()
     359          198 :                 .bucket(self.bucket_name.clone())
     360          198 :                 .delete(
     361          198 :                     Delete::builder()
     362          198 :                         .set_objects(Some(chunk.to_vec()))
     363          198 :                         .build()
     364          198 :                         .context("build request")?,
     365              :                 )
     366          198 :                 .send();
     367              : 
     368          198 :             let resp = tokio::select! {
     369          198 :                 resp = req => resp,
     370          198 :                 _ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()),
     371          198 :                 _ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
     372              :             };
     373              : 
     374          198 :             let started_at = ScopeGuard::into_inner(started_at);
     375          198 :             crate::metrics::BUCKET_METRICS
     376          198 :                 .req_seconds
     377          198 :                 .observe_elapsed(kind, &resp, started_at);
     378              : 
     379          198 :             let resp = resp.context("request deletion")?;
                                    // NOTE(review): the counter grows by the whole chunk length before the
                                    // per-key `errors` list is inspected, so keys that failed to delete
                                    // are counted as deleted too.
     380          198 :             crate::metrics::BUCKET_METRICS
     381          198 :                 .deleted_objects_total
     382          198 :                 .inc_by(chunk.len() as u64);
     383              : 
     384          198 :             if let Some(errors) = resp.errors {
     385              :                 // Log a bounded number of the errors within the response:
     386              :                 // these requests can carry 1000 keys so logging each one
     387              :                 // would be too verbose, especially as errors may lead us
     388              :                 // to retry repeatedly.
     389              :                 const LOG_UP_TO_N_ERRORS: usize = 10;
     390            0 :                 for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
     391            0 :                     tracing::warn!(
     392            0 :                         "DeleteObjects key {} failed: {}: {}",
     393            0 :                         e.key.as_ref().map(Cow::from).unwrap_or("".into()),
     394            0 :                         e.code.as_ref().map(Cow::from).unwrap_or("".into()),
     395            0 :                         e.message.as_ref().map(Cow::from).unwrap_or("".into())
     396              :                     );
     397              :                 }
     398              : 
     399            0 :                 return Err(anyhow::anyhow!(
     400            0 :                     "Failed to delete {}/{} objects",
     401            0 :                     errors.len(),
     402            0 :                     chunk.len(),
     403            0 :                 ));
     404          198 :             }
     405              :         }
     406          198 :         Ok(())
     407          198 :     }
     408              : 
                            /// Lists every object version and delete marker under `prefix`, following
                            /// the pagination markers until the listing is exhausted.
                            ///
                            /// When `prefix` is `None` the configured `prefix_in_bucket` is used instead.
                            /// With `ListingMode::WithDelimiter` the separator is sent as the delimiter.
                            /// Each page is requested through `backoff::retry`. `_permit` is not used in
                            /// the body.
                            ///
                            /// # Errors
                            ///
                            /// * `DownloadError::Cancelled` if `cancel` fires or the retry helper yields
                            ///   no result.
                            /// * `DownloadError::Other` if a request keeps failing, a `last_modified`
                            ///   value cannot be converted, the last page is flagged as truncated, or
                            ///   at least `max_keys` entries have been collected while further pages
                            ///   remain.
                            ///
                            /// # Panics
                            ///
                            /// Panics if a returned entry lacks a key or `last_modified`, or if a version
                            /// lacks its version id.
     409            6 :     async fn list_versions_with_permit(
     410            6 :         &self,
     411            6 :         _permit: &tokio::sync::SemaphorePermit<'_>,
     412            6 :         prefix: Option<&RemotePath>,
     413            6 :         mode: ListingMode,
     414            6 :         max_keys: Option<NonZeroU32>,
     415            6 :         cancel: &CancellationToken,
     416            6 :     ) -> Result<crate::VersionListing, DownloadError> {
     417            6 :         // get the passed prefix or if it is not set use prefix_in_bucket value
     418            6 :         let prefix = prefix
     419            6 :             .map(|p| self.relative_path_to_s3_object(p))
     420            6 :             .or_else(|| self.prefix_in_bucket.clone());
     421            6 : 
     422            6 :         let warn_threshold = 3;
     423            6 :         let max_retries = 10;
                                // Only cancellation is classified as permanent; how the predicate is used
                                // is decided inside `backoff::retry`.
     424            6 :         let is_permanent = |e: &_| matches!(e, DownloadError::Cancelled);
     425              : 
     426            6 :         let mut key_marker = None;
     427            6 :         let mut version_id_marker = None;
     428            6 :         let mut versions_and_deletes = Vec::new();
     429              : 
     430              :         loop {
     431            6 :             let response = backoff::retry(
     432            7 :                 || async {
     433            7 :                     let mut request = self
     434            7 :                         .client
     435            7 :                         .list_object_versions()
     436            7 :                         .bucket(self.bucket_name.clone())
     437            7 :                         .set_prefix(prefix.clone())
     438            7 :                         .set_key_marker(key_marker.clone())
     439            7 :                         .set_version_id_marker(version_id_marker.clone());
     440            7 : 
     441            7 :                     if let ListingMode::WithDelimiter = mode {
     442            0 :                         request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
     443            7 :                     }
     444              : 
     445            7 :                     let op = request.send();
     446            7 : 
                                            // NOTE(review): unlike the get and delete requests in this
                                            // file, this select has no `self.timeout` arm; only
                                            // cancellation can interrupt the request here.
     447            7 :                     tokio::select! {
     448            7 :                         res = op => res.map_err(|e| DownloadError::Other(e.into())),
     449            7 :                         _ = cancel.cancelled() => Err(DownloadError::Cancelled),
     450              :                     }
     451           14 :                 },
     452            6 :                 is_permanent,
     453            6 :                 warn_threshold,
     454            6 :                 max_retries,
     455            6 :                 "listing object versions",
     456            6 :                 cancel,
     457            6 :             )
     458            6 :             .await
                                    // `None` from the retry helper is reported as cancellation; the inner
                                    // `Result` is then flattened.
     459            6 :             .ok_or_else(|| DownloadError::Cancelled)
     460            6 :             .and_then(|x| x)?;
     461              : 
     462            6 :             tracing::trace!(
     463            0 :                 "  Got List response version_id_marker={:?}, key_marker={:?}",
     464              :                 response.version_id_marker,
     465              :                 response.key_marker
     466              :             );
     467            6 :             let versions = response
     468            6 :                 .versions
     469            6 :                 .unwrap_or_default()
     470            6 :                 .into_iter()
     471           28 :                 .map(|version| {
     472           28 :                     let key = version.key.expect("response does not contain a key");
     473           28 :                     let key = self.s3_object_to_relative_path(&key);
     474           28 :                     let version_id = VersionId(version.version_id.expect("needing version id"));
     475           28 :                     let last_modified =
     476           28 :                         SystemTime::try_from(version.last_modified.expect("no last_modified"))?;
     477           28 :                     Ok(Version {
     478           28 :                         key,
     479           28 :                         last_modified,
     480           28 :                         kind: crate::VersionKind::Version(version_id),
     481           28 :                     })
     482           28 :                 });
     483            6 :             let deletes = response
     484            6 :                 .delete_markers
     485            6 :                 .unwrap_or_default()
     486            6 :                 .into_iter()
     487            8 :                 .map(|version| {
     488            8 :                     let key = version.key.expect("response does not contain a key");
     489            8 :                     let key = self.s3_object_to_relative_path(&key);
     490            8 :                     let last_modified =
     491            8 :                         SystemTime::try_from(version.last_modified.expect("no last_modified"))?;
     492            8 :                     Ok(Version {
     493            8 :                         key,
     494            8 :                         last_modified,
     495            8 :                         kind: crate::VersionKind::DeletionMarker,
     496            8 :                     })
     497            8 :                 });
                                    // Append this page's versions followed by its delete markers; the
                                    // first conversion error aborts the page.
     498            6 :             itertools::process_results(versions.chain(deletes), |n_vds| {
     499            6 :                 versions_and_deletes.extend(n_vds)
     500            6 :             })
     501            6 :             .map_err(DownloadError::Other)?;
                                    // Treat an empty marker string the same as an absent marker.
     502           12 :             fn none_if_empty(v: Option<String>) -> Option<String> {
     503           12 :                 v.filter(|v| !v.is_empty())
     504           12 :             }
     505            6 :             version_id_marker = none_if_empty(response.next_version_id_marker);
     506            6 :             key_marker = none_if_empty(response.next_key_marker);
     507            6 :             if version_id_marker.is_none() {
     508              :                 // The final response is not supposed to be truncated
     509            6 :                 if response.is_truncated.unwrap_or_default() {
     510            0 :                     return Err(DownloadError::Other(anyhow::anyhow!(
     511            0 :                         "Received truncated ListObjectVersions response for prefix={prefix:?}"
     512            0 :                     )));
     513            6 :                 }
     514            6 :                 break;
     515            0 :             }
                                    // NOTE(review): this limit is only checked when another page follows,
                                    // so a listing that ends on this page may exceed `max_keys`.
     516            0 :             if let Some(max_keys) = max_keys {
     517            0 :                 if versions_and_deletes.len() >= max_keys.get().try_into().unwrap() {
     518            0 :                     return Err(DownloadError::Other(anyhow::anyhow!("too many versions")));
     519            0 :                 }
     520            0 :             }
     521              :         }
     522            6 :         Ok(VersionListing {
     523            6 :             versions: versions_and_deletes,
     524            6 :         })
     525            6 :     }
     526              : 
     527            6 :     pub fn bucket_name(&self) -> &str {
     528            6 :         &self.bucket_name
     529            6 :     }
     530              : }
     531              : 
     532              : pin_project_lite::pin_project! {
     533              :     struct ByteStreamAsStream {
     534              :         #[pin]
     535              :         inner: aws_smithy_types::byte_stream::ByteStream
     536              :     }
     537              : }
     538              : 
     539              : impl From<aws_smithy_types::byte_stream::ByteStream> for ByteStreamAsStream {
     540           28 :     fn from(inner: aws_smithy_types::byte_stream::ByteStream) -> Self {
     541           28 :         ByteStreamAsStream { inner }
     542           28 :     }
     543              : }
     544              : 
     545              : impl Stream for ByteStreamAsStream {
     546              :     type Item = std::io::Result<Bytes>;
     547              : 
     548           41 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     549           41 :         // this does the std::io::ErrorKind::Other conversion
     550           41 :         self.project().inner.poll_next(cx).map_err(|x| x.into())
     551           41 :     }
     552              : 
     553              :     // cannot implement size_hint because inner.size_hint is remaining size in bytes, which makes
     554              :     // sense and Stream::size_hint does not really
     555              : }
     556              : 
     557              : pin_project_lite::pin_project! {
     558              :     /// Times and tracks the outcome of the request.
     559              :     struct TimedDownload<S> {
     560              :         started_at: std::time::Instant,
     561              :         outcome: AttemptOutcome,
     562              :         #[pin]
     563              :         inner: S
     564              :     }
     565              : 
     566              :     impl<S> PinnedDrop for TimedDownload<S> {
     567              :         fn drop(mut this: Pin<&mut Self>) {
     568              :             crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
     569              :         }
     570              :     }
     571              : }
     572              : 
     573              : impl<S> TimedDownload<S> {
     574           28 :     fn new(started_at: std::time::Instant, inner: S) -> Self {
     575           28 :         TimedDownload {
     576           28 :             started_at,
     577           28 :             outcome: AttemptOutcome::Cancelled,
     578           28 :             inner,
     579           28 :         }
     580           28 :     }
     581              : }
     582              : 
     583              : impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for TimedDownload<S> {
     584              :     type Item = <S as Stream>::Item;
     585              : 
     586           41 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     587              :         use std::task::ready;
     588              : 
     589           41 :         let this = self.project();
     590              : 
     591           41 :         let res = ready!(this.inner.poll_next(cx));
     592           22 :         match &res {
     593           22 :             Some(Ok(_)) => {}
     594            0 :             Some(Err(_)) => *this.outcome = AttemptOutcome::Err,
     595           18 :             None => *this.outcome = AttemptOutcome::Ok,
     596              :         }
     597              : 
     598           40 :         Poll::Ready(res)
     599           41 :     }
     600              : 
     601            0 :     fn size_hint(&self) -> (usize, Option<usize>) {
     602            0 :         self.inner.size_hint()
     603            0 :     }
     604              : }
     605              : 
     606              : impl RemoteStorage for S3Bucket {
     607           64 :     fn list_streaming(
     608           64 :         &self,
     609           64 :         prefix: Option<&RemotePath>,
     610           64 :         mode: ListingMode,
     611           64 :         max_keys: Option<NonZeroU32>,
     612           64 :         cancel: &CancellationToken,
     613           64 :     ) -> impl Stream<Item = Result<Listing, DownloadError>> {
     614           64 :         let kind = RequestKind::List;
     615           64 :         // s3 sdk wants i32
     616           64 :         let mut max_keys = max_keys.map(|mk| mk.get() as i32);
     617           64 : 
     618           64 :         // get the passed prefix or if it is not set use prefix_in_bucket value
     619           64 :         let list_prefix = prefix
     620           64 :             .map(|p| self.relative_path_to_s3_object(p))
     621           64 :             .or_else(|| {
     622           32 :                 self.prefix_in_bucket.clone().map(|mut s| {
     623           32 :                     s.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
     624           32 :                     s
     625           32 :                 })
     626           64 :             });
     627           64 : 
     628           64 :         async_stream::stream! {
     629           64 :             let _permit = self.permit(kind, cancel).await?;
     630           64 : 
     631           64 :             let mut continuation_token = None;
     632           64 :             'outer: loop {
     633           64 :                 let started_at = start_measuring_requests(kind);
     634           64 : 
     635           64 :                 // min of two Options, returning Some if one is value and another is
     636           64 :                 // None (None is smaller than anything, so plain min doesn't work).
     637           64 :                 let request_max_keys = self
     638           64 :                     .max_keys_per_list_response
     639           64 :                     .into_iter()
     640           64 :                     .chain(max_keys.into_iter())
     641           64 :                     .min();
     642           64 :                 let mut request = self
     643           64 :                     .client
     644           64 :                     .list_objects_v2()
     645           64 :                     .bucket(self.bucket_name.clone())
     646           64 :                     .set_prefix(list_prefix.clone())
     647           64 :                     .set_continuation_token(continuation_token.clone())
     648           64 :                     .set_max_keys(request_max_keys);
     649           64 : 
     650           64 :                 if let ListingMode::WithDelimiter = mode {
     651           64 :                     request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
     652           64 :                 }
     653           64 : 
     654           64 :                 let request = request.send();
     655           64 : 
     656           64 :                 let response = tokio::select! {
     657           64 :                     res = request => Ok(res),
     658           64 :                     _ = tokio::time::sleep(self.timeout) => Err(DownloadError::Timeout),
     659           64 :                     _ = cancel.cancelled() => Err(DownloadError::Cancelled),
     660           64 :                 }?;
     661           64 : 
     662           64 :                 let response = response
     663           64 :                     .context("Failed to list S3 prefixes")
     664           64 :                     .map_err(DownloadError::Other);
     665           64 : 
     666           64 :                 let started_at = ScopeGuard::into_inner(started_at);
     667           64 : 
     668           64 :                 crate::metrics::BUCKET_METRICS
     669           64 :                     .req_seconds
     670           64 :                     .observe_elapsed(kind, &response, started_at);
     671           64 : 
     672           64 :                 let response = match response {
     673           64 :                     Ok(response) => response,
     674           64 :                     Err(e) => {
     675           64 :                         // The error is potentially retryable, so we must rewind the loop after yielding.
     676           64 :                         yield Err(e);
     677           64 :                         continue 'outer;
     678           64 :                     },
     679           64 :                 };
     680           64 : 
     681           64 :                 let keys = response.contents();
     682           64 :                 let prefixes = response.common_prefixes.as_deref().unwrap_or_default();
     683           64 : 
     684           64 :                 tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
     685           64 :                 let mut result = Listing::default();
     686           64 : 
     687           64 :                 for object in keys {
     688           64 :                     let key = object.key().expect("response does not contain a key");
     689           64 :                     let key = self.s3_object_to_relative_path(key);
     690           64 : 
     691           64 :                     let last_modified = match object.last_modified.map(SystemTime::try_from) {
     692           64 :                         Some(Ok(t)) => t,
     693           64 :                         Some(Err(_)) => {
     694           64 :                             tracing::warn!("Remote storage last_modified {:?} for {} is out of bounds",
     695           64 :                                 object.last_modified, key
     696           64 :                         );
     697           64 :                             SystemTime::now()
     698           64 :                         },
     699           64 :                         None => {
     700           64 :                             SystemTime::now()
     701           64 :                         }
     702           64 :                     };
     703           64 : 
     704           64 :                     let size = object.size.unwrap_or(0) as u64;
     705           64 : 
     706           64 :                     result.keys.push(ListingObject{
     707           64 :                         key,
     708           64 :                         last_modified,
     709           64 :                         size,
     710           64 :                     });
     711           64 :                     if let Some(mut mk) = max_keys {
     712           64 :                         assert!(mk > 0);
     713           64 :                         mk -= 1;
     714           64 :                         if mk == 0 {
     715           64 :                             // limit reached
     716           64 :                             yield Ok(result);
     717           64 :                             break 'outer;
     718           64 :                         }
     719           64 :                         max_keys = Some(mk);
     720           64 :                     }
     721           64 :                 }
     722           64 : 
     723           64 :                 // S3 gives us prefixes like "foo/", we return them like "foo"
     724          104 :                 result.prefixes.extend(prefixes.iter().filter_map(|o| {
     725          104 :                     Some(
     726          104 :                         self.s3_object_to_relative_path(
     727          104 :                             o.prefix()?
     728          104 :                                 .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR),
     729           64 :                         ),
     730           64 :                     )
     731          104 :                 }));
     732           64 : 
     733           64 :                 yield Ok(result);
     734           64 : 
     735           64 :                 continuation_token = match response.next_continuation_token {
     736           64 :                     Some(new_token) => Some(new_token),
     737           64 :                     None => break,
     738           64 :                 };
     739           64 :             }
     740           64 :         }
     741           64 :     }
     742              : 
     743            0 :     async fn list_versions(
     744            0 :         &self,
     745            0 :         prefix: Option<&RemotePath>,
     746            0 :         mode: ListingMode,
     747            0 :         max_keys: Option<NonZeroU32>,
     748            0 :         cancel: &CancellationToken,
     749            0 :     ) -> Result<crate::VersionListing, DownloadError> {
     750            0 :         let kind = RequestKind::ListVersions;
     751            0 :         let permit = self.permit(kind, cancel).await?;
     752            0 :         self.list_versions_with_permit(&permit, prefix, mode, max_keys, cancel)
     753            0 :             .await
     754            0 :     }
     755              : 
     756            6 :     async fn head_object(
     757            6 :         &self,
     758            6 :         key: &RemotePath,
     759            6 :         cancel: &CancellationToken,
     760            6 :     ) -> Result<ListingObject, DownloadError> {
     761            6 :         let kind = RequestKind::Head;
     762            6 :         let _permit = self.permit(kind, cancel).await?;
     763              : 
     764            6 :         let started_at = start_measuring_requests(kind);
     765            6 : 
     766            6 :         let head_future = self
     767            6 :             .client
     768            6 :             .head_object()
     769            6 :             .bucket(self.bucket_name())
     770            6 :             .key(self.relative_path_to_s3_object(key))
     771            6 :             .send();
     772            6 : 
     773            6 :         let head_future = tokio::time::timeout(self.timeout, head_future);
     774              : 
     775            6 :         let res = tokio::select! {
     776            6 :             res = head_future => res,
     777            6 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     778              :         };
     779              : 
     780            6 :         let res = res.map_err(|_e| DownloadError::Timeout)?;
     781              : 
     782              :         // do not incl. timeouts as errors in metrics but cancellations
     783            6 :         let started_at = ScopeGuard::into_inner(started_at);
     784            6 :         crate::metrics::BUCKET_METRICS
     785            6 :             .req_seconds
     786            6 :             .observe_elapsed(kind, &res, started_at);
     787              : 
     788            4 :         let data = match res {
     789            4 :             Ok(object_output) => object_output,
     790            2 :             Err(SdkError::ServiceError(e)) if matches!(e.err(), HeadObjectError::NotFound(_)) => {
     791              :                 // Count this in the AttemptOutcome::Ok bucket, because 404 is not
     792              :                 // an error: we expect to sometimes fetch an object and find it missing,
     793              :                 // e.g. when probing for timeline indices.
     794            2 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     795            2 :                     kind,
     796            2 :                     AttemptOutcome::Ok,
     797            2 :                     started_at,
     798            2 :                 );
     799            2 :                 return Err(DownloadError::NotFound);
     800              :             }
     801            0 :             Err(e) => {
     802            0 :                 crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
     803            0 :                     kind,
     804            0 :                     AttemptOutcome::Err,
     805            0 :                     started_at,
     806            0 :                 );
     807            0 : 
     808            0 :                 return Err(DownloadError::Other(
     809            0 :                     anyhow::Error::new(e).context("s3 head object"),
     810            0 :                 ));
     811              :             }
     812              :         };
     813              : 
     814            4 :         let (Some(last_modified), Some(size)) = (data.last_modified, data.content_length) else {
     815            0 :             return Err(DownloadError::Other(anyhow!(
     816            0 :                 "head_object doesn't contain last_modified or content_length"
     817            0 :             )))?;
     818              :         };
     819              :         Ok(ListingObject {
     820            4 :             key: key.to_owned(),
     821            4 :             last_modified: SystemTime::try_from(last_modified).map_err(|e| {
     822            0 :                 DownloadError::Other(anyhow!("can't convert time '{last_modified}': {e}"))
     823            4 :             })?,
     824            4 :             size: size as u64,
     825              :         })
     826            6 :     }
     827              : 
     828          198 :     async fn upload(
     829          198 :         &self,
     830          198 :         from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     831          198 :         from_size_bytes: usize,
     832          198 :         to: &RemotePath,
     833          198 :         metadata: Option<StorageMetadata>,
     834          198 :         cancel: &CancellationToken,
     835          198 :     ) -> anyhow::Result<()> {
     836          198 :         let kind = RequestKind::Put;
     837          198 :         let _permit = self.permit(kind, cancel).await?;
     838              : 
     839          198 :         let started_at = start_measuring_requests(kind);
     840          198 : 
     841          710 :         let body = StreamBody::new(from.map(|x| x.map(Frame::data)));
     842          198 :         let bytes_stream = ByteStream::new(SdkBody::from_body_1_x(body));
     843              : 
     844          198 :         let upload = self
     845          198 :             .client
     846          198 :             .put_object()
     847          198 :             .bucket(self.bucket_name.clone())
     848          198 :             .key(self.relative_path_to_s3_object(to))
     849          198 :             .set_metadata(metadata.map(|m| m.0))
     850          198 :             .set_storage_class(self.upload_storage_class.clone())
     851          198 :             .content_length(from_size_bytes.try_into()?)
     852          198 :             .body(bytes_stream)
     853          198 :             .send();
     854          198 : 
     855          198 :         let upload = tokio::time::timeout(self.timeout, upload);
     856              : 
     857          198 :         let res = tokio::select! {
     858          198 :             res = upload => res,
     859          198 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     860              :         };
     861              : 
     862          198 :         if let Ok(inner) = &res {
     863          198 :             // do not incl. timeouts as errors in metrics but cancellations
     864          198 :             let started_at = ScopeGuard::into_inner(started_at);
     865          198 :             crate::metrics::BUCKET_METRICS
     866          198 :                 .req_seconds
     867          198 :                 .observe_elapsed(kind, inner, started_at);
     868          198 :         }
     869              : 
     870          198 :         match res {
     871          198 :             Ok(Ok(_put)) => Ok(()),
     872            0 :             Ok(Err(sdk)) => Err(sdk.into()),
     873            0 :             Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
     874              :         }
     875            0 :     }
     876              : 
     877            2 :     async fn copy(
     878            2 :         &self,
     879            2 :         from: &RemotePath,
     880            2 :         to: &RemotePath,
     881            2 :         cancel: &CancellationToken,
     882            2 :     ) -> anyhow::Result<()> {
     883            2 :         let kind = RequestKind::Copy;
     884            2 :         let _permit = self.permit(kind, cancel).await?;
     885              : 
     886            2 :         let timeout = tokio::time::sleep(self.timeout);
     887            2 : 
     888            2 :         let started_at = start_measuring_requests(kind);
     889            2 : 
     890            2 :         // we need to specify bucket_name as a prefix
     891            2 :         let copy_source = format!(
     892            2 :             "{}/{}",
     893            2 :             self.bucket_name,
     894            2 :             self.relative_path_to_s3_object(from)
     895            2 :         );
     896            2 : 
     897            2 :         let op = self
     898            2 :             .client
     899            2 :             .copy_object()
     900            2 :             .bucket(self.bucket_name.clone())
     901            2 :             .key(self.relative_path_to_s3_object(to))
     902            2 :             .set_storage_class(self.upload_storage_class.clone())
     903            2 :             .copy_source(copy_source)
     904            2 :             .send();
     905              : 
     906            2 :         let res = tokio::select! {
     907            2 :             res = op => res,
     908            2 :             _ = timeout => return Err(TimeoutOrCancel::Timeout.into()),
     909            2 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     910              :         };
     911              : 
     912            2 :         let started_at = ScopeGuard::into_inner(started_at);
     913            2 :         crate::metrics::BUCKET_METRICS
     914            2 :             .req_seconds
     915            2 :             .observe_elapsed(kind, &res, started_at);
     916            2 : 
     917            2 :         res?;
     918              : 
     919            2 :         Ok(())
     920            2 :     }
     921              : 
     922           32 :     async fn download(
     923           32 :         &self,
     924           32 :         from: &RemotePath,
     925           32 :         opts: &DownloadOpts,
     926           32 :         cancel: &CancellationToken,
     927           32 :     ) -> Result<Download, DownloadError> {
     928           32 :         // if prefix is not none then download file `prefix/from`
     929           32 :         // if prefix is none then download file `from`
     930           32 :         self.download_object(
     931           32 :             GetObjectRequest {
     932           32 :                 bucket: self.bucket_name.clone(),
     933           32 :                 key: self.relative_path_to_s3_object(from),
     934           32 :                 etag: opts.etag.as_ref().map(|e| e.to_string()),
     935           32 :                 range: opts.byte_range_header(),
     936           32 :                 version_id: opts.version_id.as_ref().map(|v| v.0.to_owned()),
     937           32 :             },
     938           32 :             cancel,
     939           32 :         )
     940           32 :         .await
     941           32 :     }
     942              : 
     943          194 :     async fn delete_objects(
     944          194 :         &self,
     945          194 :         paths: &[RemotePath],
     946          194 :         cancel: &CancellationToken,
     947          194 :     ) -> anyhow::Result<()> {
     948          194 :         let kind = RequestKind::Delete;
     949          194 :         let permit = self.permit(kind, cancel).await?;
     950          194 :         let mut delete_objects = Vec::with_capacity(paths.len());
     951          430 :         for path in paths {
     952          236 :             let obj_id = ObjectIdentifier::builder()
     953          236 :                 .set_key(Some(self.relative_path_to_s3_object(path)))
     954          236 :                 .build()
     955          236 :                 .context("convert path to oid")?;
     956          236 :             delete_objects.push(obj_id);
     957              :         }
     958              : 
     959          194 :         self.delete_oids(&permit, &delete_objects, cancel).await
     960          194 :     }
     961              : 
     962            0 :     fn max_keys_per_delete(&self) -> usize {
     963            0 :         MAX_KEYS_PER_DELETE_S3
     964            0 :     }
     965              : 
     966          174 :     async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
     967          174 :         let paths = std::array::from_ref(path);
     968          174 :         self.delete_objects(paths, cancel).await
     969          174 :     }
     970              : 
     971            6 :     async fn time_travel_recover(
     972            6 :         &self,
     973            6 :         prefix: Option<&RemotePath>,
     974            6 :         timestamp: SystemTime,
     975            6 :         done_if_after: SystemTime,
     976            6 :         cancel: &CancellationToken,
     977            6 :     ) -> Result<(), TimeTravelError> {
     978            6 :         let kind = RequestKind::TimeTravel;
     979            6 :         let permit = self.permit(kind, cancel).await?;
     980              : 
     981            6 :         tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
     982              : 
     983              :         // Limit the number of versions deletions, mostly so that we don't
     984              :         // keep requesting forever if the list is too long, as we'd put the
     985              :         // list in RAM.
     986              :         // Building a list of 100k entries that reaches the limit roughly takes
     987              :         // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
     988              :         const COMPLEXITY_LIMIT: Option<NonZeroU32> = NonZeroU32::new(100_000);
     989              : 
     990            6 :         let mode = ListingMode::NoDelimiter;
     991            6 :         let version_listing = self
     992            6 :             .list_versions_with_permit(&permit, prefix, mode, COMPLEXITY_LIMIT, cancel)
     993            6 :             .await
     994            6 :             .map_err(|err| match err {
     995            0 :                 DownloadError::Other(e) => TimeTravelError::Other(e),
     996            0 :                 DownloadError::Cancelled => TimeTravelError::Cancelled,
     997            0 :                 other => TimeTravelError::Other(other.into()),
     998            6 :             })?;
     999            6 :         let versions_and_deletes = version_listing.versions;
    1000            6 : 
    1001            6 :         tracing::info!(
    1002            0 :             "Built list for time travel with {} versions and deletions",
    1003            0 :             versions_and_deletes.len()
    1004              :         );
    1005              : 
    1006              :         // Work on the list of references instead of the objects directly,
    1007              :         // otherwise we get lifetime errors in the sort_by_key call below.
    1008            6 :         let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
    1009            6 : 
    1010          124 :         versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified));
    1011            6 : 
    1012            6 :         let mut vds_for_key = HashMap::<_, Vec<_>>::new();
    1013              : 
    1014           42 :         for vd in &versions_and_deletes {
    1015           36 :             let Version { key, .. } = &vd;
    1016           36 :             let version_id = vd.version_id().map(|v| v.0.as_str());
    1017           36 :             if version_id == Some("null") {
    1018            0 :                 return Err(TimeTravelError::Other(anyhow!(
    1019            0 :                     "Received ListVersions response for key={key} with version_id='null', \
    1020            0 :                     indicating either disabled versioning, or legacy objects with null version id values"
    1021            0 :                 )));
    1022           36 :             }
    1023           36 :             tracing::trace!("Parsing version key={key} kind={:?}", vd.kind);
    1024              : 
    1025           36 :             vds_for_key.entry(key).or_default().push(vd);
    1026              :         }
    1027              : 
    1028            6 :         let warn_threshold = 3;
    1029            6 :         let max_retries = 10;
    1030            6 :         let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
    1031              : 
    1032           24 :         for (key, versions) in vds_for_key {
    1033           18 :             let last_vd = versions.last().unwrap();
    1034           18 :             let key = self.relative_path_to_s3_object(key);
    1035           18 :             if last_vd.last_modified > done_if_after {
    1036            0 :                 tracing::trace!("Key {key} has version later than done_if_after, skipping");
    1037            0 :                 continue;
    1038           18 :             }
    1039              :             // the version we want to restore to.
    1040           18 :             let version_to_restore_to =
    1041           36 :                 match versions.binary_search_by_key(&timestamp, |tpl| tpl.last_modified) {
    1042            0 :                     Ok(v) => v,
    1043           18 :                     Err(e) => e,
    1044              :                 };
    1045           18 :             if version_to_restore_to == versions.len() {
    1046            6 :                 tracing::trace!("Key {key} has no changes since timestamp, skipping");
    1047            6 :                 continue;
    1048           12 :             }
    1049           12 :             let mut do_delete = false;
    1050           12 :             if version_to_restore_to == 0 {
    1051              :                 // All versions more recent, so the key didn't exist at the specified time point.
    1052            6 :                 tracing::trace!(
    1053            0 :                     "All {} versions more recent for {key}, deleting",
    1054            0 :                     versions.len()
    1055              :                 );
    1056            6 :                 do_delete = true;
    1057              :             } else {
    1058            6 :                 match &versions[version_to_restore_to - 1] {
    1059              :                     Version {
    1060            6 :                         kind: VersionKind::Version(version_id),
    1061            6 :                         ..
    1062            6 :                     } => {
    1063            6 :                         let version_id = &version_id.0;
    1064            6 :                         tracing::trace!("Copying old version {version_id} for {key}...");
    1065              :                         // Restore the state to the last version by copying
    1066            6 :                         let source_id =
    1067            6 :                             format!("{}/{key}?versionId={version_id}", self.bucket_name);
    1068            6 : 
    1069            6 :                         backoff::retry(
    1070            6 :                             || async {
    1071            6 :                                 let op = self
    1072            6 :                                     .client
    1073            6 :                                     .copy_object()
    1074            6 :                                     .bucket(self.bucket_name.clone())
    1075            6 :                                     .key(&key)
    1076            6 :                                     .set_storage_class(self.upload_storage_class.clone())
    1077            6 :                                     .copy_source(&source_id)
    1078            6 :                                     .send();
    1079            6 : 
    1080            6 :                                 tokio::select! {
    1081            6 :                                     res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
    1082            6 :                                     _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
    1083              :                                 }
    1084           12 :                             },
    1085            6 :                             is_permanent,
    1086            6 :                             warn_threshold,
    1087            6 :                             max_retries,
    1088            6 :                             "copying object version for time_travel_recover",
    1089            6 :                             cancel,
    1090            6 :                         )
    1091            6 :                         .await
    1092            6 :                         .ok_or_else(|| TimeTravelError::Cancelled)
    1093            6 :                         .and_then(|x| x)?;
    1094            6 :                         tracing::info!(%version_id, %key, "Copied old version in S3");
    1095              :                     }
    1096              :                     Version {
    1097              :                         kind: VersionKind::DeletionMarker,
    1098              :                         ..
    1099            0 :                     } => {
    1100            0 :                         do_delete = true;
    1101            0 :                     }
    1102              :                 }
    1103              :             };
    1104           12 :             if do_delete {
    1105            6 :                 if matches!(last_vd.kind, VersionKind::DeletionMarker) {
    1106              :                     // Key has since been deleted (but there was some history), no need to do anything
    1107            2 :                     tracing::trace!("Key {key} already deleted, skipping.");
    1108              :                 } else {
    1109            4 :                     tracing::trace!("Deleting {key}...");
    1110              : 
    1111            4 :                     let oid = ObjectIdentifier::builder()
    1112            4 :                         .key(key.to_owned())
    1113            4 :                         .build()
    1114            4 :                         .map_err(|e| TimeTravelError::Other(e.into()))?;
    1115              : 
    1116            4 :                     self.delete_oids(&permit, &[oid], cancel)
    1117            4 :                         .await
    1118            4 :                         .map_err(|e| {
    1119            0 :                             // delete_oids will use TimeoutOrCancel
    1120            0 :                             if TimeoutOrCancel::caused_by_cancel(&e) {
    1121            0 :                                 TimeTravelError::Cancelled
    1122              :                             } else {
    1123            0 :                                 TimeTravelError::Other(e)
    1124              :                             }
    1125            4 :                         })?;
    1126              :                 }
    1127            6 :             }
    1128              :         }
    1129            6 :         Ok(())
    1130            6 :     }
    1131              : }
    1132              : 
    1133              : #[cfg(test)]
    1134              : mod tests {
    1135              :     use std::num::NonZeroUsize;
    1136              : 
    1137              :     use camino::Utf8Path;
    1138              : 
    1139              :     use crate::{RemotePath, S3Bucket, S3Config};
    1140              : 
    1141              :     #[tokio::test]
    1142            3 :     async fn relative_path() {
    1143            3 :         let all_paths = ["", "some/path", "some/path/"];
    1144            3 :         let all_paths: Vec<RemotePath> = all_paths
    1145            3 :             .iter()
    1146            9 :             .map(|x| RemotePath::new(Utf8Path::new(x)).expect("bad path"))
    1147            3 :             .collect();
    1148            3 :         let prefixes = [
    1149            3 :             None,
    1150            3 :             Some(""),
    1151            3 :             Some("test/prefix"),
    1152            3 :             Some("test/prefix/"),
    1153            3 :             Some("/test/prefix/"),
    1154            3 :         ];
    1155            3 :         let expected_outputs = [
    1156            3 :             vec!["", "some/path", "some/path/"],
    1157            3 :             vec!["/", "/some/path", "/some/path/"],
    1158            3 :             vec![
    1159            3 :                 "test/prefix/",
    1160            3 :                 "test/prefix/some/path",
    1161            3 :                 "test/prefix/some/path/",
    1162            3 :             ],
    1163            3 :             vec![
    1164            3 :                 "test/prefix/",
    1165            3 :                 "test/prefix/some/path",
    1166            3 :                 "test/prefix/some/path/",
    1167            3 :             ],
    1168            3 :             vec![
    1169            3 :                 "test/prefix/",
    1170            3 :                 "test/prefix/some/path",
    1171            3 :                 "test/prefix/some/path/",
    1172            3 :             ],
    1173            3 :         ];
    1174            3 : 
    1175           15 :         for (prefix_idx, prefix) in prefixes.iter().enumerate() {
    1176           15 :             let config = S3Config {
    1177           15 :                 bucket_name: "bucket".to_owned(),
    1178           15 :                 bucket_region: "region".to_owned(),
    1179           15 :                 prefix_in_bucket: prefix.map(str::to_string),
    1180           15 :                 endpoint: None,
    1181           15 :                 concurrency_limit: NonZeroUsize::new(100).unwrap(),
    1182           15 :                 max_keys_per_list_response: Some(5),
    1183           15 :                 upload_storage_class: None,
    1184           15 :             };
    1185           15 :             let storage = S3Bucket::new(&config, std::time::Duration::ZERO)
    1186           15 :                 .await
    1187           15 :                 .expect("remote storage init");
    1188           45 :             for (test_path_idx, test_path) in all_paths.iter().enumerate() {
    1189           45 :                 let result = storage.relative_path_to_s3_object(test_path);
    1190           45 :                 let expected = expected_outputs[prefix_idx][test_path_idx];
    1191           45 :                 assert_eq!(result, expected);
    1192            3 :             }
    1193            3 :         }
    1194            3 :     }
    1195              : }
        

Generated by: LCOV version 2.1-beta