TLA Line data Source code
1 : use std::collections::HashSet;
2 : use std::env;
3 : use std::num::{NonZeroU32, NonZeroUsize};
4 : use std::ops::ControlFlow;
5 : use std::path::PathBuf;
6 : use std::sync::Arc;
7 : use std::time::UNIX_EPOCH;
8 :
9 : use anyhow::Context;
10 : use camino::Utf8Path;
11 : use once_cell::sync::OnceCell;
12 : use remote_storage::{
13 : GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config,
14 : };
15 : use test_context::{test_context, AsyncTestContext};
16 : use tokio::task::JoinSet;
17 : use tracing::{debug, error, info};
18 :
19 : static LOGGING_DONE: OnceCell<()> = OnceCell::new();
20 :
21 : const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
22 :
23 : const BASE_PREFIX: &str = "test";
24 :
25 : /// Tests that the S3 client can list all prefixes, even if the response comes paginated and requires multiple S3 queries.
26 : /// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars specified.
27 : /// See the client creation in [`create_s3_client`] for details on the required env vars.
28 : /// If real S3 tests are disabled, the test passes, skipping any real test run: currently, there's no way to mark a test as ignored at runtime with the
29 : /// default test framework, see https://github.com/rust-lang/rust/issues/68007 for details.
30 : ///
31 : /// First, the test creates a set of S3 objects with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_s3_data`]
32 : /// where
33 : /// * `random_prefix_part` is set for the entire S3 client during the S3 client creation in [`create_s3_client`], to avoid interference between multiple test runs
34 : /// * `base_prefix_str` is a common prefix to use in the client requests: we want to ensure that the client is able to list nested prefixes inside the bucket
35 : ///
36 : /// Then, the test verifies that the client returns the correct prefixes when queried:
37 : /// * with no prefix, it lists everything after its `${random_prefix_part}/` — that should be the `${base_prefix_str}` value only
38 : /// * with `${base_prefix_str}/` prefix, it lists every `sub_prefix_${i}`
39 : ///
40 : /// With real S3 enabled and the `#[cfg(test)]` Rust configuration, the S3 client test adds a `max-keys` param to limit the number of keys per response.
41 : /// This way, pagination is tested implicitly, by ensuring all results are returned from the remote storage while avoiding uploading too many blobs to S3,
42 : /// since the current default AWS S3 pagination limit is 1000.
43 : /// (see https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax)
44 : ///
45 : /// Lastly, the test attempts to clean up and remove all uploaded S3 files.
46 : /// If any errors appear during the cleanup, they are logged, but the test does not fail or stop until the cleanup is finished.
47 CBC 2 : #[test_context(MaybeEnabledS3WithTestBlobs)]
48 1 : #[tokio::test]
49 2 : async fn s3_pagination_should_work(ctx: &mut MaybeEnabledS3WithTestBlobs) -> anyhow::Result<()> {
50 1 : let ctx = match ctx {
51 UBC 0 : MaybeEnabledS3WithTestBlobs::Enabled(ctx) => ctx,
52 CBC 1 : MaybeEnabledS3WithTestBlobs::Disabled => return Ok(()),
53 UBC 0 : MaybeEnabledS3WithTestBlobs::UploadsFailed(e, _) => anyhow::bail!("S3 init failed: {e:?}"),
54 : };
55 :
56 0 : let test_client = Arc::clone(&ctx.enabled.client);
57 0 : let expected_remote_prefixes = ctx.remote_prefixes.clone();
58 :
59 0 : let base_prefix = RemotePath::new(Utf8Path::new(ctx.enabled.base_prefix))
60 0 : .context("common_prefix construction")?;
61 0 : let root_remote_prefixes = test_client
62 0 : .list_prefixes(None)
63 0 : .await
64 0 : .context("client list root prefixes failure")?
65 0 : .into_iter()
66 0 : .collect::<HashSet<_>>();
67 0 : assert_eq!(
68 0 : root_remote_prefixes, HashSet::from([base_prefix.clone()]),
69 0 : "remote storage root prefixes list mismatches with the uploads. Returned prefixes: {root_remote_prefixes:?}"
70 : );
71 :
72 0 : let nested_remote_prefixes = test_client
73 0 : .list_prefixes(Some(&base_prefix))
74 0 : .await
75 0 : .context("client list nested prefixes failure")?
76 0 : .into_iter()
77 0 : .collect::<HashSet<_>>();
78 0 : let remote_only_prefixes = nested_remote_prefixes
79 0 : .difference(&expected_remote_prefixes)
80 0 : .collect::<HashSet<_>>();
81 0 : let missing_uploaded_prefixes = expected_remote_prefixes
82 0 : .difference(&nested_remote_prefixes)
83 0 : .collect::<HashSet<_>>();
84 0 : assert_eq!(
85 0 : remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0,
86 0 : "remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
87 : );
88 :
89 0 : Ok(())
90 CBC 1 : }
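// A minimal sketch of the key layout and pagination math described in the doc comment above;
// the module and helper below are illustrative additions, not part of the measured test file.
// `random_prefix_part` never appears in these keys because the real client applies it via
// `prefix_in_bucket` in `create_s3_client`.
mod pagination_layout_sketch {
    /// Mirrors the key scheme `{base_prefix_str}/sub_prefix_{i}/blob_{i}` used by `upload_s3_data`.
    fn blob_key(base_prefix_str: &str, i: usize) -> String {
        format!("{base_prefix_str}/sub_prefix_{i}/blob_{i}")
    }

    #[test]
    fn upload_count_forces_multiple_pages() {
        // With `max_keys_in_list_response = 10`, the setup uploads 1 + 2 * 10 = 21 blobs,
        // so listing them back requires at least three paginated S3 responses.
        let max_keys_in_list_response = 10usize;
        let upload_tasks_count = 1 + 2 * max_keys_in_list_response;
        assert!(upload_tasks_count > 2 * max_keys_in_list_response);
        assert_eq!(blob_key("test", 3), "test/sub_prefix_3/blob_3");
    }
}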
91 :
92 : /// Tests that the S3 client can list all files in a folder, even if the response comes paginated and requires multiple S3 queries.
93 : /// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars to be specified. The test skips the real code and passes if the env vars are not set.
94 : /// See `s3_pagination_should_work` for more information.
95 : ///
96 : /// First, the test creates a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_simple_s3_data`].
97 : /// Then it performs the following queries:
98 : /// 1. `list_files(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
99 : /// 2. `list_files("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt`
100 2 : #[test_context(MaybeEnabledS3WithSimpleTestBlobs)]
101 1 : #[tokio::test]
102 2 : async fn s3_list_files_works(ctx: &mut MaybeEnabledS3WithSimpleTestBlobs) -> anyhow::Result<()> {
103 1 : let ctx = match ctx {
104 UBC 0 : MaybeEnabledS3WithSimpleTestBlobs::Enabled(ctx) => ctx,
105 CBC 1 : MaybeEnabledS3WithSimpleTestBlobs::Disabled => return Ok(()),
106 UBC 0 : MaybeEnabledS3WithSimpleTestBlobs::UploadsFailed(e, _) => {
107 0 : anyhow::bail!("S3 init failed: {e:?}")
108 : }
109 : };
110 0 : let test_client = Arc::clone(&ctx.enabled.client);
111 0 : let base_prefix =
112 0 : RemotePath::new(Utf8Path::new("folder1")).context("common_prefix construction")?;
113 0 : let root_files = test_client
114 0 : .list_files(None)
115 0 : .await
116 0 : .context("client list root files failure")?
117 0 : .into_iter()
118 0 : .collect::<HashSet<_>>();
119 0 : assert_eq!(
120 0 : root_files,
121 0 : ctx.remote_blobs.clone(),
122 0 : "remote storage list_files on root mismatches with the uploads."
123 : );
124 0 : let nested_remote_files = test_client
125 0 : .list_files(Some(&base_prefix))
126 0 : .await
127 0 : .context("client list nested files failure")?
128 0 : .into_iter()
129 0 : .collect::<HashSet<_>>();
130 0 : let trim_remote_blobs: HashSet<_> = ctx
131 0 : .remote_blobs
132 0 : .iter()
133 0 : .map(|x| x.get_path())
134 0 : .filter(|x| x.starts_with("folder1"))
135 0 : .map(|x| RemotePath::new(x).expect("must be valid path"))
136 0 : .collect();
137 : assert_eq!(
138 : nested_remote_files, trim_remote_blobs,
139 0 : "remote storage list_files on subdirrectory mismatches with the uploads."
140 : );
141 0 : Ok(())
142 CBC 1 : }
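// A minimal sketch of the folder assignment behind `s3_list_files_works`; the module below is
// an illustrative addition. It mirrors the `folder{i / 7}/blob_{i}.txt` scheme from
// `upload_simple_s3_data` to show why filtering on the "folder1" prefix is a meaningful second query.
mod list_files_layout_sketch {
    /// Mirrors the key scheme used by `upload_simple_s3_data`.
    fn blob_key(i: usize) -> String {
        format!("folder{}/blob_{}.txt", i / 7, i)
    }

    #[test]
    fn folder1_holds_a_contiguous_range_of_blobs() {
        // With 21 uploads (i = 1..=21), integer division puts blobs 7..=13 into `folder1`,
        // so the "folder1" listing should return exactly those seven keys.
        let in_folder1: Vec<usize> = (1..=21usize).filter(|i| i / 7 == 1).collect();
        assert_eq!(in_folder1, (7..=13).collect::<Vec<usize>>());
        assert!(blob_key(7).starts_with("folder1/"));
    }
}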
143 :
144 2 : #[test_context(MaybeEnabledS3)]
145 1 : #[tokio::test]
146 1 : async fn s3_delete_non_existing_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> {
147 1 : let ctx = match ctx {
148 UBC 0 : MaybeEnabledS3::Enabled(ctx) => ctx,
149 CBC 1 : MaybeEnabledS3::Disabled => return Ok(()),
150 : };
151 :
152 UBC 0 : let path = RemotePath::new(Utf8Path::new(
153 0 : format!("{}/for_sure_there_is_nothing_there_really", ctx.base_prefix).as_str(),
154 0 : ))
155 0 : .with_context(|| "RemotePath conversion")?;
156 :
157 0 : ctx.client.delete(&path).await.expect("should succeed");
158 0 :
159 0 : Ok(())
160 CBC 1 : }
161 :
162 2 : #[test_context(MaybeEnabledS3)]
163 1 : #[tokio::test]
164 2 : async fn s3_delete_objects_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> {
165 1 : let ctx = match ctx {
166 UBC 0 : MaybeEnabledS3::Enabled(ctx) => ctx,
167 CBC 1 : MaybeEnabledS3::Disabled => return Ok(()),
168 : };
169 :
170 UBC 0 : let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
171 0 : .with_context(|| "RemotePath conversion")?;
172 :
173 0 : let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
174 0 : .with_context(|| "RemotePath conversion")?;
175 :
176 0 : let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
177 0 : .with_context(|| "RemotePath conversion")?;
178 :
179 0 : let data1 = "remote blob data1".as_bytes();
180 0 : let data1_len = data1.len();
181 0 : let data2 = "remote blob data2".as_bytes();
182 0 : let data2_len = data2.len();
183 0 : let data3 = "remote blob data3".as_bytes();
184 0 : let data3_len = data3.len();
185 0 : ctx.client
186 0 : .upload(std::io::Cursor::new(data1), data1_len, &path1, None)
187 0 : .await?;
188 :
189 0 : ctx.client
190 0 : .upload(std::io::Cursor::new(data2), data2_len, &path2, None)
191 0 : .await?;
192 :
193 0 : ctx.client
194 0 : .upload(std::io::Cursor::new(data3), data3_len, &path3, None)
195 0 : .await?;
196 :
197 0 : ctx.client.delete_objects(&[path1, path2]).await?;
198 :
199 0 : let prefixes = ctx.client.list_prefixes(None).await?;
200 :
201 0 : assert_eq!(prefixes.len(), 1);
202 :
203 0 : ctx.client.delete_objects(&[path3]).await?;
204 :
205 0 : Ok(())
206 CBC 1 : }
207 :
208 4 : fn ensure_logging_ready() {
209 4 : LOGGING_DONE.get_or_init(|| {
210 1 : utils::logging::init(
211 1 : utils::logging::LogFormat::Test,
212 1 : utils::logging::TracingErrorLayerEnablement::Disabled,
213 1 : )
214 1 : .expect("logging init failed");
215 4 : });
216 4 : }
217 :
218 : struct EnabledS3 {
219 : client: Arc<GenericRemoteStorage>,
220 : base_prefix: &'static str,
221 : }
222 :
223 : impl EnabledS3 {
224 UBC 0 : async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
225 0 : let client = create_s3_client(max_keys_in_list_response)
226 0 : .context("S3 client creation")
227 0 : .expect("S3 client creation failed");
228 0 :
229 0 : EnabledS3 {
230 0 : client,
231 0 : base_prefix: BASE_PREFIX,
232 0 : }
233 0 : }
234 : }
235 :
236 : enum MaybeEnabledS3 {
237 : Enabled(EnabledS3),
238 : Disabled,
239 : }
240 :
241 : #[async_trait::async_trait]
242 : impl AsyncTestContext for MaybeEnabledS3 {
243 CBC 2 : async fn setup() -> Self {
244 2 : ensure_logging_ready();
245 2 :
246 2 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
247 2 : info!(
248 2 : "`{}` env variable is not set, skipping the test",
249 2 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
250 2 : );
251 2 : return Self::Disabled;
252 UBC 0 : }
253 0 :
254 0 : Self::Enabled(EnabledS3::setup(None).await)
255 CBC 4 : }
256 : }
257 :
258 : enum MaybeEnabledS3WithTestBlobs {
259 : Enabled(S3WithTestBlobs),
260 : Disabled,
261 : UploadsFailed(anyhow::Error, S3WithTestBlobs),
262 : }
263 :
264 : struct S3WithTestBlobs {
265 : enabled: EnabledS3,
266 : remote_prefixes: HashSet<RemotePath>,
267 : remote_blobs: HashSet<RemotePath>,
268 : }
269 :
270 : #[async_trait::async_trait]
271 : impl AsyncTestContext for MaybeEnabledS3WithTestBlobs {
272 1 : async fn setup() -> Self {
273 1 : ensure_logging_ready();
274 1 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
275 1 : info!(
276 1 : "`{}` env variable is not set, skipping the test",
277 1 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
278 1 : );
279 1 : return Self::Disabled;
280 UBC 0 : }
281 0 :
282 0 : let max_keys_in_list_response = 10;
283 0 : let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
284 :
285 0 : let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
286 :
287 0 : match upload_s3_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
288 0 : ControlFlow::Continue(uploads) => {
289 0 : info!("Remote objects created successfully");
290 :
291 0 : Self::Enabled(S3WithTestBlobs {
292 0 : enabled,
293 0 : remote_prefixes: uploads.prefixes,
294 0 : remote_blobs: uploads.blobs,
295 0 : })
296 : }
297 0 : ControlFlow::Break(uploads) => Self::UploadsFailed(
298 0 : anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
299 0 : S3WithTestBlobs {
300 0 : enabled,
301 0 : remote_prefixes: uploads.prefixes,
302 0 : remote_blobs: uploads.blobs,
303 0 : },
304 0 : ),
305 : }
306 CBC 2 : }
307 :
308 1 : async fn teardown(self) {
309 1 : match self {
310 1 : Self::Disabled => {}
311 UBC 0 : Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
312 0 : cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
313 : }
314 : }
315 CBC 1 : }
316 : }
317 :
318 : // NOTE: the setups for the list_prefixes test and the list_files test are very similar.
319 : // However, they are not identical. The list_prefixes function is concerned with listing prefixes,
320 : // whereas the list_files function is concerned with listing files.
321 : // See `RemoteStorage::list_files` documentation for more details
322 : enum MaybeEnabledS3WithSimpleTestBlobs {
323 : Enabled(S3WithSimpleTestBlobs),
324 : Disabled,
325 : UploadsFailed(anyhow::Error, S3WithSimpleTestBlobs),
326 : }
327 : struct S3WithSimpleTestBlobs {
328 : enabled: EnabledS3,
329 : remote_blobs: HashSet<RemotePath>,
330 : }
331 :
332 : #[async_trait::async_trait]
333 : impl AsyncTestContext for MaybeEnabledS3WithSimpleTestBlobs {
334 1 : async fn setup() -> Self {
335 1 : ensure_logging_ready();
336 1 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
337 1 : info!(
338 1 : "`{}` env variable is not set, skipping the test",
339 1 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
340 1 : );
341 1 : return Self::Disabled;
342 UBC 0 : }
343 0 :
344 0 : let max_keys_in_list_response = 10;
345 0 : let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
346 :
347 0 : let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
348 :
349 0 : match upload_simple_s3_data(&enabled.client, upload_tasks_count).await {
350 0 : ControlFlow::Continue(uploads) => {
351 0 : info!("Remote objects created successfully");
352 :
353 0 : Self::Enabled(S3WithSimpleTestBlobs {
354 0 : enabled,
355 0 : remote_blobs: uploads,
356 0 : })
357 : }
358 0 : ControlFlow::Break(uploads) => Self::UploadsFailed(
359 0 : anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
360 0 : S3WithSimpleTestBlobs {
361 0 : enabled,
362 0 : remote_blobs: uploads,
363 0 : },
364 0 : ),
365 : }
366 CBC 2 : }
367 :
368 1 : async fn teardown(self) {
369 1 : match self {
370 1 : Self::Disabled => {}
371 UBC 0 : Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
372 0 : cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
373 : }
374 : }
375 CBC 1 : }
376 : }
377 :
378 UBC 0 : fn create_s3_client(
379 0 : max_keys_per_list_response: Option<i32>,
380 0 : ) -> anyhow::Result<Arc<GenericRemoteStorage>> {
381 : use rand::Rng;
382 :
383 0 : let remote_storage_s3_bucket = env::var("REMOTE_STORAGE_S3_BUCKET")
384 0 : .context("`REMOTE_STORAGE_S3_BUCKET` env var is not set, but real S3 tests are enabled")?;
385 0 : let remote_storage_s3_region = env::var("REMOTE_STORAGE_S3_REGION")
386 0 : .context("`REMOTE_STORAGE_S3_REGION` env var is not set, but real S3 tests are enabled")?;
387 :
388 : // Due to how time works, we've had test runners use the same nanos as bucket prefixes.
389 : // The millis are just a debugging aid to make it easier to find the prefix later.
390 0 : let millis = std::time::SystemTime::now()
391 0 : .duration_since(UNIX_EPOCH)
392 0 : .context("random s3 test prefix part calculation")?
393 0 : .as_millis();
394 0 :
395 0 : // Because nanos can be the same for two threads, and so can millis, add randomness.
396 0 : let random = rand::thread_rng().gen::<u32>();
397 0 :
398 0 : let remote_storage_config = RemoteStorageConfig {
399 0 : max_concurrent_syncs: NonZeroUsize::new(100).unwrap(),
400 0 : max_sync_errors: NonZeroU32::new(5).unwrap(),
401 0 : storage: RemoteStorageKind::AwsS3(S3Config {
402 0 : bucket_name: remote_storage_s3_bucket,
403 0 : bucket_region: remote_storage_s3_region,
404 0 : prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
405 0 : endpoint: None,
406 0 : concurrency_limit: NonZeroUsize::new(100).unwrap(),
407 0 : max_keys_per_list_response,
408 0 : }),
409 0 : };
410 0 : Ok(Arc::new(
411 0 : GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
412 : ))
413 0 : }
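// A minimal sketch of the per-run prefix shape produced by `create_s3_client` above;
// the helper and constants here are illustrative, not the real clock or RNG output.
mod test_prefix_sketch {
    /// Mirrors the `test_{millis}_{random:08x}/` format used for `prefix_in_bucket`.
    fn prefix_in_bucket(millis: u128, random: u32) -> String {
        format!("test_{millis}_{random:08x}/")
    }

    #[test]
    fn prefix_has_expected_shape() {
        // Millis plus 8 hex digits of randomness keep concurrent test runs on disjoint prefixes.
        let prefix = prefix_in_bucket(1_700_000_000_000, 0x2a);
        assert_eq!(prefix, "test_1700000000000_0000002a/");
        assert!(prefix.ends_with('/'));
    }
}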
414 :
415 : struct Uploads {
416 : prefixes: HashSet<RemotePath>,
417 : blobs: HashSet<RemotePath>,
418 : }
419 :
420 0 : async fn upload_s3_data(
421 0 : client: &Arc<GenericRemoteStorage>,
422 0 : base_prefix_str: &'static str,
423 0 : upload_tasks_count: usize,
424 0 : ) -> ControlFlow<Uploads, Uploads> {
425 0 : info!("Creating {upload_tasks_count} S3 files");
426 0 : let mut upload_tasks = JoinSet::new();
427 0 : for i in 1..upload_tasks_count + 1 {
428 0 : let task_client = Arc::clone(client);
429 0 : upload_tasks.spawn(async move {
430 0 : let prefix = format!("{base_prefix_str}/sub_prefix_{i}/");
431 0 : let blob_prefix = RemotePath::new(Utf8Path::new(&prefix))
432 0 : .with_context(|| format!("{prefix:?} to RemotePath conversion"))?;
433 0 : let blob_path = blob_prefix.join(Utf8Path::new(&format!("blob_{i}")));
434 0 : debug!("Creating remote item {i} at path {blob_path:?}");
435 :
436 0 : let data = format!("remote blob data {i}").into_bytes();
437 0 : let data_len = data.len();
438 0 : task_client
439 0 : .upload(std::io::Cursor::new(data), data_len, &blob_path, None)
440 0 : .await?;
441 :
442 0 : Ok::<_, anyhow::Error>((blob_prefix, blob_path))
443 0 : });
444 0 : }
445 :
446 0 : let mut upload_tasks_failed = false;
447 0 : let mut uploaded_prefixes = HashSet::with_capacity(upload_tasks_count);
448 0 : let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
449 0 : while let Some(task_run_result) = upload_tasks.join_next().await {
450 0 : match task_run_result
451 0 : .context("task join failed")
452 0 : .and_then(|task_result| task_result.context("upload task failed"))
453 : {
454 0 : Ok((upload_prefix, upload_path)) => {
455 0 : uploaded_prefixes.insert(upload_prefix);
456 0 : uploaded_blobs.insert(upload_path);
457 0 : }
458 0 : Err(e) => {
459 0 : error!("Upload task failed: {e:?}");
460 0 : upload_tasks_failed = true;
461 : }
462 : }
463 : }
464 :
465 0 : let uploads = Uploads {
466 0 : prefixes: uploaded_prefixes,
467 0 : blobs: uploaded_blobs,
468 0 : };
469 0 : if upload_tasks_failed {
470 0 : ControlFlow::Break(uploads)
471 : } else {
472 0 : ControlFlow::Continue(uploads)
473 : }
474 0 : }
475 :
476 0 : async fn cleanup(client: &Arc<GenericRemoteStorage>, objects_to_delete: HashSet<RemotePath>) {
477 0 : info!(
478 0 : "Removing {} objects from the remote storage during cleanup",
479 0 : objects_to_delete.len()
480 0 : );
481 0 : let mut delete_tasks = JoinSet::new();
482 0 : for object_to_delete in objects_to_delete {
483 0 : let task_client = Arc::clone(client);
484 0 : delete_tasks.spawn(async move {
485 0 : debug!("Deleting remote item at path {object_to_delete:?}");
486 0 : task_client
487 0 : .delete(&object_to_delete)
488 0 : .await
489 0 : .with_context(|| format!("{object_to_delete:?} removal"))
490 0 : });
491 0 : }
492 :
493 0 : while let Some(task_run_result) = delete_tasks.join_next().await {
494 0 : match task_run_result {
495 0 : Ok(task_result) => match task_result {
496 0 : Ok(()) => {}
497 0 : Err(e) => error!("Delete task failed: {e:?}"),
498 : },
499 0 : Err(join_err) => error!("Delete task did not finish correctly: {join_err}"),
500 : }
501 : }
502 0 : }
503 :
504 : // Uploads files `folder{j}/blob_{i}.txt`. See the test description for more details.
505 0 : async fn upload_simple_s3_data(
506 0 : client: &Arc<GenericRemoteStorage>,
507 0 : upload_tasks_count: usize,
508 0 : ) -> ControlFlow<HashSet<RemotePath>, HashSet<RemotePath>> {
509 0 : info!("Creating {upload_tasks_count} S3 files");
510 0 : let mut upload_tasks = JoinSet::new();
511 0 : for i in 1..upload_tasks_count + 1 {
512 0 : let task_client = Arc::clone(client);
513 0 : upload_tasks.spawn(async move {
514 0 : let blob_path = PathBuf::from(format!("folder{}/blob_{}.txt", i / 7, i));
515 0 : let blob_path = RemotePath::new(
516 0 : Utf8Path::from_path(blob_path.as_path()).expect("must be valid blob path"),
517 0 : )
518 0 : .with_context(|| format!("{blob_path:?} to RemotePath conversion"))?;
519 0 : debug!("Creating remote item {i} at path {blob_path:?}");
520 :
521 0 : let data = format!("remote blob data {i}").into_bytes();
522 0 : let data_len = data.len();
523 0 : task_client
524 0 : .upload(std::io::Cursor::new(data), data_len, &blob_path, None)
525 0 : .await?;
526 :
527 0 : Ok::<_, anyhow::Error>(blob_path)
528 0 : });
529 0 : }
530 :
531 0 : let mut upload_tasks_failed = false;
532 0 : let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
533 0 : while let Some(task_run_result) = upload_tasks.join_next().await {
534 0 : match task_run_result
535 0 : .context("task join failed")
536 0 : .and_then(|task_result| task_result.context("upload task failed"))
537 : {
538 0 : Ok(upload_path) => {
539 0 : uploaded_blobs.insert(upload_path);
540 0 : }
541 0 : Err(e) => {
542 0 : error!("Upload task failed: {e:?}");
543 0 : upload_tasks_failed = true;
544 : }
545 : }
546 : }
547 :
548 0 : if upload_tasks_failed {
549 0 : ControlFlow::Break(uploaded_blobs)
550 : } else {
551 0 : ControlFlow::Continue(uploaded_blobs)
552 : }
553 0 : }