LCOV - code coverage report
Current view: top level - libs/remote_storage/tests - test_real_s3.rs (source / functions)
Test:         8ac049b474321fdc72ddcb56d7165153a1a900e8.info
Test Date:    2023-09-06 10:18:01
Coverage:     Lines: 19.4 % (69 of 355 hit)    Functions: 47.9 % (34 of 71 hit)

            Line data    Source code
       1              : use std::collections::HashSet;
       2              : use std::env;
       3              : use std::num::{NonZeroU32, NonZeroUsize};
       4              : use std::ops::ControlFlow;
       5              : use std::path::{Path, PathBuf};
       6              : use std::sync::Arc;
       7              : use std::time::UNIX_EPOCH;
       8              : 
       9              : use anyhow::Context;
      10              : use once_cell::sync::OnceCell;
      11              : use remote_storage::{
      12              :     GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config,
      13              : };
      14              : use test_context::{test_context, AsyncTestContext};
      15              : use tokio::task::JoinSet;
      16              : use tracing::{debug, error, info};
      17              : 
      18              : static LOGGING_DONE: OnceCell<()> = OnceCell::new();
      19              : 
      20              : const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
      21              : 
      22              : const BASE_PREFIX: &str = "test";
      23              : 
      24              : /// Tests that the S3 client can list all prefixes, even if the response comes paginated and requires multiple S3 queries.
      25              : /// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and the related S3 credential env vars to be set.
      26              : /// See the client creation in [`create_s3_client`] for details on the required env vars.
      27              : /// If real S3 tests are disabled, the test passes without doing any real work: currently there's no way to mark a test as ignored at runtime with the
      28              : /// default test framework, see https://github.com/rust-lang/rust/issues/68007 for details.
      29              : ///
      30              : /// First, the test creates a set of S3 objects with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_s3_data`]
      31              : /// where
      32              : /// * `random_prefix_part` is set for the entire S3 client during the S3 client creation in [`create_s3_client`], to avoid interference between multiple test runs
      33              : /// * `base_prefix_str` is a common prefix to use in the client requests: we want to ensure that the client is able to list nested prefixes inside the bucket
      34              : ///
      35              : /// Then, the test verifies that the client returns the correct prefixes when queried:
      36              : /// * with no prefix, it lists everything after its `${random_prefix_part}/`, which should be the `${base_prefix_str}` value only
      37              : /// * with `${base_prefix_str}/` prefix, it lists every `sub_prefix_${i}`
      38              : ///
      39              : /// With real S3 enabled and the `#[cfg(test)]` Rust configuration in use, the test S3 client sets a `max-keys` param to limit the number of keys per response.
      40              : /// This way, we test the pagination implicitly, by ensuring all results are returned from the remote storage, without having to upload too many blobs to S3,
      41              : /// since the current default AWS S3 pagination limit is 1000.
      42              : /// (see https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax)
      43              : ///
      44              : /// Lastly, the test attempts to clean up and remove all uploaded S3 files.
      45              : /// If any errors appear during the cleanup, they are logged, but the cleanup is not stopped and the test is not failed because of them.
      46            2 : #[test_context(MaybeEnabledS3WithTestBlobs)]
      47            1 : #[tokio::test]
      48            2 : async fn s3_pagination_should_work(ctx: &mut MaybeEnabledS3WithTestBlobs) -> anyhow::Result<()> {
      49            1 :     let ctx = match ctx {
      50            0 :         MaybeEnabledS3WithTestBlobs::Enabled(ctx) => ctx,
      51            1 :         MaybeEnabledS3WithTestBlobs::Disabled => return Ok(()),
      52            0 :         MaybeEnabledS3WithTestBlobs::UploadsFailed(e, _) => anyhow::bail!("S3 init failed: {e:?}"),
      53              :     };
      54              : 
      55            0 :     let test_client = Arc::clone(&ctx.enabled.client);
      56            0 :     let expected_remote_prefixes = ctx.remote_prefixes.clone();
      57              : 
      58            0 :     let base_prefix = RemotePath::new(Path::new(ctx.enabled.base_prefix))
      59            0 :         .context("common_prefix construction")?;
      60            0 :     let root_remote_prefixes = test_client
      61            0 :         .list_prefixes(None)
      62            0 :         .await
      63            0 :         .context("client list root prefixes failure")?
      64            0 :         .into_iter()
      65            0 :         .collect::<HashSet<_>>();
      66            0 :     assert_eq!(
      67            0 :         root_remote_prefixes, HashSet::from([base_prefix.clone()]),
      68            0 :         "remote storage root prefixes list mismatches with the uploads. Returned prefixes: {root_remote_prefixes:?}"
      69              :     );
      70              : 
      71            0 :     let nested_remote_prefixes = test_client
      72            0 :         .list_prefixes(Some(&base_prefix))
      73            0 :         .await
      74            0 :         .context("client list nested prefixes failure")?
      75            0 :         .into_iter()
      76            0 :         .collect::<HashSet<_>>();
      77            0 :     let remote_only_prefixes = nested_remote_prefixes
      78            0 :         .difference(&expected_remote_prefixes)
      79            0 :         .collect::<HashSet<_>>();
      80            0 :     let missing_uploaded_prefixes = expected_remote_prefixes
      81            0 :         .difference(&nested_remote_prefixes)
      82            0 :         .collect::<HashSet<_>>();
      83            0 :     assert_eq!(
      84            0 :         remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0,
      85            0 :         "remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
      86              :     );
      87              : 
      88            0 :     Ok(())
      89            1 : }
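
For illustration, a minimal, self-contained sketch of the key layout the pagination test above relies on. It is not part of the measured source: it uses plain PathBuf in place of RemotePath, and the helper name expected_layout is hypothetical; only the shape mirrors upload_s3_data further down in this file.

    use std::collections::HashSet;
    use std::path::PathBuf;

    /// Hypothetical helper: the prefixes and blobs the pagination test expects after uploading.
    fn expected_layout(upload_tasks_count: usize) -> (HashSet<PathBuf>, HashSet<PathBuf>) {
        let base = "test"; // BASE_PREFIX in this file
        let mut prefixes = HashSet::new();
        let mut blobs = HashSet::new();
        for i in 1..=upload_tasks_count {
            // All keys live under the per-run `${random_prefix_part}/`, which the configured client prepends.
            prefixes.insert(PathBuf::from(format!("{base}/sub_prefix_{i}")));
            blobs.insert(PathBuf::from(format!("{base}/sub_prefix_{i}/blob_{i}")));
        }
        // list_prefixes(None) is then expected to return just {"test"}, and
        // list_prefixes(Some("test")) the whole `prefixes` set, across however many response pages.
        (prefixes, blobs)
    }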
      90              : 
      91              : /// Tests that the S3 client can list all files in a folder, even if the response comes paginated and requires multiple S3 queries.
      92              : /// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and the related S3 credential env vars to be set. The test skips the real run and passes if the env vars are not set.
      93              : /// See `s3_pagination_should_work` for more information.
      94              : ///
      95              : /// First, the test creates a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_simple_s3_data`]
      96              : /// Then it performs the following queries:
      97              : ///    1. `list_files(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
      98              : ///    2. `list_files("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt`
      99            2 : #[test_context(MaybeEnabledS3WithSimpleTestBlobs)]
     100            1 : #[tokio::test]
     101            2 : async fn s3_list_files_works(ctx: &mut MaybeEnabledS3WithSimpleTestBlobs) -> anyhow::Result<()> {
     102            1 :     let ctx = match ctx {
     103            0 :         MaybeEnabledS3WithSimpleTestBlobs::Enabled(ctx) => ctx,
     104            1 :         MaybeEnabledS3WithSimpleTestBlobs::Disabled => return Ok(()),
     105            0 :         MaybeEnabledS3WithSimpleTestBlobs::UploadsFailed(e, _) => {
     106            0 :             anyhow::bail!("S3 init failed: {e:?}")
     107              :         }
     108              :     };
     109            0 :     let test_client = Arc::clone(&ctx.enabled.client);
     110            0 :     let base_prefix =
     111            0 :         RemotePath::new(Path::new("folder1")).context("common_prefix construction")?;
     112            0 :     let root_files = test_client
     113            0 :         .list_files(None)
     114            0 :         .await
     115            0 :         .context("client list root files failure")?
     116            0 :         .into_iter()
     117            0 :         .collect::<HashSet<_>>();
     118            0 :     assert_eq!(
     119            0 :         root_files,
     120            0 :         ctx.remote_blobs.clone(),
     121            0 :         "remote storage list_files on root mismatches with the uploads."
     122              :     );
     123            0 :     let nested_remote_files = test_client
     124            0 :         .list_files(Some(&base_prefix))
     125            0 :         .await
     126            0 :         .context("client list nested files failure")?
     127            0 :         .into_iter()
     128            0 :         .collect::<HashSet<_>>();
     129            0 :     let trim_remote_blobs: HashSet<_> = ctx
     130            0 :         .remote_blobs
     131            0 :         .iter()
     132            0 :         .map(|x| x.get_path().to_str().expect("must be valid name"))
     133            0 :         .filter(|x| x.starts_with("folder1"))
     134            0 :         .map(|x| RemotePath::new(Path::new(x)).expect("must be valid name"))
     135            0 :         .collect();
     136              :     assert_eq!(
     137              :         nested_remote_files, trim_remote_blobs,
     138            0 :         "remote storage list_files on subdirectory mismatches with the uploads."
     139              :     );
     140            0 :     Ok(())
     141            1 : }
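
A similar sketch for the layout of the list_files test above, mirroring upload_simple_s3_data below; the helper name simple_layout is hypothetical and PathBuf again stands in for RemotePath. The point to note is that blob `i` is placed into folder `i / 7`.

    use std::collections::HashSet;
    use std::path::PathBuf;

    /// Hypothetical helper: the file set the test expects from list_files(None).
    fn simple_layout(upload_tasks_count: usize) -> HashSet<PathBuf> {
        (1..=upload_tasks_count)
            .map(|i| PathBuf::from(format!("folder{}/blob_{}.txt", i / 7, i)))
            .collect()
        // With the 21 upload tasks used in the setup this yields folder0 (blobs 1..=6), folder1 (7..=13),
        // folder2 (14..=20) and folder3 (21); list_files(Some("folder1")) should return only the folder1 subset.
    }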
     142              : 
     143            2 : #[test_context(MaybeEnabledS3)]
     144            1 : #[tokio::test]
     145            1 : async fn s3_delete_non_exising_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> {
     146            1 :     let ctx = match ctx {
     147            0 :         MaybeEnabledS3::Enabled(ctx) => ctx,
     148            1 :         MaybeEnabledS3::Disabled => return Ok(()),
     149              :     };
     150              : 
     151            0 :     let path = RemotePath::new(&PathBuf::from(format!(
     152            0 :         "{}/for_sure_there_is_nothing_there_really",
     153            0 :         ctx.base_prefix,
     154            0 :     )))
     155            0 :     .with_context(|| "RemotePath conversion")?;
     156              : 
     157            0 :     ctx.client.delete(&path).await.expect("should succeed");
     158            0 : 
     159            0 :     Ok(())
     160            1 : }
     161              : 
     162            2 : #[test_context(MaybeEnabledS3)]
     163            1 : #[tokio::test]
     164            2 : async fn s3_delete_objects_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> {
     165            1 :     let ctx = match ctx {
     166            0 :         MaybeEnabledS3::Enabled(ctx) => ctx,
     167            1 :         MaybeEnabledS3::Disabled => return Ok(()),
     168              :     };
     169              : 
     170            0 :     let path1 = RemotePath::new(&PathBuf::from(format!("{}/path1", ctx.base_prefix,)))
     171            0 :         .with_context(|| "RemotePath conversion")?;
     172              : 
     173            0 :     let path2 = RemotePath::new(&PathBuf::from(format!("{}/path2", ctx.base_prefix,)))
     174            0 :         .with_context(|| "RemotePath conversion")?;
     175              : 
     176            0 :     let path3 = RemotePath::new(&PathBuf::from(format!("{}/path3", ctx.base_prefix,)))
     177            0 :         .with_context(|| "RemotePath conversion")?;
     178              : 
     179            0 :     let data1 = "remote blob data1".as_bytes();
     180            0 :     let data1_len = data1.len();
     181            0 :     let data2 = "remote blob data2".as_bytes();
     182            0 :     let data2_len = data2.len();
     183            0 :     let data3 = "remote blob data3".as_bytes();
     184            0 :     let data3_len = data3.len();
     185            0 :     ctx.client
     186            0 :         .upload(std::io::Cursor::new(data1), data1_len, &path1, None)
     187            0 :         .await?;
     188              : 
     189            0 :     ctx.client
     190            0 :         .upload(std::io::Cursor::new(data2), data2_len, &path2, None)
     191            0 :         .await?;
     192              : 
     193            0 :     ctx.client
     194            0 :         .upload(std::io::Cursor::new(data3), data3_len, &path3, None)
     195            0 :         .await?;
     196              : 
     197            0 :     ctx.client.delete_objects(&[path1, path2]).await?;
     198              : 
     199            0 :     let prefixes = ctx.client.list_prefixes(None).await?;
     200              : 
     201            0 :     assert_eq!(prefixes.len(), 1);
     202              : 
     203            0 :     ctx.client.delete_objects(&[path3]).await?;
     204              : 
     205            0 :     Ok(())
     206            1 : }
     207              : 
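                      : // Tests in this binary share one process; the OnceCell guard below ensures logging is initialized at most once.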
     208            4 : fn ensure_logging_ready() {
     209            4 :     LOGGING_DONE.get_or_init(|| {
     210            1 :         utils::logging::init(
     211            1 :             utils::logging::LogFormat::Test,
     212            1 :             utils::logging::TracingErrorLayerEnablement::Disabled,
     213            1 :         )
     214            1 :         .expect("logging init failed");
     215            4 :     });
     216            4 : }
     217              : 
     218              : struct EnabledS3 {
     219              :     client: Arc<GenericRemoteStorage>,
     220              :     base_prefix: &'static str,
     221              : }
     222              : 
     223              : impl EnabledS3 {
     224            0 :     async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
     225            0 :         let client = create_s3_client(max_keys_in_list_response)
     226            0 :             .context("S3 client creation")
     227            0 :             .expect("S3 client creation failed");
     228            0 : 
     229            0 :         EnabledS3 {
     230            0 :             client,
     231            0 :             base_prefix: BASE_PREFIX,
     232            0 :         }
     233            0 :     }
     234              : }
     235              : 
     236              : enum MaybeEnabledS3 {
     237              :     Enabled(EnabledS3),
     238              :     Disabled,
     239              : }
     240              : 
     241              : #[async_trait::async_trait]
     242              : impl AsyncTestContext for MaybeEnabledS3 {
     243            2 :     async fn setup() -> Self {
     244            2 :         ensure_logging_ready();
     245            2 : 
     246            2 :         if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
     247            2 :             info!(
     248            2 :                 "`{}` env variable is not set, skipping the test",
     249            2 :                 ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
     250            2 :             );
     251            2 :             return Self::Disabled;
     252            0 :         }
     253            0 : 
     254            0 :         Self::Enabled(EnabledS3::setup(None).await)
     255            4 :     }
     256              : }
     257              : 
     258              : enum MaybeEnabledS3WithTestBlobs {
     259              :     Enabled(S3WithTestBlobs),
     260              :     Disabled,
     261              :     UploadsFailed(anyhow::Error, S3WithTestBlobs),
     262              : }
     263              : 
     264              : struct S3WithTestBlobs {
     265              :     enabled: EnabledS3,
     266              :     remote_prefixes: HashSet<RemotePath>,
     267              :     remote_blobs: HashSet<RemotePath>,
     268              : }
     269              : 
     270              : #[async_trait::async_trait]
     271              : impl AsyncTestContext for MaybeEnabledS3WithTestBlobs {
     272            1 :     async fn setup() -> Self {
     273            1 :         ensure_logging_ready();
     274            1 :         if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
     275            1 :             info!(
     276            1 :                 "`{}` env variable is not set, skipping the test",
     277            1 :                 ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
     278            1 :             );
     279            1 :             return Self::Disabled;
     280            0 :         }
     281            0 : 
     282            0 :         let max_keys_in_list_response = 10;
     283            0 :         let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
     284              : 
     285            0 :         let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
     286              : 
     287            0 :         match upload_s3_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
     288            0 :             ControlFlow::Continue(uploads) => {
     289            0 :                 info!("Remote objects created successfully");
     290              : 
     291            0 :                 Self::Enabled(S3WithTestBlobs {
     292            0 :                     enabled,
     293            0 :                     remote_prefixes: uploads.prefixes,
     294            0 :                     remote_blobs: uploads.blobs,
     295            0 :                 })
     296              :             }
     297            0 :             ControlFlow::Break(uploads) => Self::UploadsFailed(
     298            0 :                 anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
     299            0 :                 S3WithTestBlobs {
     300            0 :                     enabled,
     301            0 :                     remote_prefixes: uploads.prefixes,
     302            0 :                     remote_blobs: uploads.blobs,
     303            0 :                 },
     304            0 :             ),
     305              :         }
     306            2 :     }
     307              : 
     308            1 :     async fn teardown(self) {
     309            1 :         match self {
     310            1 :             Self::Disabled => {}
     311            0 :             Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
     312            0 :                 cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
     313              :             }
     314              :         }
     315            1 :     }
     316              : }
     317              : 
     318              : // NOTE: the setups for the list_prefixes test and the list_files test are very similar
     319              : // However, they are not identical. The list_prefixes function is concerned with listing prefixes,
     320              : // whereas the list_files function is concerned with listing files.
     321              : // See `RemoteStorage::list_files` documentation for more details
     322              : enum MaybeEnabledS3WithSimpleTestBlobs {
     323              :     Enabled(S3WithSimpleTestBlobs),
     324              :     Disabled,
     325              :     UploadsFailed(anyhow::Error, S3WithSimpleTestBlobs),
     326              : }
     327              : struct S3WithSimpleTestBlobs {
     328              :     enabled: EnabledS3,
     329              :     remote_blobs: HashSet<RemotePath>,
     330              : }
     331              : 
     332              : #[async_trait::async_trait]
     333              : impl AsyncTestContext for MaybeEnabledS3WithSimpleTestBlobs {
     334            1 :     async fn setup() -> Self {
     335            1 :         ensure_logging_ready();
     336            1 :         if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
     337            1 :             info!(
     338            1 :                 "`{}` env variable is not set, skipping the test",
     339            1 :                 ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
     340            1 :             );
     341            1 :             return Self::Disabled;
     342            0 :         }
     343            0 : 
     344            0 :         let max_keys_in_list_response = 10;
     345            0 :         let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
     346              : 
     347            0 :         let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
     348              : 
     349            0 :         match upload_simple_s3_data(&enabled.client, upload_tasks_count).await {
     350            0 :             ControlFlow::Continue(uploads) => {
     351            0 :                 info!("Remote objects created successfully");
     352              : 
     353            0 :                 Self::Enabled(S3WithSimpleTestBlobs {
     354            0 :                     enabled,
     355            0 :                     remote_blobs: uploads,
     356            0 :                 })
     357              :             }
     358            0 :             ControlFlow::Break(uploads) => Self::UploadsFailed(
     359            0 :                 anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
     360            0 :                 S3WithSimpleTestBlobs {
     361            0 :                     enabled,
     362            0 :                     remote_blobs: uploads,
     363            0 :                 },
     364            0 :             ),
     365              :         }
     366            2 :     }
     367              : 
     368            1 :     async fn teardown(self) {
     369            1 :         match self {
     370            1 :             Self::Disabled => {}
     371            0 :             Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
     372            0 :                 cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
     373              :             }
     374              :         }
     375            1 :     }
     376              : }
     377              : 
     378            0 : fn create_s3_client(
     379            0 :     max_keys_per_list_response: Option<i32>,
     380            0 : ) -> anyhow::Result<Arc<GenericRemoteStorage>> {
     381            0 :     let remote_storage_s3_bucket = env::var("REMOTE_STORAGE_S3_BUCKET")
     382            0 :         .context("`REMOTE_STORAGE_S3_BUCKET` env var is not set, but real S3 tests are enabled")?;
     383            0 :     let remote_storage_s3_region = env::var("REMOTE_STORAGE_S3_REGION")
     384            0 :         .context("`REMOTE_STORAGE_S3_REGION` env var is not set, but real S3 tests are enabled")?;
     385            0 :     let random_prefix_part = std::time::SystemTime::now()
     386            0 :         .duration_since(UNIX_EPOCH)
     387            0 :         .context("random s3 test prefix part calculation")?
     388            0 :         .as_nanos();
     389            0 :     let remote_storage_config = RemoteStorageConfig {
     390            0 :         max_concurrent_syncs: NonZeroUsize::new(100).unwrap(),
     391            0 :         max_sync_errors: NonZeroU32::new(5).unwrap(),
     392            0 :         storage: RemoteStorageKind::AwsS3(S3Config {
     393            0 :             bucket_name: remote_storage_s3_bucket,
     394            0 :             bucket_region: remote_storage_s3_region,
     395            0 :             prefix_in_bucket: Some(format!("pagination_should_work_test_{random_prefix_part}/")),
     396            0 :             endpoint: None,
     397            0 :             concurrency_limit: NonZeroUsize::new(100).unwrap(),
     398            0 :             max_keys_per_list_response,
     399            0 :         }),
     400            0 :     };
     401            0 :     Ok(Arc::new(
     402            0 :         GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
     403              :     ))
     404            0 : }
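
For reference, a sketch of the environment this client construction expects. ENABLE_REAL_S3_REMOTE_STORAGE and the two REMOTE_STORAGE_S3_* variables are read by this file; the credential variables listed are the standard AWS SDK ones and are an assumption here, not something this file checks directly.

    // ENABLE_REAL_S3_REMOTE_STORAGE=1            (gates the real-S3 code paths in the tests above)
    // REMOTE_STORAGE_S3_BUCKET=<bucket name>
    // REMOTE_STORAGE_S3_REGION=<region>
    // AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY  (or another credential source the SDK picks up)
    //
    // The per-run prefix then looks roughly like "pagination_should_work_test_<nanos since epoch>/",
    // so repeated or concurrent test runs do not see each other's objects.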
     405              : 
     406              : struct Uploads {
     407              :     prefixes: HashSet<RemotePath>,
     408              :     blobs: HashSet<RemotePath>,
     409              : }
     410              : 
     411            0 : async fn upload_s3_data(
     412            0 :     client: &Arc<GenericRemoteStorage>,
     413            0 :     base_prefix_str: &'static str,
     414            0 :     upload_tasks_count: usize,
     415            0 : ) -> ControlFlow<Uploads, Uploads> {
     416            0 :     info!("Creating {upload_tasks_count} S3 files");
     417            0 :     let mut upload_tasks = JoinSet::new();
     418            0 :     for i in 1..upload_tasks_count + 1 {
     419            0 :         let task_client = Arc::clone(client);
     420            0 :         upload_tasks.spawn(async move {
     421            0 :             let prefix = PathBuf::from(format!("{base_prefix_str}/sub_prefix_{i}/"));
     422            0 :             let blob_prefix = RemotePath::new(&prefix)
     423            0 :                 .with_context(|| format!("{prefix:?} to RemotePath conversion"))?;
     424            0 :             let blob_path = blob_prefix.join(Path::new(&format!("blob_{i}")));
     425            0 :             debug!("Creating remote item {i} at path {blob_path:?}");
     426              : 
     427            0 :             let data = format!("remote blob data {i}").into_bytes();
     428            0 :             let data_len = data.len();
     429            0 :             task_client
     430            0 :                 .upload(std::io::Cursor::new(data), data_len, &blob_path, None)
     431            0 :                 .await?;
     432              : 
     433            0 :             Ok::<_, anyhow::Error>((blob_prefix, blob_path))
     434            0 :         });
     435            0 :     }
     436              : 
     437            0 :     let mut upload_tasks_failed = false;
     438            0 :     let mut uploaded_prefixes = HashSet::with_capacity(upload_tasks_count);
     439            0 :     let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
     440            0 :     while let Some(task_run_result) = upload_tasks.join_next().await {
     441            0 :         match task_run_result
     442            0 :             .context("task join failed")
     443            0 :             .and_then(|task_result| task_result.context("upload task failed"))
     444              :         {
     445            0 :             Ok((upload_prefix, upload_path)) => {
     446            0 :                 uploaded_prefixes.insert(upload_prefix);
     447            0 :                 uploaded_blobs.insert(upload_path);
     448            0 :             }
     449            0 :             Err(e) => {
     450            0 :                 error!("Upload task failed: {e:?}");
     451            0 :                 upload_tasks_failed = true;
     452              :             }
     453              :         }
     454              :     }
     455              : 
     456            0 :     let uploads = Uploads {
     457            0 :         prefixes: uploaded_prefixes,
     458            0 :         blobs: uploaded_blobs,
     459            0 :     };
     460            0 :     if upload_tasks_failed {
     461            0 :         ControlFlow::Break(uploads)
     462              :     } else {
     463            0 :         ControlFlow::Continue(uploads)
     464              :     }
     465            0 : }
     466              : 
     467            0 : async fn cleanup(client: &Arc<GenericRemoteStorage>, objects_to_delete: HashSet<RemotePath>) {
     468            0 :     info!(
     469            0 :         "Removing {} objects from the remote storage during cleanup",
     470            0 :         objects_to_delete.len()
     471            0 :     );
     472            0 :     let mut delete_tasks = JoinSet::new();
     473            0 :     for object_to_delete in objects_to_delete {
     474            0 :         let task_client = Arc::clone(client);
     475            0 :         delete_tasks.spawn(async move {
     476            0 :             debug!("Deleting remote item at path {object_to_delete:?}");
     477            0 :             task_client
     478            0 :                 .delete(&object_to_delete)
     479            0 :                 .await
     480            0 :                 .with_context(|| format!("{object_to_delete:?} removal"))
     481            0 :         });
     482            0 :     }
     483              : 
     484            0 :     while let Some(task_run_result) = delete_tasks.join_next().await {
     485            0 :         match task_run_result {
     486            0 :             Ok(task_result) => match task_result {
     487            0 :                 Ok(()) => {}
     488            0 :                 Err(e) => error!("Delete task failed: {e:?}"),
     489              :             },
     490            0 :             Err(join_err) => error!("Delete task did not finish correctly: {join_err}"),
     491              :         }
     492              :     }
     493            0 : }
     494              : 
     495              : // Uploads files `folder{j}/blob_{i}.txt`. See the test description for more details.
     496            0 : async fn upload_simple_s3_data(
     497            0 :     client: &Arc<GenericRemoteStorage>,
     498            0 :     upload_tasks_count: usize,
     499            0 : ) -> ControlFlow<HashSet<RemotePath>, HashSet<RemotePath>> {
     500            0 :     info!("Creating {upload_tasks_count} S3 files");
     501            0 :     let mut upload_tasks = JoinSet::new();
     502            0 :     for i in 1..upload_tasks_count + 1 {
     503            0 :         let task_client = Arc::clone(client);
     504            0 :         upload_tasks.spawn(async move {
     505            0 :             let blob_path = PathBuf::from(format!("folder{}/blob_{}.txt", i / 7, i));
     506            0 :             let blob_path = RemotePath::new(&blob_path)
     507            0 :                 .with_context(|| format!("{blob_path:?} to RemotePath conversion"))?;
     508            0 :             debug!("Creating remote item {i} at path {blob_path:?}");
     509              : 
     510            0 :             let data = format!("remote blob data {i}").into_bytes();
     511            0 :             let data_len = data.len();
     512            0 :             task_client
     513            0 :                 .upload(std::io::Cursor::new(data), data_len, &blob_path, None)
     514            0 :                 .await?;
     515              : 
     516            0 :             Ok::<_, anyhow::Error>(blob_path)
     517            0 :         });
     518            0 :     }
     519              : 
     520            0 :     let mut upload_tasks_failed = false;
     521            0 :     let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
     522            0 :     while let Some(task_run_result) = upload_tasks.join_next().await {
     523            0 :         match task_run_result
     524            0 :             .context("task join failed")
     525            0 :             .and_then(|task_result| task_result.context("upload task failed"))
     526              :         {
     527            0 :             Ok(upload_path) => {
     528            0 :                 uploaded_blobs.insert(upload_path);
     529            0 :             }
     530            0 :             Err(e) => {
     531            0 :                 error!("Upload task failed: {e:?}");
     532            0 :                 upload_tasks_failed = true;
     533              :             }
     534              :         }
     535              :     }
     536              : 
     537            0 :     if upload_tasks_failed {
     538            0 :         ControlFlow::Break(uploaded_blobs)
     539              :     } else {
     540            0 :         ControlFlow::Continue(uploaded_blobs)
     541              :     }
     542            0 : }
        

Generated by: LCOV version 2.1-beta