use std::collections::HashSet;
use std::env;
use std::num::{NonZeroU32, NonZeroUsize};
use std::ops::ControlFlow;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::UNIX_EPOCH;

use anyhow::Context;
use once_cell::sync::OnceCell;
use remote_storage::{
    GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config,
};
use test_context::{test_context, AsyncTestContext};
use tokio::task::JoinSet;
use tracing::{debug, error, info};

static LOGGING_DONE: OnceCell<()> = OnceCell::new();

const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";

const BASE_PREFIX: &str = "test";

/// Tests that the S3 client can list all prefixes, even if the response comes paginated and requires multiple S3 queries.
/// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and the related S3 credential env vars to be specified.
/// See the client creation in [`create_s3_client`] for details on the required env vars.
/// If real S3 tests are disabled, the test passes, skipping any real test run: currently, there is no way to mark a test
/// as ignored at runtime with the default test framework, see https://github.com/rust-lang/rust/issues/68007 for details.
///
/// First, the test creates a set of S3 objects with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_s3_data`]
/// where
/// * `random_prefix_part` is set for the entire S3 client during the S3 client creation in [`create_s3_client`], to avoid interference between multiple test runs
/// * `base_prefix_str` is a common prefix to use in the client requests: we want to ensure that the client is able to list nested prefixes inside the bucket
///
/// Then, the test verifies that the client returns the correct prefixes when queried:
/// * with no prefix, it lists everything after its `${random_prefix_part}/`, which should be the `${base_prefix_str}` value only
/// * with the `${base_prefix_str}/` prefix, it lists every `sub_prefix_${i}`
///
/// With real S3 enabled and the `#[cfg(test)]` Rust configuration used, the S3 client test adds a `max-keys` parameter to limit the response keys.
/// This way we test the pagination implicitly, by ensuring all results are returned from the remote storage, while avoiding uploading too many blobs to S3,
/// since the current default AWS S3 pagination limit is 1000
/// (see https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax).
///
/// Lastly, the test attempts to clean up and remove all uploaded S3 files.
/// If any errors appear during the clean-up, they are logged, but the test does not fail or stop until the clean-up is finished.
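///
/// For illustration, with the defaults used in this file (`BASE_PREFIX` of `test` and 21 upload tasks),
/// the uploaded key layout looks roughly like this (a sketch; `${random_prefix_part}` is a nanosecond
/// timestamp chosen per test run):
///
/// ```text
/// ${random_prefix_part}/test/sub_prefix_1/blob_1
/// ${random_prefix_part}/test/sub_prefix_2/blob_2
/// ...
/// ${random_prefix_part}/test/sub_prefix_21/blob_21
/// ```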
#[test_context(MaybeEnabledS3WithTestBlobs)]
#[tokio::test]
async fn s3_pagination_should_work(ctx: &mut MaybeEnabledS3WithTestBlobs) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledS3WithTestBlobs::Enabled(ctx) => ctx,
        MaybeEnabledS3WithTestBlobs::Disabled => return Ok(()),
        MaybeEnabledS3WithTestBlobs::UploadsFailed(e, _) => anyhow::bail!("S3 init failed: {e:?}"),
    };

    let test_client = Arc::clone(&ctx.enabled.client);
    let expected_remote_prefixes = ctx.remote_prefixes.clone();

    let base_prefix = RemotePath::new(Path::new(ctx.enabled.base_prefix))
        .context("common_prefix construction")?;
    let root_remote_prefixes = test_client
        .list_prefixes(None)
        .await
        .context("client list root prefixes failure")?
        .into_iter()
        .collect::<HashSet<_>>();
    assert_eq!(
        root_remote_prefixes, HashSet::from([base_prefix.clone()]),
        "remote storage root prefixes list mismatches with the uploads. Returned prefixes: {root_remote_prefixes:?}"
    );

    let nested_remote_prefixes = test_client
        .list_prefixes(Some(&base_prefix))
        .await
        .context("client list nested prefixes failure")?
        .into_iter()
        .collect::<HashSet<_>>();
    let remote_only_prefixes = nested_remote_prefixes
        .difference(&expected_remote_prefixes)
        .collect::<HashSet<_>>();
    let missing_uploaded_prefixes = expected_remote_prefixes
        .difference(&nested_remote_prefixes)
        .collect::<HashSet<_>>();
    assert_eq!(
        remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0,
        "remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
    );

    Ok(())
}

/// Tests that the S3 client can list all files in a folder, even if the response comes paginated and requires multiple S3 queries.
/// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and the related S3 credential env vars to be specified.
/// The test will skip the real code and pass if the env vars are not set.
/// See `s3_pagination_should_work` for more information.
///
/// First, the test creates a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_simple_s3_data`].
/// Then it performs the following queries:
/// 1. `list_files(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
/// 2. `list_files("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt`
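///
/// For illustration, with the 21 uploads and the `i / 7` folder index used in [`upload_simple_s3_data`],
/// the layout is roughly (a sketch; `random_prefix` is chosen per test run):
///
/// ```text
/// random_prefix/folder0/blob_1.txt  ... blob_6.txt
/// random_prefix/folder1/blob_7.txt  ... blob_13.txt
/// random_prefix/folder2/blob_14.txt ... blob_20.txt
/// random_prefix/folder3/blob_21.txt
/// ```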
#[test_context(MaybeEnabledS3WithSimpleTestBlobs)]
#[tokio::test]
async fn s3_list_files_works(ctx: &mut MaybeEnabledS3WithSimpleTestBlobs) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledS3WithSimpleTestBlobs::Enabled(ctx) => ctx,
        MaybeEnabledS3WithSimpleTestBlobs::Disabled => return Ok(()),
        MaybeEnabledS3WithSimpleTestBlobs::UploadsFailed(e, _) => {
            anyhow::bail!("S3 init failed: {e:?}")
        }
    };
    let test_client = Arc::clone(&ctx.enabled.client);
    let base_prefix =
        RemotePath::new(Path::new("folder1")).context("common_prefix construction")?;
    let root_files = test_client
        .list_files(None)
        .await
        .context("client list root files failure")?
        .into_iter()
        .collect::<HashSet<_>>();
    assert_eq!(
        root_files,
        ctx.remote_blobs.clone(),
        "remote storage list_files on root mismatches with the uploads."
    );
    let nested_remote_files = test_client
        .list_files(Some(&base_prefix))
        .await
        .context("client list nested files failure")?
        .into_iter()
        .collect::<HashSet<_>>();
    // Keep only the uploaded blobs that live under `folder1` and compare them
    // with what `list_files` returned for that prefix.
    let trim_remote_blobs: HashSet<_> = ctx
        .remote_blobs
        .iter()
        .map(|x| x.get_path().to_str().expect("must be valid name"))
        .filter(|x| x.starts_with("folder1"))
        .map(|x| RemotePath::new(Path::new(x)).expect("must be valid name"))
        .collect();
    assert_eq!(
        nested_remote_files, trim_remote_blobs,
        "remote storage list_files on subdirectory mismatches with the uploads."
    );
    Ok(())
}

#[test_context(MaybeEnabledS3)]
#[tokio::test]
async fn s3_delete_non_existing_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledS3::Enabled(ctx) => ctx,
        MaybeEnabledS3::Disabled => return Ok(()),
    };

    let path = RemotePath::new(&PathBuf::from(format!(
        "{}/for_sure_there_is_nothing_there_really",
        ctx.base_prefix,
    )))
    .with_context(|| "RemotePath conversion")?;

    ctx.client.delete(&path).await.expect("should succeed");

    Ok(())
}

#[test_context(MaybeEnabledS3)]
#[tokio::test]
async fn s3_delete_objects_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledS3::Enabled(ctx) => ctx,
        MaybeEnabledS3::Disabled => return Ok(()),
    };

    let path1 = RemotePath::new(&PathBuf::from(format!("{}/path1", ctx.base_prefix)))
        .with_context(|| "RemotePath conversion")?;

    let path2 = RemotePath::new(&PathBuf::from(format!("{}/path2", ctx.base_prefix)))
        .with_context(|| "RemotePath conversion")?;

    let path3 = RemotePath::new(&PathBuf::from(format!("{}/path3", ctx.base_prefix)))
        .with_context(|| "RemotePath conversion")?;

    let data1 = "remote blob data1".as_bytes();
    let data1_len = data1.len();
    let data2 = "remote blob data2".as_bytes();
    let data2_len = data2.len();
    let data3 = "remote blob data3".as_bytes();
    let data3_len = data3.len();
    ctx.client
        .upload(std::io::Cursor::new(data1), data1_len, &path1, None)
        .await?;

    ctx.client
        .upload(std::io::Cursor::new(data2), data2_len, &path2, None)
        .await?;

    ctx.client
        .upload(std::io::Cursor::new(data3), data3_len, &path3, None)
        .await?;

    ctx.client.delete_objects(&[path1, path2]).await?;

    let prefixes = ctx.client.list_prefixes(None).await?;

    assert_eq!(prefixes.len(), 1);

    ctx.client.delete_objects(&[path3]).await?;

    Ok(())
}

fn ensure_logging_ready() {
    LOGGING_DONE.get_or_init(|| {
        utils::logging::init(
            utils::logging::LogFormat::Test,
            utils::logging::TracingErrorLayerEnablement::Disabled,
        )
        .expect("logging init failed");
    });
}

struct EnabledS3 {
    client: Arc<GenericRemoteStorage>,
    base_prefix: &'static str,
}

impl EnabledS3 {
    async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
        let client = create_s3_client(max_keys_in_list_response)
            .context("S3 client creation")
            .expect("S3 client creation failed");

        EnabledS3 {
            client,
            base_prefix: BASE_PREFIX,
        }
    }
}

enum MaybeEnabledS3 {
    Enabled(EnabledS3),
    Disabled,
}

#[async_trait::async_trait]
impl AsyncTestContext for MaybeEnabledS3 {
    async fn setup() -> Self {
        ensure_logging_ready();

        if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
            info!(
                "`{}` env variable is not set, skipping the test",
                ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
            );
            return Self::Disabled;
        }

        Self::Enabled(EnabledS3::setup(None).await)
    }
}

enum MaybeEnabledS3WithTestBlobs {
    Enabled(S3WithTestBlobs),
    Disabled,
    UploadsFailed(anyhow::Error, S3WithTestBlobs),
}

struct S3WithTestBlobs {
    enabled: EnabledS3,
    remote_prefixes: HashSet<RemotePath>,
    remote_blobs: HashSet<RemotePath>,
}

#[async_trait::async_trait]
impl AsyncTestContext for MaybeEnabledS3WithTestBlobs {
    async fn setup() -> Self {
        ensure_logging_ready();
        if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
            info!(
                "`{}` env variable is not set, skipping the test",
                ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
            );
            return Self::Disabled;
        }

        let max_keys_in_list_response = 10;
        let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());

        let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;

        match upload_s3_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
            ControlFlow::Continue(uploads) => {
                info!("Remote objects created successfully");

                Self::Enabled(S3WithTestBlobs {
                    enabled,
                    remote_prefixes: uploads.prefixes,
                    remote_blobs: uploads.blobs,
                })
            }
            ControlFlow::Break(uploads) => Self::UploadsFailed(
                anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
                S3WithTestBlobs {
                    enabled,
                    remote_prefixes: uploads.prefixes,
                    remote_blobs: uploads.blobs,
                },
            ),
        }
    }

    async fn teardown(self) {
        match self {
            Self::Disabled => {}
            Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
                cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
            }
        }
    }
}

// NOTE: the setups for the list_prefixes test and the list_files test are very similar.
// However, they are not identical. The list_prefixes function is concerned with listing prefixes,
// whereas the list_files function is concerned with listing files.
// See `RemoteStorage::list_files` documentation for more details.
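//
// As a rough illustration (a sketch inferred from the tests above, not the exact return
// types): for uploaded objects `a/b/blob_1` and `a/c/blob_2`,
//   * `list_prefixes(Some("a"))` yields the prefixes `a/b` and `a/c`,
//   * `list_files(Some("a"))` yields the objects `a/b/blob_1` and `a/c/blob_2` themselves.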
enum MaybeEnabledS3WithSimpleTestBlobs {
    Enabled(S3WithSimpleTestBlobs),
    Disabled,
    UploadsFailed(anyhow::Error, S3WithSimpleTestBlobs),
}
struct S3WithSimpleTestBlobs {
    enabled: EnabledS3,
    remote_blobs: HashSet<RemotePath>,
}

#[async_trait::async_trait]
impl AsyncTestContext for MaybeEnabledS3WithSimpleTestBlobs {
    async fn setup() -> Self {
        ensure_logging_ready();
        if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
            info!(
                "`{}` env variable is not set, skipping the test",
                ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
            );
            return Self::Disabled;
        }

        let max_keys_in_list_response = 10;
        let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());

        let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;

        match upload_simple_s3_data(&enabled.client, upload_tasks_count).await {
            ControlFlow::Continue(uploads) => {
                info!("Remote objects created successfully");

                Self::Enabled(S3WithSimpleTestBlobs {
                    enabled,
                    remote_blobs: uploads,
                })
            }
            ControlFlow::Break(uploads) => Self::UploadsFailed(
                anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
                S3WithSimpleTestBlobs {
                    enabled,
                    remote_blobs: uploads,
                },
            ),
        }
    }

    async fn teardown(self) {
        match self {
            Self::Disabled => {}
            Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
                cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
            }
        }
    }
}

fn create_s3_client(
    max_keys_per_list_response: Option<i32>,
) -> anyhow::Result<Arc<GenericRemoteStorage>> {
    let remote_storage_s3_bucket = env::var("REMOTE_STORAGE_S3_BUCKET")
        .context("`REMOTE_STORAGE_S3_BUCKET` env var is not set, but real S3 tests are enabled")?;
    let remote_storage_s3_region = env::var("REMOTE_STORAGE_S3_REGION")
        .context("`REMOTE_STORAGE_S3_REGION` env var is not set, but real S3 tests are enabled")?;
    // A nanosecond timestamp makes the bucket prefix unique for every test run,
    // so concurrent or repeated runs do not see each other's objects.
    let random_prefix_part = std::time::SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .context("random s3 test prefix part calculation")?
        .as_nanos();
    let remote_storage_config = RemoteStorageConfig {
        max_concurrent_syncs: NonZeroUsize::new(100).unwrap(),
        max_sync_errors: NonZeroU32::new(5).unwrap(),
        storage: RemoteStorageKind::AwsS3(S3Config {
            bucket_name: remote_storage_s3_bucket,
            bucket_region: remote_storage_s3_region,
            prefix_in_bucket: Some(format!("pagination_should_work_test_{random_prefix_part}/")),
            endpoint: None,
            concurrency_limit: NonZeroUsize::new(100).unwrap(),
            max_keys_per_list_response,
        }),
    };
    Ok(Arc::new(
        GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
    ))
}
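
// To run these tests against real S3 locally, the environment needs roughly the following
// shape (a sketch: the bucket and region values are placeholders, the exact cargo invocation
// may differ, and the AWS credentials themselves are whatever the AWS SDK's default
// credential chain finds; they are not read by this file directly):
//
//   ENABLE_REAL_S3_REMOTE_STORAGE=1 \
//   REMOTE_STORAGE_S3_BUCKET=my-test-bucket \
//   REMOTE_STORAGE_S3_REGION=eu-central-1 \
//   cargo test -p remote_storage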

struct Uploads {
    prefixes: HashSet<RemotePath>,
    blobs: HashSet<RemotePath>,
}

async fn upload_s3_data(
    client: &Arc<GenericRemoteStorage>,
    base_prefix_str: &'static str,
    upload_tasks_count: usize,
) -> ControlFlow<Uploads, Uploads> {
    info!("Creating {upload_tasks_count} S3 files");
    let mut upload_tasks = JoinSet::new();
    for i in 1..upload_tasks_count + 1 {
        let task_client = Arc::clone(client);
        upload_tasks.spawn(async move {
            let prefix = PathBuf::from(format!("{base_prefix_str}/sub_prefix_{i}/"));
            let blob_prefix = RemotePath::new(&prefix)
                .with_context(|| format!("{prefix:?} to RemotePath conversion"))?;
            let blob_path = blob_prefix.join(Path::new(&format!("blob_{i}")));
            debug!("Creating remote item {i} at path {blob_path:?}");

            let data = format!("remote blob data {i}").into_bytes();
            let data_len = data.len();
            task_client
                .upload(std::io::Cursor::new(data), data_len, &blob_path, None)
                .await?;

            Ok::<_, anyhow::Error>((blob_prefix, blob_path))
        });
    }

    let mut upload_tasks_failed = false;
    let mut uploaded_prefixes = HashSet::with_capacity(upload_tasks_count);
    let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
    while let Some(task_run_result) = upload_tasks.join_next().await {
        match task_run_result
            .context("task join failed")
            .and_then(|task_result| task_result.context("upload task failed"))
        {
            Ok((upload_prefix, upload_path)) => {
                uploaded_prefixes.insert(upload_prefix);
                uploaded_blobs.insert(upload_path);
            }
            Err(e) => {
                error!("Upload task failed: {e:?}");
                upload_tasks_failed = true;
            }
        }
    }

    let uploads = Uploads {
        prefixes: uploaded_prefixes,
        blobs: uploaded_blobs,
    };
    if upload_tasks_failed {
        ControlFlow::Break(uploads)
    } else {
        ControlFlow::Continue(uploads)
    }
}

async fn cleanup(client: &Arc<GenericRemoteStorage>, objects_to_delete: HashSet<RemotePath>) {
    info!(
        "Removing {} objects from the remote storage during cleanup",
        objects_to_delete.len()
    );
    let mut delete_tasks = JoinSet::new();
    for object_to_delete in objects_to_delete {
        let task_client = Arc::clone(client);
        delete_tasks.spawn(async move {
            debug!("Deleting remote item at path {object_to_delete:?}");
            task_client
                .delete(&object_to_delete)
                .await
                .with_context(|| format!("{object_to_delete:?} removal"))
        });
    }

    while let Some(task_run_result) = delete_tasks.join_next().await {
        match task_run_result {
            Ok(task_result) => match task_result {
                Ok(()) => {}
                Err(e) => error!("Delete task failed: {e:?}"),
            },
            Err(join_err) => error!("Delete task did not finish correctly: {join_err}"),
        }
    }
}

// Uploads files `folder{i/7}/blob_{i}.txt`. See the test description for more details.
async fn upload_simple_s3_data(
    client: &Arc<GenericRemoteStorage>,
    upload_tasks_count: usize,
) -> ControlFlow<HashSet<RemotePath>, HashSet<RemotePath>> {
    info!("Creating {upload_tasks_count} S3 files");
    let mut upload_tasks = JoinSet::new();
    for i in 1..upload_tasks_count + 1 {
        let task_client = Arc::clone(client);
        upload_tasks.spawn(async move {
            // Spread the blobs across several folders (folder index `i / 7`) so that a
            // single folder, e.g. `folder1`, contains multiple files.
            let blob_path = PathBuf::from(format!("folder{}/blob_{}.txt", i / 7, i));
            let blob_path = RemotePath::new(&blob_path)
                .with_context(|| format!("{blob_path:?} to RemotePath conversion"))?;
            debug!("Creating remote item {i} at path {blob_path:?}");

            let data = format!("remote blob data {i}").into_bytes();
            let data_len = data.len();
            task_client
                .upload(std::io::Cursor::new(data), data_len, &blob_path, None)
                .await?;

            Ok::<_, anyhow::Error>(blob_path)
        });
    }

    let mut upload_tasks_failed = false;
    let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
    while let Some(task_run_result) = upload_tasks.join_next().await {
        match task_run_result
            .context("task join failed")
            .and_then(|task_result| task_result.context("upload task failed"))
        {
            Ok(upload_path) => {
                uploaded_blobs.insert(upload_path);
            }
            Err(e) => {
                error!("Upload task failed: {e:?}");
                upload_tasks_failed = true;
            }
        }
    }

    if upload_tasks_failed {
        ControlFlow::Break(uploaded_blobs)
    } else {
        ControlFlow::Continue(uploaded_blobs)
    }
}