//! AWS S3 storage wrapper around the `aws-sdk-s3` library.
//!
//! Respects the `prefix_in_bucket` property from [`S3Config`],
//! allowing multiple API users to work with the same S3 bucket independently,
//! as long as their bucket prefixes are both specified and different.
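//!
//! A minimal sketch of that layout (the configs and the `shared` base value
//! are hypothetical):
//!
//! ```ignore
//! // Both configs point at the same bucket; distinct prefixes keep the
//! // two users' keys from ever colliding.
//! let user_a = S3Config { prefix_in_bucket: Some("user_a".to_string()), ..shared.clone() };
//! let user_b = S3Config { prefix_in_bucket: Some("user_b".to_string()), ..shared };
//! ```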

use std::borrow::Cow;

use anyhow::Context;
use aws_config::{
    environment::credentials::EnvironmentVariableCredentialsProvider,
    imds::credentials::ImdsCredentialsProvider, meta::credentials::CredentialsProviderChain,
    provider_config::ProviderConfig, web_identity_token::WebIdentityTokenCredentialsProvider,
};
use aws_credential_types::cache::CredentialsCache;
use aws_sdk_s3::{
    config::{Config, Region},
    error::SdkError,
    operation::get_object::GetObjectError,
    primitives::ByteStream,
    types::{Delete, ObjectIdentifier},
    Client,
};
use aws_smithy_http::body::SdkBody;
use hyper::Body;
use scopeguard::ScopeGuard;
use tokio::io::{self, AsyncRead};
use tokio_util::io::ReaderStream;
use tracing::debug;

use super::StorageMetadata;
use crate::{
    ConcurrencyLimiter, Download, DownloadError, RemotePath, RemoteStorage, S3Config,
    MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
};

pub(super) mod metrics;

use self::metrics::AttemptOutcome;
pub(super) use self::metrics::RequestKind;

/// AWS S3 storage.
pub struct S3Bucket {
    client: Client,
    bucket_name: String,
    prefix_in_bucket: Option<String>,
    max_keys_per_list_response: Option<i32>,
    concurrency_limiter: ConcurrencyLimiter,
}

#[derive(Default)]
struct GetObjectRequest {
    bucket: String,
    key: String,
    range: Option<String>,
}

impl S3Bucket {
    /// Creates the S3 storage; errors if an incorrect AWS S3 configuration is provided.
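    ///
    /// A usage sketch (the `s3_config` value and the follow-up call are
    /// hypothetical; `list_files` comes from the `RemoteStorage` trait):
    ///
    /// ```ignore
    /// let storage = S3Bucket::new(&s3_config)?;
    /// let files = storage.list_files(None).await?;
    /// ```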
    pub fn new(aws_config: &S3Config) -> anyhow::Result<Self> {
        debug!(
            "Creating s3 remote storage for S3 bucket {}",
            aws_config.bucket_name
        );

        let region = Some(Region::new(aws_config.bucket_region.clone()));

        let credentials_provider = {
            // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
            CredentialsProviderChain::first_try(
                "env",
                EnvironmentVariableCredentialsProvider::new(),
            )
            // uses "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
            // needed to access remote extensions bucket
            .or_else("token", {
                let provider_conf = ProviderConfig::without_region().with_region(region.clone());

                WebIdentityTokenCredentialsProvider::builder()
                    .configure(&provider_conf)
                    .build()
            })
            // uses IMDSv2
            .or_else("imds", ImdsCredentialsProvider::builder().build())
        };

        let mut config_builder = Config::builder()
            .region(region)
            .credentials_cache(CredentialsCache::lazy())
            .credentials_provider(credentials_provider);

        if let Some(custom_endpoint) = aws_config.endpoint.clone() {
            config_builder = config_builder
                .endpoint_url(custom_endpoint)
                .force_path_style(true);
        }
        let client = Client::from_conf(config_builder.build());

        let prefix_in_bucket = aws_config.prefix_in_bucket.as_deref().map(|prefix| {
            let mut prefix = prefix;
            while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
                prefix = &prefix[1..]
            }

            let mut prefix = prefix.to_string();
            while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
                prefix.pop();
            }
            prefix
        });
        Ok(Self {
            client,
            bucket_name: aws_config.bucket_name.clone(),
            max_keys_per_list_response: aws_config.max_keys_per_list_response,
            prefix_in_bucket,
            concurrency_limiter: ConcurrencyLimiter::new(aws_config.concurrency_limit.get()),
        })
    }

    fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
        let relative_path =
            match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
                Some(stripped) => stripped,
                // we rely on AWS to return properly prefixed paths
                // for requests with a certain prefix
                None => panic!(
                    "Key {} does not start with bucket prefix {:?}",
                    key, self.prefix_in_bucket
                ),
            };
        RemotePath(
            relative_path
                .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
                .collect(),
        )
    }

    pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
        assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
        let path_string = path
            .get_path()
            .as_str()
            .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR);
        match &self.prefix_in_bucket {
            Some(prefix) => prefix.clone() + "/" + path_string,
            None => path_string.to_string(),
        }
    }
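
    // An illustration of the mapping above (the prefix value is hypothetical):
    // with `prefix_in_bucket = Some("wal".to_string())`, the `RemotePath`
    // `some/file` maps to the S3 key `wal/some/file`, and
    // `s3_object_to_relative_path` strips the prefix off again; see the
    // `relative_path` tests at the bottom of this file.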

    async fn permit(&self, kind: RequestKind) -> tokio::sync::SemaphorePermit<'_> {
        let started_at = start_counting_cancelled_wait(kind);
        let permit = self
            .concurrency_limiter
            .acquire(kind)
            .await
            .expect("semaphore is never closed");

        let started_at = ScopeGuard::into_inner(started_at);
        metrics::BUCKET_METRICS
            .wait_seconds
            .observe_elapsed(kind, started_at);

        permit
    }

    async fn owned_permit(&self, kind: RequestKind) -> tokio::sync::OwnedSemaphorePermit {
        let started_at = start_counting_cancelled_wait(kind);
        let permit = self
            .concurrency_limiter
            .acquire_owned(kind)
            .await
            .expect("semaphore is never closed");

        let started_at = ScopeGuard::into_inner(started_at);
        metrics::BUCKET_METRICS
            .wait_seconds
            .observe_elapsed(kind, started_at);
        permit
    }

    async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
        let kind = RequestKind::Get;
        let permit = self.owned_permit(kind).await;

        let started_at = start_measuring_requests(kind);

        let get_object = self
            .client
            .get_object()
            .bucket(request.bucket)
            .key(request.key)
            .set_range(request.range)
            .send()
            .await;

        let started_at = ScopeGuard::into_inner(started_at);

        if get_object.is_err() {
            metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
                kind,
                AttemptOutcome::Err,
                started_at,
            );
        }

        match get_object {
            Ok(object_output) => {
                let metadata = object_output.metadata().cloned().map(StorageMetadata);
                Ok(Download {
                    metadata,
                    download_stream: Box::pin(io::BufReader::new(TimedDownload::new(
                        started_at,
                        RatelimitedAsyncRead::new(permit, object_output.body.into_async_read()),
                    ))),
                })
            }
            Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
                Err(DownloadError::NotFound)
            }
            Err(e) => Err(DownloadError::Other(
                anyhow::Error::new(e).context("download s3 object"),
            )),
        }
    }
}

pin_project_lite::pin_project! {
    /// An `AsyncRead` adapter which carries a permit for the lifetime of the value.
    struct RatelimitedAsyncRead<S> {
        permit: tokio::sync::OwnedSemaphorePermit,
        #[pin]
        inner: S,
    }
}

impl<S: AsyncRead> RatelimitedAsyncRead<S> {
    fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self {
        RatelimitedAsyncRead { permit, inner }
    }
}

impl<S: AsyncRead> AsyncRead for RatelimitedAsyncRead<S> {
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut io::ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        let this = self.project();
        this.inner.poll_read(cx, buf)
    }
}

pin_project_lite::pin_project! {
    /// Times and tracks the outcome of the request.
    struct TimedDownload<S> {
        started_at: std::time::Instant,
        outcome: metrics::AttemptOutcome,
        #[pin]
        inner: S
    }

    impl<S> PinnedDrop for TimedDownload<S> {
        fn drop(mut this: Pin<&mut Self>) {
            metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
        }
    }
}

impl<S: AsyncRead> TimedDownload<S> {
    fn new(started_at: std::time::Instant, inner: S) -> Self {
        TimedDownload {
            started_at,
            outcome: metrics::AttemptOutcome::Cancelled,
            inner,
        }
    }
}

impl<S: AsyncRead> AsyncRead for TimedDownload<S> {
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut io::ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        let this = self.project();
        let before = buf.filled().len();
        let read = std::task::ready!(this.inner.poll_read(cx, buf));

        let read_eof = buf.filled().len() == before;

        match read {
            Ok(()) if read_eof => *this.outcome = AttemptOutcome::Ok,
            Ok(()) => { /* still in progress */ }
            Err(_) => *this.outcome = AttemptOutcome::Err,
        }

        std::task::Poll::Ready(read)
    }
}

#[async_trait::async_trait]
impl RemoteStorage for S3Bucket {
    /// See the doc for `RemoteStorage::list_prefixes`.
    /// Note: it won't include empty "directories".
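    ///
    /// A sketch of the intended behaviour (the keys are hypothetical): with
    /// objects `tenants/1/layer` and `tenants/2/layer` in the bucket, listing
    /// with prefix `tenants` yields the common prefixes `tenants/1` and
    /// `tenants/2`, not the objects themselves.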
    async fn list_prefixes(
        &self,
        prefix: Option<&RemotePath>,
    ) -> Result<Vec<RemotePath>, DownloadError> {
        let kind = RequestKind::List;

        // Use the passed prefix; if it is not set, fall back to the `prefix_in_bucket` value.
        let list_prefix = prefix
            .map(|p| self.relative_path_to_s3_object(p))
            .or_else(|| self.prefix_in_bucket.clone())
            .map(|mut p| {
                // The prefix must end with a separator, otherwise the
                // request returns only the entry for the prefix itself.
                if !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
                    p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
                }
                p
            });

        let mut document_keys = Vec::new();

        let mut continuation_token = None;

        loop {
            let _guard = self.permit(kind).await;
            let started_at = start_measuring_requests(kind);

            let fetch_response = self
                .client
                .list_objects_v2()
                .bucket(self.bucket_name.clone())
                .set_prefix(list_prefix.clone())
                .set_continuation_token(continuation_token)
                .delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string())
                .set_max_keys(self.max_keys_per_list_response)
                .send()
                .await
                .context("Failed to list S3 prefixes")
                .map_err(DownloadError::Other);

            let started_at = ScopeGuard::into_inner(started_at);

            metrics::BUCKET_METRICS
                .req_seconds
                .observe_elapsed(kind, &fetch_response, started_at);

            let fetch_response = fetch_response?;

            document_keys.extend(
                fetch_response
                    .common_prefixes
                    .unwrap_or_default()
                    .into_iter()
                    .filter_map(|o| Some(self.s3_object_to_relative_path(o.prefix()?))),
            );

            continuation_token = match fetch_response.next_continuation_token {
                Some(new_token) => Some(new_token),
                None => break,
            };
        }

        Ok(document_keys)
    }

    /// See the doc for `RemoteStorage::list_files`
    async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
        let kind = RequestKind::List;

        let folder_name = folder
            .map(|p| self.relative_path_to_s3_object(p))
            .or_else(|| self.prefix_in_bucket.clone());

        // AWS may need to break the response into several parts
        let mut continuation_token = None;
        let mut all_files = vec![];
        loop {
            let _guard = self.permit(kind).await;
            let started_at = start_measuring_requests(kind);

            let response = self
                .client
                .list_objects_v2()
                .bucket(self.bucket_name.clone())
                .set_prefix(folder_name.clone())
                .set_continuation_token(continuation_token)
                .set_max_keys(self.max_keys_per_list_response)
                .send()
                .await
                .context("Failed to list files in S3 bucket");

            let started_at = ScopeGuard::into_inner(started_at);
            metrics::BUCKET_METRICS
                .req_seconds
                .observe_elapsed(kind, &response, started_at);

            let response = response?;

            for object in response.contents().unwrap_or_default() {
                let object_path = object.key().expect("response does not contain a key");
                let remote_path = self.s3_object_to_relative_path(object_path);
                all_files.push(remote_path);
            }
            match response.next_continuation_token {
                Some(new_token) => continuation_token = Some(new_token),
                None => break,
            }
        }
        Ok(all_files)
    }

    async fn upload(
        &self,
        from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
        from_size_bytes: usize,
        to: &RemotePath,
        metadata: Option<StorageMetadata>,
    ) -> anyhow::Result<()> {
        let kind = RequestKind::Put;
        let _guard = self.permit(kind).await;

        let started_at = start_measuring_requests(kind);

        let body = Body::wrap_stream(ReaderStream::new(from));
        let bytes_stream = ByteStream::new(SdkBody::from(body));

        let res = self
            .client
            .put_object()
            .bucket(self.bucket_name.clone())
            .key(self.relative_path_to_s3_object(to))
            .set_metadata(metadata.map(|m| m.0))
            .content_length(from_size_bytes.try_into()?)
            .body(bytes_stream)
            .send()
            .await;

        let started_at = ScopeGuard::into_inner(started_at);
        metrics::BUCKET_METRICS
            .req_seconds
            .observe_elapsed(kind, &res, started_at);

        res?;

        Ok(())
    }
    async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
        // If a bucket prefix is configured, download the object at `prefix/from`;
        // otherwise download the object at `from` directly.
        self.download_object(GetObjectRequest {
            bucket: self.bucket_name.clone(),
            key: self.relative_path_to_s3_object(from),
            range: None,
        })
        .await
    }

    async fn download_byte_range(
        &self,
        from: &RemotePath,
        start_inclusive: u64,
        end_exclusive: Option<u64>,
    ) -> Result<Download, DownloadError> {
        // S3 accepts ranges as defined in
        // https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35,
        // where both ends are inclusive (e.g. `bytes=0-1023` is the first KiB),
        // hence the conversion from the exclusive end below.
        let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
        let range = Some(match end_inclusive {
            Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"),
            None => format!("bytes={start_inclusive}-"),
        });

        self.download_object(GetObjectRequest {
            bucket: self.bucket_name.clone(),
            key: self.relative_path_to_s3_object(from),
            range,
        })
        .await
    }

    async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
        let kind = RequestKind::Delete;
        let _guard = self.permit(kind).await;

        let mut delete_objects = Vec::with_capacity(paths.len());
        for path in paths {
            let obj_id = ObjectIdentifier::builder()
                .set_key(Some(self.relative_path_to_s3_object(path)))
                .build();
            delete_objects.push(obj_id);
        }

        for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
            let started_at = start_measuring_requests(kind);

            let resp = self
                .client
                .delete_objects()
                .bucket(self.bucket_name.clone())
                .delete(Delete::builder().set_objects(Some(chunk.to_vec())).build())
                .send()
                .await;

            let started_at = ScopeGuard::into_inner(started_at);
            metrics::BUCKET_METRICS
                .req_seconds
                .observe_elapsed(kind, &resp, started_at);

            match resp {
                Ok(resp) => {
                    metrics::BUCKET_METRICS
                        .deleted_objects_total
                        .inc_by(chunk.len() as u64);
                    if let Some(errors) = resp.errors {
                        // Log a bounded number of the errors within the response:
                        // these requests can carry 1000 keys so logging each one
                        // would be too verbose, especially as errors may lead us
                        // to retry repeatedly.
                        const LOG_UP_TO_N_ERRORS: usize = 10;
                        for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
                            tracing::warn!(
                                "DeleteObjects key {} failed: {}: {}",
                                e.key.as_ref().map(Cow::from).unwrap_or("".into()),
                                e.code.as_ref().map(Cow::from).unwrap_or("".into()),
                                e.message.as_ref().map(Cow::from).unwrap_or("".into())
                            );
                        }

                        return Err(anyhow::format_err!(
                            "Failed to delete {} objects",
                            errors.len()
                        ));
                    }
                }
                Err(e) => {
                    return Err(e.into());
                }
            }
        }
        Ok(())
    }

    async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
        let paths = std::array::from_ref(path);
        self.delete_objects(paths).await
    }
}

/// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`].
fn start_counting_cancelled_wait(
    kind: RequestKind,
) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
    scopeguard::guard_on_success(std::time::Instant::now(), move |_| {
        metrics::BUCKET_METRICS.cancelled_waits.get(kind).inc()
    })
}

/// On drop (cancellation) add time to [`metrics::BucketMetrics::req_seconds`].
fn start_measuring_requests(
    kind: RequestKind,
) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
    scopeguard::guard_on_success(std::time::Instant::now(), move |started_at| {
        metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
            kind,
            AttemptOutcome::Cancelled,
            started_at,
        )
    })
}
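
// Usage sketch for the two guards above (`kind` and the request shown are
// hypothetical): the closure fires only if the guard is dropped while still
// armed, i.e. when the future is cancelled before `ScopeGuard::into_inner`
// defuses it:
//
//     let started_at = start_measuring_requests(kind);
//     let res = client.some_request().send().await; // cancellation here is counted
//     let started_at = ScopeGuard::into_inner(started_at); // defused: not cancelled
//     metrics::BUCKET_METRICS.req_seconds.observe_elapsed(kind, &res, started_at);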

#[cfg(test)]
mod tests {
    use camino::Utf8Path;
    use std::num::NonZeroUsize;

    use crate::{RemotePath, S3Bucket, S3Config};

    #[test]
    fn relative_path() {
        let all_paths = ["", "some/path", "some/path/"];
        let all_paths: Vec<RemotePath> = all_paths
            .iter()
            .map(|x| RemotePath::new(Utf8Path::new(x)).expect("bad path"))
            .collect();
        let prefixes = [
            None,
            Some(""),
            Some("test/prefix"),
            Some("test/prefix/"),
            Some("/test/prefix/"),
        ];
        let expected_outputs = vec![
            vec!["", "some/path", "some/path"],
            vec!["/", "/some/path", "/some/path"],
            vec![
                "test/prefix/",
                "test/prefix/some/path",
                "test/prefix/some/path",
            ],
            vec![
                "test/prefix/",
                "test/prefix/some/path",
                "test/prefix/some/path",
            ],
            vec![
                "test/prefix/",
                "test/prefix/some/path",
                "test/prefix/some/path",
            ],
        ];

        for (prefix_idx, prefix) in prefixes.iter().enumerate() {
            let config = S3Config {
                bucket_name: "bucket".to_owned(),
                bucket_region: "region".to_owned(),
                prefix_in_bucket: prefix.map(str::to_string),
                endpoint: None,
                concurrency_limit: NonZeroUsize::new(100).unwrap(),
                max_keys_per_list_response: Some(5),
            };
            let storage = S3Bucket::new(&config).expect("remote storage init");
            for (test_path_idx, test_path) in all_paths.iter().enumerate() {
                let result = storage.relative_path_to_s3_object(test_path);
                let expected = expected_outputs[prefix_idx][test_path_idx];
                assert_eq!(result, expected);
            }
        }
    }
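
    // A small additional sketch (not part of the original suite): checks that
    // `s3_object_to_relative_path` inverts `relative_path_to_s3_object` for a
    // hypothetical prefixed config.
    #[test]
    fn relative_path_round_trip() {
        let config = S3Config {
            bucket_name: "bucket".to_owned(),
            bucket_region: "region".to_owned(),
            prefix_in_bucket: Some("test/prefix".to_owned()),
            endpoint: None,
            concurrency_limit: NonZeroUsize::new(100).unwrap(),
            max_keys_per_list_response: Some(5),
        };
        let storage = S3Bucket::new(&config).expect("remote storage init");
        let path = RemotePath::new(Utf8Path::new("some/path")).expect("bad path");
        let key = storage.relative_path_to_s3_object(&path);
        assert_eq!(key, "test/prefix/some/path");
        assert_eq!(storage.s3_object_to_relative_path(&key), path);
    }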
}