Line data Source code
1 : //! AWS S3 storage wrapper around `rusoto` library.
2 : //!
3 : //! Respects `prefix_in_bucket` property from [`S3Config`],
4 : //! allowing multiple api users to independently work with the same S3 bucket, if
5 : //! their bucket prefixes are both specified and different.
6 :
7 : use std::sync::Arc;
8 :
9 : use anyhow::Context;
10 : use aws_config::{
11 : environment::credentials::EnvironmentVariableCredentialsProvider,
12 : imds::credentials::ImdsCredentialsProvider, meta::credentials::CredentialsProviderChain,
13 : provider_config::ProviderConfig, web_identity_token::WebIdentityTokenCredentialsProvider,
14 : };
15 : use aws_credential_types::cache::CredentialsCache;
16 : use aws_sdk_s3::{
17 : config::{Config, Region},
18 : error::SdkError,
19 : operation::get_object::GetObjectError,
20 : primitives::ByteStream,
21 : types::{Delete, ObjectIdentifier},
22 : Client,
23 : };
24 : use aws_smithy_http::body::SdkBody;
25 : use hyper::Body;
26 : use scopeguard::ScopeGuard;
27 : use tokio::{
28 : io::{self, AsyncRead},
29 : sync::Semaphore,
30 : };
31 : use tokio_util::io::ReaderStream;
32 : use tracing::debug;
33 :
34 : use super::StorageMetadata;
35 : use crate::{
36 : Download, DownloadError, RemotePath, RemoteStorage, S3Config, REMOTE_STORAGE_PREFIX_SEPARATOR,
37 : };
38 :
39 : const MAX_DELETE_OBJECTS_REQUEST_SIZE: usize = 1000;
40 :
41 : pub(super) mod metrics;
42 :
43 : use self::metrics::{AttemptOutcome, RequestKind};
44 :
45 : /// AWS S3 storage.
46 : pub struct S3Bucket {
47 : client: Client,
48 : bucket_name: String,
49 : prefix_in_bucket: Option<String>,
50 : max_keys_per_list_response: Option<i32>,
51 : // Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
52 : // Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
53 : // The helps to ensure we don't exceed the thresholds.
54 : concurrency_limiter: Arc<Semaphore>,
55 : }
56 :
57 0 : #[derive(Default)]
58 : struct GetObjectRequest {
59 : bucket: String,
60 : key: String,
61 : range: Option<String>,
62 : }
63 : impl S3Bucket {
64 : /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
65 241 : pub fn new(aws_config: &S3Config) -> anyhow::Result<Self> {
66 241 : debug!(
67 0 : "Creating s3 remote storage for S3 bucket {}",
68 0 : aws_config.bucket_name
69 0 : );
70 :
71 241 : let region = Some(Region::new(aws_config.bucket_region.clone()));
72 241 :
73 241 : let credentials_provider = {
74 241 : // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
75 241 : CredentialsProviderChain::first_try(
76 241 : "env",
77 241 : EnvironmentVariableCredentialsProvider::new(),
78 241 : )
79 241 : // uses "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
80 241 : // needed to access remote extensions bucket
81 241 : .or_else("token", {
82 241 : let provider_conf = ProviderConfig::without_region().with_region(region.clone());
83 241 :
84 241 : WebIdentityTokenCredentialsProvider::builder()
85 241 : .configure(&provider_conf)
86 241 : .build()
87 241 : })
88 241 : // uses imds v2
89 241 : .or_else("imds", ImdsCredentialsProvider::builder().build())
90 241 : };
91 241 :
92 241 : let mut config_builder = Config::builder()
93 241 : .region(region)
94 241 : .credentials_cache(CredentialsCache::lazy())
95 241 : .credentials_provider(credentials_provider);
96 :
97 241 : if let Some(custom_endpoint) = aws_config.endpoint.clone() {
98 114 : config_builder = config_builder
99 114 : .endpoint_url(custom_endpoint)
100 114 : .force_path_style(true);
101 127 : }
102 241 : let client = Client::from_conf(config_builder.build());
103 241 :
104 241 : let prefix_in_bucket = aws_config.prefix_in_bucket.as_deref().map(|prefix| {
105 240 : let mut prefix = prefix;
106 241 : while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
107 1 : prefix = &prefix[1..]
108 : }
109 :
110 240 : let mut prefix = prefix.to_string();
111 242 : while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
112 2 : prefix.pop();
113 2 : }
114 240 : prefix
115 241 : });
116 241 : Ok(Self {
117 241 : client,
118 241 : bucket_name: aws_config.bucket_name.clone(),
119 241 : max_keys_per_list_response: aws_config.max_keys_per_list_response,
120 241 : prefix_in_bucket,
121 241 : concurrency_limiter: Arc::new(Semaphore::new(aws_config.concurrency_limit.get())),
122 241 : })
123 241 : }
124 :
125 1098 : fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
126 1098 : let relative_path =
127 1098 : match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
128 1098 : Some(stripped) => stripped,
129 : // we rely on AWS to return properly prefixed paths
130 : // for requests with a certain prefix
131 0 : None => panic!(
132 0 : "Key {} does not start with bucket prefix {:?}",
133 0 : key, self.prefix_in_bucket
134 0 : ),
135 : };
136 1098 : RemotePath(
137 1098 : relative_path
138 1098 : .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
139 1098 : .collect(),
140 1098 : )
141 1098 : }
142 :
143 18844 : pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
144 18844 : assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
145 18844 : let path_string = path
146 18844 : .get_path()
147 18844 : .to_string_lossy()
148 18844 : .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR)
149 18844 : .to_string();
150 18844 : match &self.prefix_in_bucket {
151 18841 : Some(prefix) => prefix.clone() + "/" + &path_string,
152 3 : None => path_string,
153 : }
154 18844 : }
155 :
156 17346 : async fn permit(&self, kind: RequestKind) -> tokio::sync::SemaphorePermit<'_> {
157 17346 : let started_at = start_counting_cancelled_wait(kind);
158 17346 : let permit = self
159 17346 : .concurrency_limiter
160 17346 : .acquire()
161 1 : .await
162 17346 : .expect("semaphore is never closed");
163 17346 :
164 17346 : let started_at = ScopeGuard::into_inner(started_at);
165 17346 : metrics::BUCKET_METRICS
166 17346 : .wait_seconds
167 17346 : .observe_elapsed(kind, started_at);
168 17346 :
169 17346 : permit
170 17346 : }
171 :
172 761 : async fn owned_permit(&self, kind: RequestKind) -> tokio::sync::OwnedSemaphorePermit {
173 761 : let started_at = start_counting_cancelled_wait(kind);
174 761 : let permit = self
175 761 : .concurrency_limiter
176 761 : .clone()
177 761 : .acquire_owned()
178 0 : .await
179 761 : .expect("semaphore is never closed");
180 761 :
181 761 : let started_at = ScopeGuard::into_inner(started_at);
182 761 : metrics::BUCKET_METRICS
183 761 : .wait_seconds
184 761 : .observe_elapsed(kind, started_at);
185 761 : permit
186 761 : }
187 :
188 761 : async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
189 761 : let kind = RequestKind::Get;
190 761 : let permit = self.owned_permit(kind).await;
191 :
192 761 : let started_at = start_measuring_requests(kind);
193 :
194 761 : let get_object = self
195 761 : .client
196 761 : .get_object()
197 761 : .bucket(request.bucket)
198 761 : .key(request.key)
199 761 : .set_range(request.range)
200 761 : .send()
201 2736 : .await;
202 :
203 761 : let started_at = ScopeGuard::into_inner(started_at);
204 761 :
205 761 : if get_object.is_err() {
206 239 : metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
207 239 : kind,
208 239 : AttemptOutcome::Err,
209 239 : started_at,
210 239 : );
211 522 : }
212 :
213 239 : match get_object {
214 522 : Ok(object_output) => {
215 522 : let metadata = object_output.metadata().cloned().map(StorageMetadata);
216 522 : Ok(Download {
217 522 : metadata,
218 522 : download_stream: Box::pin(io::BufReader::new(TimedDownload::new(
219 522 : started_at,
220 522 : RatelimitedAsyncRead::new(permit, object_output.body.into_async_read()),
221 522 : ))),
222 522 : })
223 : }
224 239 : Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
225 239 : Err(DownloadError::NotFound)
226 : }
227 0 : Err(e) => Err(DownloadError::Other(
228 0 : anyhow::Error::new(e).context("download s3 object"),
229 0 : )),
230 : }
231 761 : }
232 : }
233 :
234 : pin_project_lite::pin_project! {
235 : /// An `AsyncRead` adapter which carries a permit for the lifetime of the value.
236 : struct RatelimitedAsyncRead<S> {
237 : permit: tokio::sync::OwnedSemaphorePermit,
238 : #[pin]
239 : inner: S,
240 : }
241 : }
242 :
243 : impl<S: AsyncRead> RatelimitedAsyncRead<S> {
244 522 : fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self {
245 522 : RatelimitedAsyncRead { permit, inner }
246 522 : }
247 : }
248 :
249 : impl<S: AsyncRead> AsyncRead for RatelimitedAsyncRead<S> {
250 169160 : fn poll_read(
251 169160 : self: std::pin::Pin<&mut Self>,
252 169160 : cx: &mut std::task::Context<'_>,
253 169160 : buf: &mut io::ReadBuf<'_>,
254 169160 : ) -> std::task::Poll<std::io::Result<()>> {
255 169160 : let this = self.project();
256 169160 : this.inner.poll_read(cx, buf)
257 169160 : }
258 : }
259 :
260 : pin_project_lite::pin_project! {
261 : /// Times and tracks the outcome of the request.
262 : struct TimedDownload<S> {
263 : started_at: std::time::Instant,
264 : outcome: metrics::AttemptOutcome,
265 : #[pin]
266 : inner: S
267 : }
268 :
269 : impl<S> PinnedDrop for TimedDownload<S> {
270 : fn drop(mut this: Pin<&mut Self>) {
271 : metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
272 : }
273 : }
274 : }
275 :
276 : impl<S: AsyncRead> TimedDownload<S> {
277 522 : fn new(started_at: std::time::Instant, inner: S) -> Self {
278 522 : TimedDownload {
279 522 : started_at,
280 522 : outcome: metrics::AttemptOutcome::Cancelled,
281 522 : inner,
282 522 : }
283 522 : }
284 : }
285 :
286 : impl<S: AsyncRead> AsyncRead for TimedDownload<S> {
287 169160 : fn poll_read(
288 169160 : self: std::pin::Pin<&mut Self>,
289 169160 : cx: &mut std::task::Context<'_>,
290 169160 : buf: &mut io::ReadBuf<'_>,
291 169160 : ) -> std::task::Poll<std::io::Result<()>> {
292 169160 : let this = self.project();
293 169160 : let before = buf.filled().len();
294 169160 : let read = std::task::ready!(this.inner.poll_read(cx, buf));
295 :
296 133819 : let read_eof = buf.filled().len() == before;
297 :
298 133819 : match read {
299 512 : Ok(()) if read_eof => *this.outcome = AttemptOutcome::Ok,
300 133307 : Ok(()) => { /* still in progress */ }
301 0 : Err(_) => *this.outcome = AttemptOutcome::Err,
302 : }
303 :
304 133819 : std::task::Poll::Ready(read)
305 169160 : }
306 : }
307 :
308 : #[async_trait::async_trait]
309 : impl RemoteStorage for S3Bucket {
310 : /// See the doc for `RemoteStorage::list_prefixes`
311 : /// Note: it wont include empty "directories"
312 20 : async fn list_prefixes(
313 20 : &self,
314 20 : prefix: Option<&RemotePath>,
315 20 : ) -> Result<Vec<RemotePath>, DownloadError> {
316 20 : let kind = RequestKind::List;
317 20 :
318 20 : // get the passed prefix or if it is not set use prefix_in_bucket value
319 20 : let list_prefix = prefix
320 20 : .map(|p| self.relative_path_to_s3_object(p))
321 20 : .or_else(|| self.prefix_in_bucket.clone())
322 20 : .map(|mut p| {
323 20 : // required to end with a separator
324 20 : // otherwise request will return only the entry of a prefix
325 20 : if !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
326 20 : p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
327 20 : }
328 20 : p
329 20 : });
330 20 :
331 20 : let mut document_keys = Vec::new();
332 20 :
333 20 : let mut continuation_token = None;
334 :
335 : loop {
336 20 : let _guard = self.permit(kind).await;
337 20 : let started_at = start_measuring_requests(kind);
338 :
339 20 : let fetch_response = self
340 20 : .client
341 20 : .list_objects_v2()
342 20 : .bucket(self.bucket_name.clone())
343 20 : .set_prefix(list_prefix.clone())
344 20 : .set_continuation_token(continuation_token)
345 20 : .delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string())
346 20 : .set_max_keys(self.max_keys_per_list_response)
347 20 : .send()
348 65 : .await
349 20 : .context("Failed to list S3 prefixes")
350 20 : .map_err(DownloadError::Other);
351 20 :
352 20 : let started_at = ScopeGuard::into_inner(started_at);
353 20 :
354 20 : metrics::BUCKET_METRICS
355 20 : .req_seconds
356 20 : .observe_elapsed(kind, &fetch_response, started_at);
357 :
358 20 : let fetch_response = fetch_response?;
359 :
360 20 : document_keys.extend(
361 20 : fetch_response
362 20 : .common_prefixes
363 20 : .unwrap_or_default()
364 20 : .into_iter()
365 30 : .filter_map(|o| Some(self.s3_object_to_relative_path(o.prefix()?))),
366 20 : );
367 :
368 20 : continuation_token = match fetch_response.next_continuation_token {
369 0 : Some(new_token) => Some(new_token),
370 20 : None => break,
371 20 : };
372 20 : }
373 20 :
374 20 : Ok(document_keys)
375 40 : }
376 :
377 : /// See the doc for `RemoteStorage::list_files`
378 188 : async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
379 188 : let kind = RequestKind::List;
380 188 :
381 188 : let folder_name = folder
382 188 : .map(|p| self.relative_path_to_s3_object(p))
383 188 : .or_else(|| self.prefix_in_bucket.clone());
384 188 :
385 188 : // AWS may need to break the response into several parts
386 188 : let mut continuation_token = None;
387 188 : let mut all_files = vec![];
388 : loop {
389 188 : let _guard = self.permit(kind).await;
390 188 : let started_at = start_measuring_requests(kind);
391 :
392 188 : let response = self
393 188 : .client
394 188 : .list_objects_v2()
395 188 : .bucket(self.bucket_name.clone())
396 188 : .set_prefix(folder_name.clone())
397 188 : .set_continuation_token(continuation_token)
398 188 : .set_max_keys(self.max_keys_per_list_response)
399 188 : .send()
400 1142 : .await
401 188 : .context("Failed to list files in S3 bucket");
402 188 :
403 188 : let started_at = ScopeGuard::into_inner(started_at);
404 188 : metrics::BUCKET_METRICS
405 188 : .req_seconds
406 188 : .observe_elapsed(kind, &response, started_at);
407 :
408 188 : let response = response?;
409 :
410 1068 : for object in response.contents().unwrap_or_default() {
411 1068 : let object_path = object.key().expect("response does not contain a key");
412 1068 : let remote_path = self.s3_object_to_relative_path(object_path);
413 1068 : all_files.push(remote_path);
414 1068 : }
415 188 : match response.next_continuation_token {
416 0 : Some(new_token) => continuation_token = Some(new_token),
417 188 : None => break,
418 188 : }
419 188 : }
420 188 : Ok(all_files)
421 376 : }
422 :
423 10490 : async fn upload(
424 10490 : &self,
425 10490 : from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
426 10490 : from_size_bytes: usize,
427 10490 : to: &RemotePath,
428 10490 : metadata: Option<StorageMetadata>,
429 10490 : ) -> anyhow::Result<()> {
430 10490 : let kind = RequestKind::Put;
431 10490 : let _guard = self.permit(kind).await;
432 :
433 10490 : let started_at = start_measuring_requests(kind);
434 10490 :
435 10490 : let body = Body::wrap_stream(ReaderStream::new(from));
436 10490 : let bytes_stream = ByteStream::new(SdkBody::from(body));
437 :
438 10490 : let res = self
439 10490 : .client
440 10490 : .put_object()
441 10490 : .bucket(self.bucket_name.clone())
442 10490 : .key(self.relative_path_to_s3_object(to))
443 10490 : .set_metadata(metadata.map(|m| m.0))
444 10490 : .content_length(from_size_bytes.try_into()?)
445 10490 : .body(bytes_stream)
446 10490 : .send()
447 37479 : .await;
448 :
449 10482 : let started_at = ScopeGuard::into_inner(started_at);
450 10482 : metrics::BUCKET_METRICS
451 10482 : .req_seconds
452 10482 : .observe_elapsed(kind, &res, started_at);
453 10482 :
454 10482 : res?;
455 :
456 10481 : Ok(())
457 20972 : }
458 :
459 757 : async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
460 : // if prefix is not none then download file `prefix/from`
461 : // if prefix is none then download file `from`
462 757 : self.download_object(GetObjectRequest {
463 757 : bucket: self.bucket_name.clone(),
464 757 : key: self.relative_path_to_s3_object(from),
465 757 : range: None,
466 757 : })
467 2719 : .await
468 1514 : }
469 :
470 4 : async fn download_byte_range(
471 4 : &self,
472 4 : from: &RemotePath,
473 4 : start_inclusive: u64,
474 4 : end_exclusive: Option<u64>,
475 4 : ) -> Result<Download, DownloadError> {
476 : // S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
477 : // and needs both ends to be exclusive
478 4 : let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
479 4 : let range = Some(match end_inclusive {
480 0 : Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"),
481 4 : None => format!("bytes={start_inclusive}-"),
482 : });
483 :
484 4 : self.download_object(GetObjectRequest {
485 4 : bucket: self.bucket_name.clone(),
486 4 : key: self.relative_path_to_s3_object(from),
487 4 : range,
488 4 : })
489 17 : .await
490 8 : }
491 6648 : async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
492 6648 : let kind = RequestKind::Delete;
493 6648 : let _guard = self.permit(kind).await;
494 :
495 6648 : let mut delete_objects = Vec::with_capacity(paths.len());
496 14018 : for path in paths {
497 7370 : let obj_id = ObjectIdentifier::builder()
498 7370 : .set_key(Some(self.relative_path_to_s3_object(path)))
499 7370 : .build();
500 7370 : delete_objects.push(obj_id);
501 7370 : }
502 :
503 6648 : for chunk in delete_objects.chunks(MAX_DELETE_OBJECTS_REQUEST_SIZE) {
504 6648 : let started_at = start_measuring_requests(kind);
505 :
506 6648 : let resp = self
507 6648 : .client
508 6648 : .delete_objects()
509 6648 : .bucket(self.bucket_name.clone())
510 6648 : .delete(Delete::builder().set_objects(Some(chunk.to_vec())).build())
511 6648 : .send()
512 30692 : .await;
513 :
514 6648 : let started_at = ScopeGuard::into_inner(started_at);
515 6648 : metrics::BUCKET_METRICS
516 6648 : .req_seconds
517 6648 : .observe_elapsed(kind, &resp, started_at);
518 6648 :
519 6648 : match resp {
520 6648 : Ok(resp) => {
521 6648 : metrics::BUCKET_METRICS
522 6648 : .deleted_objects_total
523 6648 : .inc_by(chunk.len() as u64);
524 6648 : if let Some(errors) = resp.errors {
525 0 : return Err(anyhow::format_err!(
526 0 : "Failed to delete {} objects",
527 0 : errors.len()
528 0 : ));
529 6648 : }
530 : }
531 0 : Err(e) => {
532 0 : return Err(e.into());
533 : }
534 : }
535 : }
536 6648 : Ok(())
537 13296 : }
538 :
539 6610 : async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
540 6610 : let paths = std::array::from_ref(path);
541 30569 : self.delete_objects(paths).await
542 13220 : }
543 : }
544 :
545 : /// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`].
546 18107 : fn start_counting_cancelled_wait(
547 18107 : kind: RequestKind,
548 18107 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
549 18107 : scopeguard::guard_on_success(std::time::Instant::now(), move |_| {
550 0 : metrics::BUCKET_METRICS.cancelled_waits.get(kind).inc()
551 18107 : })
552 18107 : }
553 :
554 : /// On drop (cancellation) add time to [`metrics::BucketMetrics::req_seconds`].
555 18107 : fn start_measuring_requests(
556 18107 : kind: RequestKind,
557 18107 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
558 18107 : scopeguard::guard_on_success(std::time::Instant::now(), move |started_at| {
559 3 : metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
560 3 : kind,
561 3 : AttemptOutcome::Cancelled,
562 3 : started_at,
563 3 : )
564 18107 : })
565 18107 : }
566 :
567 : #[cfg(test)]
568 : mod tests {
569 : use std::num::NonZeroUsize;
570 : use std::path::Path;
571 :
572 : use crate::{RemotePath, S3Bucket, S3Config};
573 :
574 1 : #[test]
575 1 : fn relative_path() {
576 1 : let all_paths = vec!["", "some/path", "some/path/"];
577 1 : let all_paths: Vec<RemotePath> = all_paths
578 1 : .iter()
579 3 : .map(|x| RemotePath::new(Path::new(x)).expect("bad path"))
580 1 : .collect();
581 1 : let prefixes = [
582 1 : None,
583 1 : Some(""),
584 1 : Some("test/prefix"),
585 1 : Some("test/prefix/"),
586 1 : Some("/test/prefix/"),
587 1 : ];
588 1 : let expected_outputs = vec![
589 1 : vec!["", "some/path", "some/path"],
590 1 : vec!["/", "/some/path", "/some/path"],
591 1 : vec![
592 1 : "test/prefix/",
593 1 : "test/prefix/some/path",
594 1 : "test/prefix/some/path",
595 1 : ],
596 1 : vec![
597 1 : "test/prefix/",
598 1 : "test/prefix/some/path",
599 1 : "test/prefix/some/path",
600 1 : ],
601 1 : vec![
602 1 : "test/prefix/",
603 1 : "test/prefix/some/path",
604 1 : "test/prefix/some/path",
605 1 : ],
606 1 : ];
607 :
608 5 : for (prefix_idx, prefix) in prefixes.iter().enumerate() {
609 5 : let config = S3Config {
610 5 : bucket_name: "bucket".to_owned(),
611 5 : bucket_region: "region".to_owned(),
612 5 : prefix_in_bucket: prefix.map(str::to_string),
613 5 : endpoint: None,
614 5 : concurrency_limit: NonZeroUsize::new(100).unwrap(),
615 5 : max_keys_per_list_response: Some(5),
616 5 : };
617 5 : let storage = S3Bucket::new(&config).expect("remote storage init");
618 15 : for (test_path_idx, test_path) in all_paths.iter().enumerate() {
619 15 : let result = storage.relative_path_to_s3_object(test_path);
620 15 : let expected = expected_outputs[prefix_idx][test_path_idx];
621 15 : assert_eq!(result, expected);
622 : }
623 : }
624 1 : }
625 : }
|