Line data Source code
1 : //! AWS S3 storage wrapper around the `aws-sdk-s3` library.
2 : //!
3 : //! Respects `prefix_in_bucket` property from [`S3Config`],
4 : //! allowing multiple API users to work independently with the same S3 bucket, as
5 : //! long as each of them specifies a distinct bucket prefix.
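 : //!
 : //! For example (hypothetical prefixes), two configs pointing at the same bucket,
 : //! one with `prefix_in_bucket = "tenant-a"` and one with `"tenant-b"`, operate on
 : //! disjoint key spaces and never see each other's objects.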
6 :
7 : use std::borrow::Cow;
8 : use std::collections::HashMap;
9 : use std::num::NonZeroU32;
10 : use std::pin::Pin;
11 : use std::sync::Arc;
12 : use std::task::{Context, Poll};
13 : use std::time::{Duration, SystemTime};
14 :
15 : use anyhow::{Context as _, anyhow};
16 : use aws_config::BehaviorVersion;
17 : use aws_config::default_provider::credentials::DefaultCredentialsChain;
18 : use aws_config::retry::{RetryConfigBuilder, RetryMode};
19 : use aws_sdk_s3::Client;
20 : use aws_sdk_s3::config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep};
21 : use aws_sdk_s3::error::SdkError;
22 : use aws_sdk_s3::operation::get_object::GetObjectError;
23 : use aws_sdk_s3::operation::head_object::HeadObjectError;
24 : use aws_sdk_s3::types::{Delete, ObjectIdentifier, StorageClass};
25 : use aws_smithy_async::rt::sleep::TokioSleep;
26 : use aws_smithy_types::body::SdkBody;
27 : use aws_smithy_types::byte_stream::ByteStream;
28 : use aws_smithy_types::date_time::ConversionError;
29 : use bytes::Bytes;
30 : use futures::stream::Stream;
31 : use futures_util::StreamExt;
32 : use http_body_util::StreamBody;
33 : use http_types::StatusCode;
34 : use hyper::body::Frame;
35 : use scopeguard::ScopeGuard;
36 : use tokio_util::sync::CancellationToken;
37 : use utils::backoff;
38 :
39 : use super::StorageMetadata;
40 : use crate::config::S3Config;
41 : use crate::error::Cancelled;
42 : pub(super) use crate::metrics::RequestKind;
43 : use crate::metrics::{AttemptOutcome, start_counting_cancelled_wait, start_measuring_requests};
44 : use crate::support::PermitCarrying;
45 : use crate::{
46 : ConcurrencyLimiter, Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject,
47 : MAX_KEYS_PER_DELETE_S3, REMOTE_STORAGE_PREFIX_SEPARATOR, RemotePath, RemoteStorage,
48 : TimeTravelError, TimeoutOrCancel, Version, VersionId, VersionKind, VersionListing,
49 : };
50 :
51 : /// AWS S3 storage.
52 : pub struct S3Bucket {
53 : client: Client,
54 : bucket_name: String,
55 : prefix_in_bucket: Option<String>,
56 : max_keys_per_list_response: Option<i32>,
57 : upload_storage_class: Option<StorageClass>,
58 : concurrency_limiter: ConcurrencyLimiter,
59 : // Per-request timeout. Accessible for tests.
60 : pub timeout: Duration,
61 : }
62 :
63 : struct GetObjectRequest {
64 : bucket: String,
65 : key: String,
66 : etag: Option<String>,
67 : range: Option<String>,
68 : version_id: Option<String>,
69 : }
70 : impl S3Bucket {
71 : /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
72 41 : pub async fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
73 41 : tracing::debug!(
74 0 : "Creating s3 remote storage for S3 bucket {}",
75 : remote_storage_config.bucket_name
76 : );
77 :
78 41 : let region = Region::new(remote_storage_config.bucket_region.clone());
79 41 : let region_opt = Some(region.clone());
80 :
81 : // https://docs.aws.amazon.com/sdkref/latest/guide/standardized-credentials.html
82 : // https://docs.rs/aws-config/latest/aws_config/default_provider/credentials/struct.DefaultCredentialsChain.html
83 : // A non-exhaustive list of auth methods this chain resolves:
84 : // * "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
85 : // * "AWS_PROFILE" / `aws sso login --profile <profile>`
86 : // * "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
87 : // * http (ECS/EKS) container credentials
88 : // * imds v2
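 : // For instance, exporting AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (static
 : // credentials) is enough for the chain to resolve without further configuration.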
89 41 : let credentials_provider = DefaultCredentialsChain::builder()
90 41 : .region(region)
91 41 : .build()
92 41 : .await;
93 :
94 : // The AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off.
95 41 : let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
96 41 :
97 41 : let sdk_config_loader: aws_config::ConfigLoader = aws_config::defaults(
98 41 : #[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
99 41 : BehaviorVersion::v2023_11_09(),
100 41 : )
101 41 : .region(region_opt)
102 41 : .identity_cache(IdentityCache::lazy().build())
103 41 : .credentials_provider(credentials_provider)
104 41 : .sleep_impl(SharedAsyncSleep::from(sleep_impl));
105 41 :
106 41 : let sdk_config: aws_config::SdkConfig = std::thread::scope(|s| {
107 41 : s.spawn(|| {
108 41 : // TODO: load the SDK config asynchronously instead of blocking on a scoped thread.
109 41 : tokio::runtime::Builder::new_current_thread()
110 41 : .enable_all()
111 41 : .build()
112 41 : .unwrap()
113 41 : .block_on(sdk_config_loader.load())
114 41 : })
115 41 : .join()
116 41 : .unwrap()
117 41 : });
118 41 :
119 41 : let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&sdk_config);
120 :
121 : // Technically, the `remote_storage_config.endpoint` field only applies to S3 interactions
122 : // (relevant in case we ever reuse the `sdk_config` for more than just the S3 client).
123 41 : if let Some(custom_endpoint) = remote_storage_config.endpoint.clone() {
124 0 : s3_config_builder = s3_config_builder
125 0 : .endpoint_url(custom_endpoint)
126 0 : .force_path_style(true);
127 41 : }
128 :
129 : // We do our own retries (see [`backoff::retry`]). However, for the AWS SDK to enable rate limiting in response to throttling
130 : // responses (e.g. 429 on too many ListObjectsV2 requests), we must provide a retry config. We set it to use at most one
131 : // attempt, and enable 'Adaptive' mode, which turns on client-side rate limiting.
132 41 : let mut retry_config = RetryConfigBuilder::new();
133 41 : retry_config
134 41 : .set_max_attempts(Some(1))
135 41 : .set_mode(Some(RetryMode::Adaptive));
136 41 : s3_config_builder = s3_config_builder.retry_config(retry_config.build());
137 41 :
138 41 : let s3_config = s3_config_builder.build();
139 41 : let client = aws_sdk_s3::Client::from_conf(s3_config);
140 41 :
141 41 : let prefix_in_bucket = remote_storage_config
142 41 : .prefix_in_bucket
143 41 : .as_deref()
144 41 : .map(|prefix| {
145 38 : let mut prefix = prefix;
146 41 : while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
147 3 : prefix = &prefix[1..]
148 : }
149 :
150 38 : let mut prefix = prefix.to_string();
151 70 : while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
152 32 : prefix.pop();
153 32 : }
154 38 : prefix
155 41 : });
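 : // After normalization, e.g. "/wal/", "wal/", and "wal" all become "wal".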
156 41 :
157 41 : Ok(Self {
158 41 : client,
159 41 : bucket_name: remote_storage_config.bucket_name.clone(),
160 41 : max_keys_per_list_response: remote_storage_config.max_keys_per_list_response,
161 41 : prefix_in_bucket,
162 41 : concurrency_limiter: ConcurrencyLimiter::new(
163 41 : remote_storage_config.concurrency_limit.get(),
164 41 : ),
165 41 : upload_storage_class: remote_storage_config.upload_storage_class.clone(),
166 41 : timeout,
167 41 : })
168 41 : }
169 :
170 554 : fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
171 554 : let relative_path =
172 554 : match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
173 554 : Some(stripped) => stripped,
174 : // we rely on AWS to return properly prefixed paths
175 : // for requests with a certain prefix
176 0 : None => panic!(
177 0 : "Key {} does not start with bucket prefix {:?}",
178 0 : key, self.prefix_in_bucket
179 0 : ),
180 : };
181 554 : RemotePath(
182 554 : relative_path
183 554 : .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
184 554 : .collect(),
185 554 : )
186 554 : }
187 :
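 : // Maps a relative path to a full S3 key. E.g. (hypothetical values) with
 : // prefix_in_bucket = Some("wal"), "tenants/x" maps to "wal/tenants/x";
 : // with no prefix, the path is used as-is.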
188 571 : pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
189 571 : assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
190 571 : let path_string = path.get_path().as_str();
191 571 : match &self.prefix_in_bucket {
192 562 : Some(prefix) => prefix.clone() + "/" + path_string,
193 9 : None => path_string.to_string(),
194 : }
195 571 : }
196 :
197 470 : async fn permit(
198 470 : &self,
199 470 : kind: RequestKind,
200 470 : cancel: &CancellationToken,
201 470 : ) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
202 470 : let started_at = start_counting_cancelled_wait(kind);
203 470 : let acquire = self.concurrency_limiter.acquire(kind);
204 :
205 470 : let permit = tokio::select! {
206 470 : permit = acquire => permit.expect("semaphore is never closed"),
207 470 : _ = cancel.cancelled() => return Err(Cancelled),
208 : };
209 :
210 470 : let started_at = ScopeGuard::into_inner(started_at);
211 470 : crate::metrics::BUCKET_METRICS
212 470 : .wait_seconds
213 470 : .observe_elapsed(kind, started_at);
214 470 :
215 470 : Ok(permit)
216 470 : }
217 :
218 32 : async fn owned_permit(
219 32 : &self,
220 32 : kind: RequestKind,
221 32 : cancel: &CancellationToken,
222 32 : ) -> Result<tokio::sync::OwnedSemaphorePermit, Cancelled> {
223 32 : let started_at = start_counting_cancelled_wait(kind);
224 32 : let acquire = self.concurrency_limiter.acquire_owned(kind);
225 :
226 32 : let permit = tokio::select! {
227 32 : permit = acquire => permit.expect("semaphore is never closed"),
228 32 : _ = cancel.cancelled() => return Err(Cancelled),
229 : };
230 :
231 32 : let started_at = ScopeGuard::into_inner(started_at);
232 32 : crate::metrics::BUCKET_METRICS
233 32 : .wait_seconds
234 32 : .observe_elapsed(kind, started_at);
235 32 : Ok(permit)
236 32 : }
237 :
238 32 : async fn download_object(
239 32 : &self,
240 32 : request: GetObjectRequest,
241 32 : cancel: &CancellationToken,
242 32 : ) -> Result<Download, DownloadError> {
243 32 : let kind = RequestKind::Get;
244 :
245 32 : let permit = self.owned_permit(kind, cancel).await?;
246 :
247 32 : let started_at = start_measuring_requests(kind);
248 32 :
249 32 : let mut builder = self
250 32 : .client
251 32 : .get_object()
252 32 : .bucket(request.bucket)
253 32 : .key(request.key)
254 32 : .set_version_id(request.version_id)
255 32 : .set_range(request.range);
256 :
257 32 : if let Some(etag) = request.etag {
258 6 : builder = builder.if_none_match(etag);
259 26 : }
260 :
261 32 : let get_object = builder.send();
262 :
263 32 : let get_object = tokio::select! {
264 32 : res = get_object => res,
265 32 : _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
266 32 : _ = cancel.cancelled() => return Err(DownloadError::Cancelled),
267 : };
268 :
269 32 : let started_at = ScopeGuard::into_inner(started_at);
270 :
271 28 : let object_output = match get_object {
272 28 : Ok(object_output) => object_output,
273 4 : Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
274 : // Count this in the AttemptOutcome::Ok bucket, because 404 is not
275 : // an error: we expect to sometimes fetch an object and find it missing,
276 : // e.g. when probing for timeline indices.
277 0 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
278 0 : kind,
279 0 : AttemptOutcome::Ok,
280 0 : started_at,
281 0 : );
282 0 : return Err(DownloadError::NotFound);
283 : }
284 4 : Err(SdkError::ServiceError(e))
285 : // aws_smithy_runtime_api::http::response::StatusCode isn't
286 : // re-exported by any aws crates, so just compare the numeric
287 : // status against http_types::StatusCode instead of pulling it in.
288 4 : if e.raw().status().as_u16() == StatusCode::NotModified =>
289 4 : {
290 4 : // Count an unmodified file as a success.
291 4 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
292 4 : kind,
293 4 : AttemptOutcome::Ok,
294 4 : started_at,
295 4 : );
296 4 : return Err(DownloadError::Unmodified);
297 : }
298 0 : Err(e) => {
299 0 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
300 0 : kind,
301 0 : AttemptOutcome::Err,
302 0 : started_at,
303 0 : );
304 0 :
305 0 : return Err(DownloadError::Other(
306 0 : anyhow::Error::new(e).context("download s3 object"),
307 0 : ));
308 : }
309 : };
310 :
311 : // Even if no timeout remains, continue anyway; the caller can decide how to
312 : // handle the resulting timeout and cancellation errors.
313 28 : let remaining = self.timeout.saturating_sub(started_at.elapsed());
314 28 :
315 28 : let metadata = object_output.metadata().cloned().map(StorageMetadata);
316 28 : let etag = object_output
317 28 : .e_tag
318 28 : .ok_or(DownloadError::Other(anyhow::anyhow!("Missing ETag header")))?
319 28 : .into();
320 28 : let last_modified = object_output
321 28 : .last_modified
322 28 : .ok_or(DownloadError::Other(anyhow::anyhow!(
323 28 : "Missing LastModified header"
324 28 : )))?
325 28 : .try_into()
326 28 : .map_err(|e: ConversionError| DownloadError::Other(e.into()))?;
327 :
328 28 : let body = object_output.body;
329 28 : let body = ByteStreamAsStream::from(body);
330 28 : let body = PermitCarrying::new(permit, body);
331 28 : let body = TimedDownload::new(started_at, body);
332 28 :
333 28 : let cancel_or_timeout = crate::support::cancel_or_timeout(remaining, cancel.clone());
334 28 : let body = crate::support::DownloadStream::new(cancel_or_timeout, body);
335 28 :
336 28 : Ok(Download {
337 28 : metadata,
338 28 : etag,
339 28 : last_modified,
340 28 : download_stream: Box::pin(body),
341 28 : })
342 32 : }
343 :
344 198 : async fn delete_oids(
345 198 : &self,
346 198 : _permit: &tokio::sync::SemaphorePermit<'_>,
347 198 : delete_objects: &[ObjectIdentifier],
348 198 : cancel: &CancellationToken,
349 198 : ) -> anyhow::Result<()> {
350 198 : let kind = RequestKind::Delete;
351 198 : let mut cancel = std::pin::pin!(cancel.cancelled());
352 :
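 : // S3's DeleteObjects call caps the number of keys per request, hence the
 : // chunking by MAX_KEYS_PER_DELETE_S3 below.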
353 198 : for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE_S3) {
354 198 : let started_at = start_measuring_requests(kind);
355 :
356 198 : let req = self
357 198 : .client
358 198 : .delete_objects()
359 198 : .bucket(self.bucket_name.clone())
360 198 : .delete(
361 198 : Delete::builder()
362 198 : .set_objects(Some(chunk.to_vec()))
363 198 : .build()
364 198 : .context("build request")?,
365 : )
366 198 : .send();
367 :
368 198 : let resp = tokio::select! {
369 198 : resp = req => resp,
370 198 : _ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()),
371 198 : _ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
372 : };
373 :
374 198 : let started_at = ScopeGuard::into_inner(started_at);
375 198 : crate::metrics::BUCKET_METRICS
376 198 : .req_seconds
377 198 : .observe_elapsed(kind, &resp, started_at);
378 :
379 198 : let resp = resp.context("request deletion")?;
380 198 : crate::metrics::BUCKET_METRICS
381 198 : .deleted_objects_total
382 198 : .inc_by(chunk.len() as u64);
383 :
384 198 : if let Some(errors) = resp.errors {
385 : // Log a bounded number of the errors within the response:
386 : // these requests can carry 1000 keys so logging each one
387 : // would be too verbose, especially as errors may lead us
388 : // to retry repeatedly.
389 : const LOG_UP_TO_N_ERRORS: usize = 10;
390 0 : for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
391 0 : tracing::warn!(
392 0 : "DeleteObjects key {} failed: {}: {}",
393 0 : e.key.as_ref().map(Cow::from).unwrap_or("".into()),
394 0 : e.code.as_ref().map(Cow::from).unwrap_or("".into()),
395 0 : e.message.as_ref().map(Cow::from).unwrap_or("".into())
396 : );
397 : }
398 :
399 0 : return Err(anyhow::anyhow!(
400 0 : "Failed to delete {}/{} objects",
401 0 : errors.len(),
402 0 : chunk.len(),
403 0 : ));
404 198 : }
405 : }
406 198 : Ok(())
407 198 : }
408 :
409 6 : async fn list_versions_with_permit(
410 6 : &self,
411 6 : _permit: &tokio::sync::SemaphorePermit<'_>,
412 6 : prefix: Option<&RemotePath>,
413 6 : mode: ListingMode,
414 6 : max_keys: Option<NonZeroU32>,
415 6 : cancel: &CancellationToken,
416 6 : ) -> Result<crate::VersionListing, DownloadError> {
417 6 : // get the passed prefix or if it is not set use prefix_in_bucket value
418 6 : let prefix = prefix
419 6 : .map(|p| self.relative_path_to_s3_object(p))
420 6 : .or_else(|| self.prefix_in_bucket.clone());
421 6 :
422 6 : let warn_threshold = 3;
423 6 : let max_retries = 10;
424 6 : let is_permanent = |e: &_| matches!(e, DownloadError::Cancelled);
425 :
426 6 : let mut key_marker = None;
427 6 : let mut version_id_marker = None;
428 6 : let mut versions_and_deletes = Vec::new();
429 :
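 : // ListObjectVersions paginates via (next_key_marker, next_version_id_marker);
 : // each response's markers are fed back into the next request until none remain.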
430 : loop {
431 6 : let response = backoff::retry(
432 7 : || async {
433 7 : let mut request = self
434 7 : .client
435 7 : .list_object_versions()
436 7 : .bucket(self.bucket_name.clone())
437 7 : .set_prefix(prefix.clone())
438 7 : .set_key_marker(key_marker.clone())
439 7 : .set_version_id_marker(version_id_marker.clone());
440 7 :
441 7 : if let ListingMode::WithDelimiter = mode {
442 0 : request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
443 7 : }
444 :
445 7 : let op = request.send();
446 7 :
447 7 : tokio::select! {
448 7 : res = op => res.map_err(|e| DownloadError::Other(e.into())),
449 7 : _ = cancel.cancelled() => Err(DownloadError::Cancelled),
450 : }
451 14 : },
452 6 : is_permanent,
453 6 : warn_threshold,
454 6 : max_retries,
455 6 : "listing object versions",
456 6 : cancel,
457 6 : )
458 6 : .await
459 6 : .ok_or_else(|| DownloadError::Cancelled)
460 6 : .and_then(|x| x)?;
461 :
462 6 : tracing::trace!(
463 0 : " Got List response version_id_marker={:?}, key_marker={:?}",
464 : response.version_id_marker,
465 : response.key_marker
466 : );
467 6 : let versions = response
468 6 : .versions
469 6 : .unwrap_or_default()
470 6 : .into_iter()
471 28 : .map(|version| {
472 28 : let key = version.key.expect("response does not contain a key");
473 28 : let key = self.s3_object_to_relative_path(&key);
474 28 : let version_id = VersionId(version.version_id.expect("needing version id"));
475 28 : let last_modified =
476 28 : SystemTime::try_from(version.last_modified.expect("no last_modified"))?;
477 28 : Ok(Version {
478 28 : key,
479 28 : last_modified,
480 28 : kind: crate::VersionKind::Version(version_id),
481 28 : })
482 28 : });
483 6 : let deletes = response
484 6 : .delete_markers
485 6 : .unwrap_or_default()
486 6 : .into_iter()
487 8 : .map(|version| {
488 8 : let key = version.key.expect("response does not contain a key");
489 8 : let key = self.s3_object_to_relative_path(&key);
490 8 : let last_modified =
491 8 : SystemTime::try_from(version.last_modified.expect("no last_modified"))?;
492 8 : Ok(Version {
493 8 : key,
494 8 : last_modified,
495 8 : kind: crate::VersionKind::DeletionMarker,
496 8 : })
497 8 : });
498 6 : itertools::process_results(versions.chain(deletes), |n_vds| {
499 6 : versions_and_deletes.extend(n_vds)
500 6 : })
501 6 : .map_err(DownloadError::Other)?;
502 12 : fn none_if_empty(v: Option<String>) -> Option<String> {
503 12 : v.filter(|v| !v.is_empty())
504 12 : }
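 : // (Presumably a defense against implementations that return empty-string
 : // markers instead of omitting them; an empty marker must mean "no marker".)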
505 6 : version_id_marker = none_if_empty(response.next_version_id_marker);
506 6 : key_marker = none_if_empty(response.next_key_marker);
507 6 : if version_id_marker.is_none() {
508 : // The final response is not supposed to be truncated
509 6 : if response.is_truncated.unwrap_or_default() {
510 0 : return Err(DownloadError::Other(anyhow::anyhow!(
511 0 : "Received truncated ListObjectVersions response for prefix={prefix:?}"
512 0 : )));
513 6 : }
514 6 : break;
515 0 : }
516 0 : if let Some(max_keys) = max_keys {
517 0 : if versions_and_deletes.len() >= max_keys.get().try_into().unwrap() {
518 0 : return Err(DownloadError::Other(anyhow::anyhow!("too many versions")));
519 0 : }
520 0 : }
521 : }
522 6 : Ok(VersionListing {
523 6 : versions: versions_and_deletes,
524 6 : })
525 6 : }
526 :
527 6 : pub fn bucket_name(&self) -> &str {
528 6 : &self.bucket_name
529 6 : }
530 : }
531 :
532 : pin_project_lite::pin_project! {
533 : struct ByteStreamAsStream {
534 : #[pin]
535 : inner: aws_smithy_types::byte_stream::ByteStream
536 : }
537 : }
538 :
539 : impl From<aws_smithy_types::byte_stream::ByteStream> for ByteStreamAsStream {
540 28 : fn from(inner: aws_smithy_types::byte_stream::ByteStream) -> Self {
541 28 : ByteStreamAsStream { inner }
542 28 : }
543 : }
544 :
545 : impl Stream for ByteStreamAsStream {
546 : type Item = std::io::Result<Bytes>;
547 :
548 41 : fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
549 41 : // this does the std::io::ErrorKind::Other conversion
550 41 : self.project().inner.poll_next(cx).map_err(|x| x.into())
551 41 : }
552 :
553 : // We cannot implement size_hint: the inner size_hint reports the remaining size in
554 : // bytes, while Stream::size_hint counts items, so the two are not comparable.
555 : }
556 :
557 : pin_project_lite::pin_project! {
558 : /// Times and tracks the outcome of the request.
559 : struct TimedDownload<S> {
560 : started_at: std::time::Instant,
561 : outcome: AttemptOutcome,
562 : #[pin]
563 : inner: S
564 : }
565 :
566 : impl<S> PinnedDrop for TimedDownload<S> {
567 : fn drop(mut this: Pin<&mut Self>) {
568 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
569 : }
570 : }
571 : }
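 : // `outcome` starts out as Cancelled (see `new` below) and is only upgraded in
 : // poll_next, so a stream dropped mid-download is recorded as cancelled.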
572 :
573 : impl<S> TimedDownload<S> {
574 28 : fn new(started_at: std::time::Instant, inner: S) -> Self {
575 28 : TimedDownload {
576 28 : started_at,
577 28 : outcome: AttemptOutcome::Cancelled,
578 28 : inner,
579 28 : }
580 28 : }
581 : }
582 :
583 : impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for TimedDownload<S> {
584 : type Item = <S as Stream>::Item;
585 :
586 41 : fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
587 : use std::task::ready;
588 :
589 41 : let this = self.project();
590 :
591 41 : let res = ready!(this.inner.poll_next(cx));
592 22 : match &res {
593 22 : Some(Ok(_)) => {}
594 0 : Some(Err(_)) => *this.outcome = AttemptOutcome::Err,
595 18 : None => *this.outcome = AttemptOutcome::Ok,
596 : }
597 :
598 40 : Poll::Ready(res)
599 41 : }
600 :
601 0 : fn size_hint(&self) -> (usize, Option<usize>) {
602 0 : self.inner.size_hint()
603 0 : }
604 : }
605 :
606 : impl RemoteStorage for S3Bucket {
607 64 : fn list_streaming(
608 64 : &self,
609 64 : prefix: Option<&RemotePath>,
610 64 : mode: ListingMode,
611 64 : max_keys: Option<NonZeroU32>,
612 64 : cancel: &CancellationToken,
613 64 : ) -> impl Stream<Item = Result<Listing, DownloadError>> {
614 64 : let kind = RequestKind::List;
615 64 : // s3 sdk wants i32
616 64 : let mut max_keys = max_keys.map(|mk| mk.get() as i32);
617 64 :
618 64 : // Use the passed prefix, or fall back to the prefix_in_bucket value if it is unset.
619 64 : let list_prefix = prefix
620 64 : .map(|p| self.relative_path_to_s3_object(p))
621 64 : .or_else(|| {
622 32 : self.prefix_in_bucket.clone().map(|mut s| {
623 32 : s.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
624 32 : s
625 32 : })
626 64 : });
627 64 :
628 64 : async_stream::stream! {
629 64 : let _permit = self.permit(kind, cancel).await?;
630 64 :
631 64 : let mut continuation_token = None;
632 64 : 'outer: loop {
633 64 : let started_at = start_measuring_requests(kind);
634 64 :
635 64 : // Min of two Options, returning Some if one is a value and the other is
636 64 : // None (None is smaller than anything, so a plain min doesn't work).
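 : // E.g. (Some(5), Some(3)) -> Some(3); (Some(5), None) -> Some(5); (None, None) -> None.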
637 64 : let request_max_keys = self
638 64 : .max_keys_per_list_response
639 64 : .into_iter()
640 64 : .chain(max_keys.into_iter())
641 64 : .min();
642 64 : let mut request = self
643 64 : .client
644 64 : .list_objects_v2()
645 64 : .bucket(self.bucket_name.clone())
646 64 : .set_prefix(list_prefix.clone())
647 64 : .set_continuation_token(continuation_token.clone())
648 64 : .set_max_keys(request_max_keys);
649 64 :
650 64 : if let ListingMode::WithDelimiter = mode {
651 64 : request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
652 64 : }
653 64 :
654 64 : let request = request.send();
655 64 :
656 64 : let response = tokio::select! {
657 64 : res = request => Ok(res),
658 64 : _ = tokio::time::sleep(self.timeout) => Err(DownloadError::Timeout),
659 64 : _ = cancel.cancelled() => Err(DownloadError::Cancelled),
660 64 : }?;
661 64 :
662 64 : let response = response
663 64 : .context("Failed to list S3 prefixes")
664 64 : .map_err(DownloadError::Other);
665 64 :
666 64 : let started_at = ScopeGuard::into_inner(started_at);
667 64 :
668 64 : crate::metrics::BUCKET_METRICS
669 64 : .req_seconds
670 64 : .observe_elapsed(kind, &response, started_at);
671 64 :
672 64 : let response = match response {
673 64 : Ok(response) => response,
674 64 : Err(e) => {
675 64 : // The error is potentially retryable, so we must rewind the loop after yielding.
676 64 : yield Err(e);
677 64 : continue 'outer;
678 64 : },
679 64 : };
680 64 :
681 64 : let keys = response.contents();
682 64 : let prefixes = response.common_prefixes.as_deref().unwrap_or_default();
683 64 :
684 64 : tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
685 64 : let mut result = Listing::default();
686 64 :
687 64 : for object in keys {
688 64 : let key = object.key().expect("response does not contain a key");
689 64 : let key = self.s3_object_to_relative_path(key);
690 64 :
691 64 : let last_modified = match object.last_modified.map(SystemTime::try_from) {
692 64 : Some(Ok(t)) => t,
693 64 : Some(Err(_)) => {
694 64 : tracing::warn!("Remote storage last_modified {:?} for {} is out of bounds",
695 64 : object.last_modified, key
696 64 : );
697 64 : SystemTime::now()
698 64 : },
699 64 : None => {
700 64 : SystemTime::now()
701 64 : }
702 64 : };
703 64 :
704 64 : let size = object.size.unwrap_or(0) as u64;
705 64 :
706 64 : result.keys.push(ListingObject{
707 64 : key,
708 64 : last_modified,
709 64 : size,
710 64 : });
711 64 : if let Some(mut mk) = max_keys {
712 64 : assert!(mk > 0);
713 64 : mk -= 1;
714 64 : if mk == 0 {
715 64 : // limit reached
716 64 : yield Ok(result);
717 64 : break 'outer;
718 64 : }
719 64 : max_keys = Some(mk);
720 64 : }
721 64 : }
722 64 :
723 64 : // S3 gives us prefixes like "foo/", we return them like "foo"
724 104 : result.prefixes.extend(prefixes.iter().filter_map(|o| {
725 104 : Some(
726 104 : self.s3_object_to_relative_path(
727 104 : o.prefix()?
728 104 : .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR),
729 64 : ),
730 64 : )
731 104 : }));
732 64 :
733 64 : yield Ok(result);
734 64 :
735 64 : continuation_token = match response.next_continuation_token {
736 64 : Some(new_token) => Some(new_token),
737 64 : None => break,
738 64 : };
739 64 : }
740 64 : }
741 64 : }
742 :
743 0 : async fn list_versions(
744 0 : &self,
745 0 : prefix: Option<&RemotePath>,
746 0 : mode: ListingMode,
747 0 : max_keys: Option<NonZeroU32>,
748 0 : cancel: &CancellationToken,
749 0 : ) -> Result<crate::VersionListing, DownloadError> {
750 0 : let kind = RequestKind::ListVersions;
751 0 : let permit = self.permit(kind, cancel).await?;
752 0 : self.list_versions_with_permit(&permit, prefix, mode, max_keys, cancel)
753 0 : .await
754 0 : }
755 :
756 6 : async fn head_object(
757 6 : &self,
758 6 : key: &RemotePath,
759 6 : cancel: &CancellationToken,
760 6 : ) -> Result<ListingObject, DownloadError> {
761 6 : let kind = RequestKind::Head;
762 6 : let _permit = self.permit(kind, cancel).await?;
763 :
764 6 : let started_at = start_measuring_requests(kind);
765 6 :
766 6 : let head_future = self
767 6 : .client
768 6 : .head_object()
769 6 : .bucket(self.bucket_name())
770 6 : .key(self.relative_path_to_s3_object(key))
771 6 : .send();
772 6 :
773 6 : let head_future = tokio::time::timeout(self.timeout, head_future);
774 :
775 6 : let res = tokio::select! {
776 6 : res = head_future => res,
777 6 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
778 : };
779 :
780 6 : let res = res.map_err(|_e| DownloadError::Timeout)?;
781 :
782 : // Do not count timeouts as errors in metrics, but do count cancellations.
783 6 : let started_at = ScopeGuard::into_inner(started_at);
784 6 : crate::metrics::BUCKET_METRICS
785 6 : .req_seconds
786 6 : .observe_elapsed(kind, &res, started_at);
787 :
788 4 : let data = match res {
789 4 : Ok(object_output) => object_output,
790 2 : Err(SdkError::ServiceError(e)) if matches!(e.err(), HeadObjectError::NotFound(_)) => {
791 : // Count this in the AttemptOutcome::Ok bucket, because 404 is not
792 : // an error: we expect to sometimes fetch an object and find it missing,
793 : // e.g. when probing for timeline indices.
794 2 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
795 2 : kind,
796 2 : AttemptOutcome::Ok,
797 2 : started_at,
798 2 : );
799 2 : return Err(DownloadError::NotFound);
800 : }
801 0 : Err(e) => {
802 0 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
803 0 : kind,
804 0 : AttemptOutcome::Err,
805 0 : started_at,
806 0 : );
807 0 :
808 0 : return Err(DownloadError::Other(
809 0 : anyhow::Error::new(e).context("s3 head object"),
810 0 : ));
811 : }
812 : };
813 :
814 4 : let (Some(last_modified), Some(size)) = (data.last_modified, data.content_length) else {
815 0 : return Err(DownloadError::Other(anyhow!(
816 0 : "head_object doesn't contain last_modified or content_length"
817 0 : )))?;
818 : };
819 : Ok(ListingObject {
820 4 : key: key.to_owned(),
821 4 : last_modified: SystemTime::try_from(last_modified).map_err(|e| {
822 0 : DownloadError::Other(anyhow!("can't convert time '{last_modified}': {e}"))
823 4 : })?,
824 4 : size: size as u64,
825 : })
826 6 : }
827 :
828 198 : async fn upload(
829 198 : &self,
830 198 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
831 198 : from_size_bytes: usize,
832 198 : to: &RemotePath,
833 198 : metadata: Option<StorageMetadata>,
834 198 : cancel: &CancellationToken,
835 198 : ) -> anyhow::Result<()> {
836 198 : let kind = RequestKind::Put;
837 198 : let _permit = self.permit(kind, cancel).await?;
838 :
839 198 : let started_at = start_measuring_requests(kind);
840 198 :
841 710 : let body = StreamBody::new(from.map(|x| x.map(Frame::data)));
842 198 : let bytes_stream = ByteStream::new(SdkBody::from_body_1_x(body));
843 :
844 198 : let upload = self
845 198 : .client
846 198 : .put_object()
847 198 : .bucket(self.bucket_name.clone())
848 198 : .key(self.relative_path_to_s3_object(to))
849 198 : .set_metadata(metadata.map(|m| m.0))
850 198 : .set_storage_class(self.upload_storage_class.clone())
851 198 : .content_length(from_size_bytes.try_into()?)
852 198 : .body(bytes_stream)
853 198 : .send();
854 198 :
855 198 : let upload = tokio::time::timeout(self.timeout, upload);
856 :
857 198 : let res = tokio::select! {
858 198 : res = upload => res,
859 198 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
860 : };
861 :
862 198 : if let Ok(inner) = &res {
863 198 : // Do not count timeouts as errors in metrics, but do count cancellations.
864 198 : let started_at = ScopeGuard::into_inner(started_at);
865 198 : crate::metrics::BUCKET_METRICS
866 198 : .req_seconds
867 198 : .observe_elapsed(kind, inner, started_at);
868 198 : }
869 :
870 198 : match res {
871 198 : Ok(Ok(_put)) => Ok(()),
872 0 : Ok(Err(sdk)) => Err(sdk.into()),
873 0 : Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
874 : }
875 0 : }
876 :
877 2 : async fn copy(
878 2 : &self,
879 2 : from: &RemotePath,
880 2 : to: &RemotePath,
881 2 : cancel: &CancellationToken,
882 2 : ) -> anyhow::Result<()> {
883 2 : let kind = RequestKind::Copy;
884 2 : let _permit = self.permit(kind, cancel).await?;
885 :
886 2 : let timeout = tokio::time::sleep(self.timeout);
887 2 :
888 2 : let started_at = start_measuring_requests(kind);
889 2 :
890 2 : // The copy source must be prefixed with the bucket name.
891 2 : let copy_source = format!(
892 2 : "{}/{}",
893 2 : self.bucket_name,
894 2 : self.relative_path_to_s3_object(from)
895 2 : );
896 2 :
897 2 : let op = self
898 2 : .client
899 2 : .copy_object()
900 2 : .bucket(self.bucket_name.clone())
901 2 : .key(self.relative_path_to_s3_object(to))
902 2 : .set_storage_class(self.upload_storage_class.clone())
903 2 : .copy_source(copy_source)
904 2 : .send();
905 :
906 2 : let res = tokio::select! {
907 2 : res = op => res,
908 2 : _ = timeout => return Err(TimeoutOrCancel::Timeout.into()),
909 2 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
910 : };
911 :
912 2 : let started_at = ScopeGuard::into_inner(started_at);
913 2 : crate::metrics::BUCKET_METRICS
914 2 : .req_seconds
915 2 : .observe_elapsed(kind, &res, started_at);
916 2 :
917 2 : res?;
918 :
919 2 : Ok(())
920 2 : }
921 :
922 32 : async fn download(
923 32 : &self,
924 32 : from: &RemotePath,
925 32 : opts: &DownloadOpts,
926 32 : cancel: &CancellationToken,
927 32 : ) -> Result<Download, DownloadError> {
928 32 : // If a prefix is configured, this downloads `prefix/from`;
929 32 : // otherwise it downloads `from` directly.
930 32 : self.download_object(
931 32 : GetObjectRequest {
932 32 : bucket: self.bucket_name.clone(),
933 32 : key: self.relative_path_to_s3_object(from),
934 32 : etag: opts.etag.as_ref().map(|e| e.to_string()),
935 32 : range: opts.byte_range_header(),
936 32 : version_id: opts.version_id.as_ref().map(|v| v.0.to_owned()),
937 32 : },
938 32 : cancel,
939 32 : )
940 32 : .await
941 32 : }
942 :
943 194 : async fn delete_objects(
944 194 : &self,
945 194 : paths: &[RemotePath],
946 194 : cancel: &CancellationToken,
947 194 : ) -> anyhow::Result<()> {
948 194 : let kind = RequestKind::Delete;
949 194 : let permit = self.permit(kind, cancel).await?;
950 194 : let mut delete_objects = Vec::with_capacity(paths.len());
951 430 : for path in paths {
952 236 : let obj_id = ObjectIdentifier::builder()
953 236 : .set_key(Some(self.relative_path_to_s3_object(path)))
954 236 : .build()
955 236 : .context("convert path to oid")?;
956 236 : delete_objects.push(obj_id);
957 : }
958 :
959 194 : self.delete_oids(&permit, &delete_objects, cancel).await
960 194 : }
961 :
962 0 : fn max_keys_per_delete(&self) -> usize {
963 0 : MAX_KEYS_PER_DELETE_S3
964 0 : }
965 :
966 174 : async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
967 174 : let paths = std::array::from_ref(path);
968 174 : self.delete_objects(paths, cancel).await
969 174 : }
970 :
971 6 : async fn time_travel_recover(
972 6 : &self,
973 6 : prefix: Option<&RemotePath>,
974 6 : timestamp: SystemTime,
975 6 : done_if_after: SystemTime,
976 6 : cancel: &CancellationToken,
977 6 : ) -> Result<(), TimeTravelError> {
978 6 : let kind = RequestKind::TimeTravel;
979 6 : let permit = self.permit(kind, cancel).await?;
980 :
981 6 : tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
982 :
983 : // Limit the number of version deletions, mostly so that we don't
984 : // keep requesting forever if the list is too long, as we hold the
985 : // whole list in RAM.
986 : // Building a 100k-entry list that reaches this limit takes roughly
987 : // 40 seconds and corresponds to tenants of about 2 TiB physical size.
988 : const COMPLEXITY_LIMIT: Option<NonZeroU32> = NonZeroU32::new(100_000);
989 :
990 6 : let mode = ListingMode::NoDelimiter;
991 6 : let version_listing = self
992 6 : .list_versions_with_permit(&permit, prefix, mode, COMPLEXITY_LIMIT, cancel)
993 6 : .await
994 6 : .map_err(|err| match err {
995 0 : DownloadError::Other(e) => TimeTravelError::Other(e),
996 0 : DownloadError::Cancelled => TimeTravelError::Cancelled,
997 0 : other => TimeTravelError::Other(other.into()),
998 6 : })?;
999 6 : let versions_and_deletes = version_listing.versions;
1000 6 :
1001 6 : tracing::info!(
1002 0 : "Built list for time travel with {} versions and deletions",
1003 0 : versions_and_deletes.len()
1004 : );
1005 :
1006 : // Work on the list of references instead of the objects directly,
1007 : // otherwise we get lifetime errors in the sort_by_key call below.
1008 6 : let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
1009 6 :
1010 124 : versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified));
1011 6 :
1012 6 : let mut vds_for_key = HashMap::<_, Vec<_>>::new();
1013 :
1014 42 : for vd in &versions_and_deletes {
1015 36 : let Version { key, .. } = &vd;
1016 36 : let version_id = vd.version_id().map(|v| v.0.as_str());
1017 36 : if version_id == Some("null") {
1018 0 : return Err(TimeTravelError::Other(anyhow!(
1019 0 : "Received ListVersions response for key={key} with version_id='null', \
1020 0 : indicating either disabled versioning, or legacy objects with null version id values"
1021 0 : )));
1022 36 : }
1023 36 : tracing::trace!("Parsing version key={key} kind={:?}", vd.kind);
1024 :
1025 36 : vds_for_key.entry(key).or_default().push(vd);
1026 : }
1027 :
1028 6 : let warn_threshold = 3;
1029 6 : let max_retries = 10;
1030 6 : let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
1031 :
1032 24 : for (key, versions) in vds_for_key {
1033 18 : let last_vd = versions.last().unwrap();
1034 18 : let key = self.relative_path_to_s3_object(key);
1035 18 : if last_vd.last_modified > done_if_after {
1036 0 : tracing::trace!("Key {key} has version later than done_if_after, skipping");
1037 0 : continue;
1038 18 : }
1039 : // the version we want to restore to.
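 : // binary_search_by_key returns Err(i) with the insertion point when there is
 : // no exact match, i.e. the index of the first version newer than `timestamp`.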
1040 18 : let version_to_restore_to =
1041 36 : match versions.binary_search_by_key(×tamp, |tpl| tpl.last_modified) {
1042 0 : Ok(v) => v,
1043 18 : Err(e) => e,
1044 : };
1045 18 : if version_to_restore_to == versions.len() {
1046 6 : tracing::trace!("Key {key} has no changes since timestamp, skipping");
1047 6 : continue;
1048 12 : }
1049 12 : let mut do_delete = false;
1050 12 : if version_to_restore_to == 0 {
1051 : // All versions more recent, so the key didn't exist at the specified time point.
1052 6 : tracing::trace!(
1053 0 : "All {} versions more recent for {key}, deleting",
1054 0 : versions.len()
1055 : );
1056 6 : do_delete = true;
1057 : } else {
1058 6 : match &versions[version_to_restore_to - 1] {
1059 : Version {
1060 6 : kind: VersionKind::Version(version_id),
1061 6 : ..
1062 6 : } => {
1063 6 : let version_id = &version_id.0;
1064 6 : tracing::trace!("Copying old version {version_id} for {key}...");
1065 : // Restore the state to the last version by copying
1066 6 : let source_id =
1067 6 : format!("{}/{key}?versionId={version_id}", self.bucket_name);
1068 6 :
1069 6 : backoff::retry(
1070 6 : || async {
1071 6 : let op = self
1072 6 : .client
1073 6 : .copy_object()
1074 6 : .bucket(self.bucket_name.clone())
1075 6 : .key(&key)
1076 6 : .set_storage_class(self.upload_storage_class.clone())
1077 6 : .copy_source(&source_id)
1078 6 : .send();
1079 6 :
1080 6 : tokio::select! {
1081 6 : res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
1082 6 : _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
1083 : }
1084 12 : },
1085 6 : is_permanent,
1086 6 : warn_threshold,
1087 6 : max_retries,
1088 6 : "copying object version for time_travel_recover",
1089 6 : cancel,
1090 6 : )
1091 6 : .await
1092 6 : .ok_or_else(|| TimeTravelError::Cancelled)
1093 6 : .and_then(|x| x)?;
1094 6 : tracing::info!(%version_id, %key, "Copied old version in S3");
1095 : }
1096 : Version {
1097 : kind: VersionKind::DeletionMarker,
1098 : ..
1099 0 : } => {
1100 0 : do_delete = true;
1101 0 : }
1102 : }
1103 : };
1104 12 : if do_delete {
1105 6 : if matches!(last_vd.kind, VersionKind::DeletionMarker) {
1106 : // Key has since been deleted (but there was some history); nothing more to do.
1107 2 : tracing::trace!("Key {key} already deleted, skipping.");
1108 : } else {
1109 4 : tracing::trace!("Deleting {key}...");
1110 :
1111 4 : let oid = ObjectIdentifier::builder()
1112 4 : .key(key.to_owned())
1113 4 : .build()
1114 4 : .map_err(|e| TimeTravelError::Other(e.into()))?;
1115 :
1116 4 : self.delete_oids(&permit, &[oid], cancel)
1117 4 : .await
1118 4 : .map_err(|e| {
1119 0 : // delete_oids will use TimeoutOrCancel
1120 0 : if TimeoutOrCancel::caused_by_cancel(&e) {
1121 0 : TimeTravelError::Cancelled
1122 : } else {
1123 0 : TimeTravelError::Other(e)
1124 : }
1125 4 : })?;
1126 : }
1127 6 : }
1128 : }
1129 6 : Ok(())
1130 6 : }
1131 : }
1132 :
1133 : #[cfg(test)]
1134 : mod tests {
1135 : use std::num::NonZeroUsize;
1136 :
1137 : use camino::Utf8Path;
1138 :
1139 : use crate::{RemotePath, S3Bucket, S3Config};
1140 :
1141 : #[tokio::test]
1142 3 : async fn relative_path() {
1143 3 : let all_paths = ["", "some/path", "some/path/"];
1144 3 : let all_paths: Vec<RemotePath> = all_paths
1145 3 : .iter()
1146 9 : .map(|x| RemotePath::new(Utf8Path::new(x)).expect("bad path"))
1147 3 : .collect();
1148 3 : let prefixes = [
1149 3 : None,
1150 3 : Some(""),
1151 3 : Some("test/prefix"),
1152 3 : Some("test/prefix/"),
1153 3 : Some("/test/prefix/"),
1154 3 : ];
1155 3 : let expected_outputs = [
1156 3 : vec!["", "some/path", "some/path/"],
1157 3 : vec!["/", "/some/path", "/some/path/"],
1158 3 : vec![
1159 3 : "test/prefix/",
1160 3 : "test/prefix/some/path",
1161 3 : "test/prefix/some/path/",
1162 3 : ],
1163 3 : vec![
1164 3 : "test/prefix/",
1165 3 : "test/prefix/some/path",
1166 3 : "test/prefix/some/path/",
1167 3 : ],
1168 3 : vec![
1169 3 : "test/prefix/",
1170 3 : "test/prefix/some/path",
1171 3 : "test/prefix/some/path/",
1172 3 : ],
1173 3 : ];
1174 3 :
1175 15 : for (prefix_idx, prefix) in prefixes.iter().enumerate() {
1176 15 : let config = S3Config {
1177 15 : bucket_name: "bucket".to_owned(),
1178 15 : bucket_region: "region".to_owned(),
1179 15 : prefix_in_bucket: prefix.map(str::to_string),
1180 15 : endpoint: None,
1181 15 : concurrency_limit: NonZeroUsize::new(100).unwrap(),
1182 15 : max_keys_per_list_response: Some(5),
1183 15 : upload_storage_class: None,
1184 15 : };
1185 15 : let storage = S3Bucket::new(&config, std::time::Duration::ZERO)
1186 15 : .await
1187 15 : .expect("remote storage init");
1188 45 : for (test_path_idx, test_path) in all_paths.iter().enumerate() {
1189 45 : let result = storage.relative_path_to_s3_object(test_path);
1190 45 : let expected = expected_outputs[prefix_idx][test_path_idx];
1191 45 : assert_eq!(result, expected);
1192 3 : }
1193 3 : }
1194 3 : }
1195 : }