LCOV - differential code coverage report
Current view:  top level - libs/remote_storage/tests - test_real_azure.rs
Current:       f6946e90941b557c917ac98cd5a7e9506d180f3e.info (2023-10-19 02:04:12)
Baseline:      c8637f37369098875162f194f92736355783b050.info (2023-10-18 20:25:20)

              Coverage    Total    Hit    UBC    CBC
Lines:          18.5 %      406     75    331     75
Functions:      49.4 %       79     39     40     39

           TLA  Line data    Source code
       1                 : use std::collections::HashSet;
       2                 : use std::env;
       3                 : use std::num::{NonZeroU32, NonZeroUsize};
       4                 : use std::ops::ControlFlow;
       5                 : use std::path::PathBuf;
       6                 : use std::sync::Arc;
       7                 : use std::time::UNIX_EPOCH;
       8                 : 
       9                 : use anyhow::Context;
      10                 : use camino::Utf8Path;
      11                 : use once_cell::sync::OnceCell;
      12                 : use remote_storage::{
      13                 :     AzureConfig, Download, GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind,
      14                 : };
      15                 : use test_context::{test_context, AsyncTestContext};
      16                 : use tokio::task::JoinSet;
      17                 : use tracing::{debug, error, info};
      18                 : 
      19                 : static LOGGING_DONE: OnceCell<()> = OnceCell::new();
      20                 : 
      21                 : const ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_AZURE_REMOTE_STORAGE";
      22                 : 
      23                 : const BASE_PREFIX: &str = "test";
      24                 : 
       25                 : /// Tests that the Azure client can list all prefixes, even if the response comes paginated and requires multiple HTTP queries.
       26                 : /// Uses real Azure and requires [`ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME`] and the related Azure credential env vars to be set.
       27                 : /// See the client creation in [`create_azure_client`] for details on the required env vars.
       28                 : /// If real Azure tests are disabled, the test passes without running any real code: currently, there's no way to mark a test as ignored at runtime with the
       29                 : /// default test framework; see https://github.com/rust-lang/rust/issues/68007 for details.
      30                 : ///
       31                 : /// First, the test creates a set of Azure blobs with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_azure_data`]
       32                 : /// where
       33                 : /// * `random_prefix_part` is set for the entire Azure client during its creation in [`create_azure_client`], to avoid interference between multiple test runs
       34                 : /// * `base_prefix_str` is a common prefix to use in the client requests: we want to ensure that the client can list nested prefixes inside the container
       35                 : ///
       36                 : /// Then, the test verifies that the client returns the correct prefixes when queried (see the key layout sketch after this test):
       37                 : /// * with no prefix, it lists everything after its `${random_prefix_part}/`, which should be the `${base_prefix_str}` value only
       38                 : /// * with the `${base_prefix_str}/` prefix, it lists every `sub_prefix_${i}`
      39                 : ///
       40                 : /// With real Azure enabled and the `#[cfg(test)]` Rust configuration used, the test limits the number of keys per list response (`max_keys_per_list_response`).
       41                 : /// This way, we test the pagination implicitly, by ensuring all results are returned from the remote storage, while avoiding uploading too many blobs to Azure.
      42                 : ///
      43                 : /// Lastly, the test attempts to clean up and remove all uploaded Azure files.
       44                 : /// If any errors occur during cleanup, they are logged, but the test is neither failed nor stopped until cleanup is finished.
      45 CBC           2 : #[test_context(MaybeEnabledAzureWithTestBlobs)]
      46               1 : #[tokio::test]
      47                 : async fn azure_pagination_should_work(
      48                 :     ctx: &mut MaybeEnabledAzureWithTestBlobs,
      49               2 : ) -> anyhow::Result<()> {
      50               1 :     let ctx = match ctx {
      51 UBC           0 :         MaybeEnabledAzureWithTestBlobs::Enabled(ctx) => ctx,
      52 CBC           1 :         MaybeEnabledAzureWithTestBlobs::Disabled => return Ok(()),
      53 UBC           0 :         MaybeEnabledAzureWithTestBlobs::UploadsFailed(e, _) => {
      54               0 :             anyhow::bail!("Azure init failed: {e:?}")
      55                 :         }
      56                 :     };
      57                 : 
      58               0 :     let test_client = Arc::clone(&ctx.enabled.client);
      59               0 :     let expected_remote_prefixes = ctx.remote_prefixes.clone();
      60                 : 
      61               0 :     let base_prefix = RemotePath::new(Utf8Path::new(ctx.enabled.base_prefix))
      62               0 :         .context("common_prefix construction")?;
      63               0 :     let root_remote_prefixes = test_client
      64               0 :         .list_prefixes(None)
      65               0 :         .await
      66               0 :         .context("client list root prefixes failure")?
      67               0 :         .into_iter()
      68               0 :         .collect::<HashSet<_>>();
      69               0 :     assert_eq!(
      70               0 :         root_remote_prefixes, HashSet::from([base_prefix.clone()]),
      71               0 :         "remote storage root prefixes list mismatches with the uploads. Returned prefixes: {root_remote_prefixes:?}"
      72                 :     );
      73                 : 
      74               0 :     let nested_remote_prefixes = test_client
      75               0 :         .list_prefixes(Some(&base_prefix))
      76               0 :         .await
      77               0 :         .context("client list nested prefixes failure")?
      78               0 :         .into_iter()
      79               0 :         .collect::<HashSet<_>>();
      80               0 :     let remote_only_prefixes = nested_remote_prefixes
      81               0 :         .difference(&expected_remote_prefixes)
      82               0 :         .collect::<HashSet<_>>();
      83               0 :     let missing_uploaded_prefixes = expected_remote_prefixes
      84               0 :         .difference(&nested_remote_prefixes)
      85               0 :         .collect::<HashSet<_>>();
      86               0 :     assert_eq!(
      87               0 :         remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0,
      88               0 :         "remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
      89                 :     );
      90                 : 
      91               0 :     Ok(())
      92 CBC           1 : }
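                          : 
                          : // Editorial sketch (no coverage data): the key layout exercised above,
                          : // assuming upload_tasks_count = 3 and BASE_PREFIX = "test"; every path is
                          : // relative to the per-run `${random_prefix_part}` baked into the client:
                          : //
                          : //   test/sub_prefix_1/blob_1
                          : //   test/sub_prefix_2/blob_2
                          : //   test/sub_prefix_3/blob_3
                          : //
                          : // list_prefixes(None)         -> { "test" }
                          : // list_prefixes(Some("test")) -> { "test/sub_prefix_1", "test/sub_prefix_2", "test/sub_prefix_3" }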
      93                 : 
       94                 : /// Tests that the Azure client can list all files in a folder, even if the response comes paginated and requires multiple Azure queries.
       95                 : /// Uses real Azure and requires [`ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME`] and the related Azure credential env vars to be set. The test skips the real code and passes if the env vars are not set.
       96                 : /// See [`azure_pagination_should_work`] for more information.
       97                 : ///
       98                 : /// First, the test creates a set of Azure objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_simple_azure_data`].
       99                 : /// Then it performs the following queries (see the layout sketch after this test):
      100                 : ///    1. `list_files(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
      101                 : ///    2. `list_files("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt`
     102               2 : #[test_context(MaybeEnabledAzureWithSimpleTestBlobs)]
     103               1 : #[tokio::test]
     104                 : async fn azure_list_files_works(
     105                 :     ctx: &mut MaybeEnabledAzureWithSimpleTestBlobs,
     106               2 : ) -> anyhow::Result<()> {
     107               1 :     let ctx = match ctx {
     108 UBC           0 :         MaybeEnabledAzureWithSimpleTestBlobs::Enabled(ctx) => ctx,
     109 CBC           1 :         MaybeEnabledAzureWithSimpleTestBlobs::Disabled => return Ok(()),
     110 UBC           0 :         MaybeEnabledAzureWithSimpleTestBlobs::UploadsFailed(e, _) => {
     111               0 :             anyhow::bail!("Azure init failed: {e:?}")
     112                 :         }
     113                 :     };
     114               0 :     let test_client = Arc::clone(&ctx.enabled.client);
     115               0 :     let base_prefix =
     116               0 :         RemotePath::new(Utf8Path::new("folder1")).context("common_prefix construction")?;
     117               0 :     let root_files = test_client
     118               0 :         .list_files(None)
     119               0 :         .await
     120               0 :         .context("client list root files failure")?
     121               0 :         .into_iter()
     122               0 :         .collect::<HashSet<_>>();
     123               0 :     assert_eq!(
     124               0 :         root_files,
     125               0 :         ctx.remote_blobs.clone(),
     126               0 :         "remote storage list_files on root mismatches with the uploads."
     127                 :     );
     128               0 :     let nested_remote_files = test_client
     129               0 :         .list_files(Some(&base_prefix))
     130               0 :         .await
     131               0 :         .context("client list nested files failure")?
     132               0 :         .into_iter()
     133               0 :         .collect::<HashSet<_>>();
     134               0 :     let trim_remote_blobs: HashSet<_> = ctx
     135               0 :         .remote_blobs
     136               0 :         .iter()
     137               0 :         .map(|x| x.get_path())
     138               0 :         .filter(|x| x.starts_with("folder1"))
     139               0 :         .map(|x| RemotePath::new(x).expect("must be valid path"))
     140               0 :         .collect();
     141                 :     assert_eq!(
     142                 :         nested_remote_files, trim_remote_blobs,
     143               0 :         "remote storage list_files on subdirrectory mismatches with the uploads."
     144                 :     );
     145               0 :     Ok(())
     146 CBC           1 : }
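                          : 
                          : // Editorial sketch (no coverage data): with upload_simple_azure_data and
                          : // max_keys_in_list_response = 10, upload_tasks_count is 1 + 2 * 10 = 21 and
                          : // blob i lands at folder{i / 7}/blob_{i}.txt, so the uploads look like:
                          : //
                          : //   folder0/blob_1.txt  .. folder0/blob_6.txt
                          : //   folder1/blob_7.txt  .. folder1/blob_13.txt
                          : //   folder2/blob_14.txt .. folder2/blob_20.txt
                          : //   folder3/blob_21.txt
                          : //
                          : // list_files(None) must return all of them; list_files(Some("folder1")) must
                          : // return only the folder1/* blobs, which is what the second assertion checks.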
     147                 : 
     148               2 : #[test_context(MaybeEnabledAzure)]
     149               1 : #[tokio::test]
     150               1 : async fn azure_delete_non_exising_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Result<()> {
     151               1 :     let ctx = match ctx {
     152 UBC           0 :         MaybeEnabledAzure::Enabled(ctx) => ctx,
     153 CBC           1 :         MaybeEnabledAzure::Disabled => return Ok(()),
     154                 :     };
     155                 : 
     156 UBC           0 :     let path = RemotePath::new(Utf8Path::new(
     157               0 :         format!("{}/for_sure_there_is_nothing_there_really", ctx.base_prefix).as_str(),
     158               0 :     ))
     159               0 :     .with_context(|| "RemotePath conversion")?;
     160                 : 
     161               0 :     ctx.client.delete(&path).await.expect("should succeed");
     162               0 : 
     163               0 :     Ok(())
     164 CBC           1 : }
     165                 : 
     166               2 : #[test_context(MaybeEnabledAzure)]
     167               1 : #[tokio::test]
     168               2 : async fn azure_delete_objects_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Result<()> {
     169               1 :     let ctx = match ctx {
     170 UBC           0 :         MaybeEnabledAzure::Enabled(ctx) => ctx,
     171 CBC           1 :         MaybeEnabledAzure::Disabled => return Ok(()),
     172                 :     };
     173                 : 
     174 UBC           0 :     let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
     175               0 :         .with_context(|| "RemotePath conversion")?;
     176                 : 
     177               0 :     let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
     178               0 :         .with_context(|| "RemotePath conversion")?;
     179                 : 
     180               0 :     let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
     181               0 :         .with_context(|| "RemotePath conversion")?;
     182                 : 
     183               0 :     let data1 = "remote blob data1".as_bytes();
     184               0 :     let data1_len = data1.len();
     185               0 :     let data2 = "remote blob data2".as_bytes();
     186               0 :     let data2_len = data2.len();
     187               0 :     let data3 = "remote blob data3".as_bytes();
     188               0 :     let data3_len = data3.len();
     189               0 :     ctx.client
     190               0 :         .upload(std::io::Cursor::new(data1), data1_len, &path1, None)
     191               0 :         .await?;
     192                 : 
     193               0 :     ctx.client
     194               0 :         .upload(std::io::Cursor::new(data2), data2_len, &path2, None)
     195               0 :         .await?;
     196                 : 
     197               0 :     ctx.client
     198               0 :         .upload(std::io::Cursor::new(data3), data3_len, &path3, None)
     199               0 :         .await?;
     200                 : 
     201               0 :     ctx.client.delete_objects(&[path1, path2]).await?;
     202                 : 
     203               0 :     let prefixes = ctx.client.list_prefixes(None).await?;
     204                 : 
     205               0 :     assert_eq!(prefixes.len(), 1);
     206                 : 
     207               0 :     ctx.client.delete_objects(&[path3]).await?;
     208                 : 
     209               0 :     Ok(())
     210 CBC           1 : }
     211                 : 
     212               2 : #[test_context(MaybeEnabledAzure)]
     213               1 : #[tokio::test]
     214               2 : async fn azure_upload_download_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Result<()> {
     215               1 :     let MaybeEnabledAzure::Enabled(ctx) = ctx else {
     216               1 :         return Ok(());
     217                 :     };
     218                 : 
     219 UBC           0 :     let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))
     220               0 :         .with_context(|| "RemotePath conversion")?;
     221                 : 
     222               0 :     let data = "remote blob data here".as_bytes();
     223               0 :     let data_len = data.len() as u64;
     224               0 : 
     225               0 :     ctx.client
     226               0 :         .upload(std::io::Cursor::new(data), data.len(), &path, None)
     227               0 :         .await?;
     228                 : 
     229               0 :     async fn download_and_compare(mut dl: Download) -> anyhow::Result<Vec<u8>> {
     230               0 :         let mut buf = Vec::new();
     231               0 :         tokio::io::copy(&mut dl.download_stream, &mut buf).await?;
     232               0 :         Ok(buf)
     233               0 :     }
     234                 :     // Normal download request
     235               0 :     let dl = ctx.client.download(&path).await?;
     236               0 :     let buf = download_and_compare(dl).await?;
     237               0 :     assert_eq!(buf, data);
     238                 : 
     239                 :     // Full range (end specified)
     240               0 :     let dl = ctx
     241               0 :         .client
     242               0 :         .download_byte_range(&path, 0, Some(data_len))
     243               0 :         .await?;
     244               0 :     let buf = download_and_compare(dl).await?;
     245               0 :     assert_eq!(buf, data);
     246                 : 
     247                 :     // partial range (end specified)
     248               0 :     let dl = ctx.client.download_byte_range(&path, 4, Some(10)).await?;
     249               0 :     let buf = download_and_compare(dl).await?;
     250               0 :     assert_eq!(buf, data[4..10]);
     251                 : 
     252                 :     // partial range (end beyond real end)
     253               0 :     let dl = ctx
     254               0 :         .client
     255               0 :         .download_byte_range(&path, 8, Some(data_len * 100))
     256               0 :         .await?;
     257               0 :     let buf = download_and_compare(dl).await?;
     258               0 :     assert_eq!(buf, data[8..]);
     259                 : 
     260                 :     // Partial range (end unspecified)
     261               0 :     let dl = ctx.client.download_byte_range(&path, 4, None).await?;
     262               0 :     let buf = download_and_compare(dl).await?;
     263               0 :     assert_eq!(buf, data[4..]);
     264                 : 
     265                 :     // Full range (end unspecified)
     266               0 :     let dl = ctx.client.download_byte_range(&path, 0, None).await?;
     267               0 :     let buf = download_and_compare(dl).await?;
     268               0 :     assert_eq!(buf, data);
     269                 : 
     270               0 :     Ok(())
     271 CBC           1 : }
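                          : 
                          : // Editorial note (no coverage data): the assertions above pin down the byte
                          : // range semantics of download_byte_range(&path, start, end): Some(end) is
                          : // end-exclusive, mirroring Rust's data[start..end]; an end past the real blob
                          : // length is clamped to it; and None means "until the end of the blob".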
     272                 : 
     273               5 : fn ensure_logging_ready() {
     274               5 :     LOGGING_DONE.get_or_init(|| {
     275               1 :         utils::logging::init(
     276               1 :             utils::logging::LogFormat::Test,
     277               1 :             utils::logging::TracingErrorLayerEnablement::Disabled,
     278               1 :         )
     279               1 :         .expect("logging init failed");
     280               5 :     });
     281               5 : }
     282                 : 
     283                 : struct EnabledAzure {
     284                 :     client: Arc<GenericRemoteStorage>,
     285                 :     base_prefix: &'static str,
     286                 : }
     287                 : 
     288                 : impl EnabledAzure {
     289 UBC           0 :     async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
     290               0 :         let client = create_azure_client(max_keys_in_list_response)
     291               0 :             .context("Azure client creation")
     292               0 :             .expect("Azure client creation failed");
     293               0 : 
     294               0 :         EnabledAzure {
     295               0 :             client,
     296               0 :             base_prefix: BASE_PREFIX,
     297               0 :         }
     298               0 :     }
     299                 : }
     300                 : 
     301                 : enum MaybeEnabledAzure {
     302                 :     Enabled(EnabledAzure),
     303                 :     Disabled,
     304                 : }
     305                 : 
     306                 : #[async_trait::async_trait]
     307                 : impl AsyncTestContext for MaybeEnabledAzure {
     308 CBC           3 :     async fn setup() -> Self {
     309               3 :         ensure_logging_ready();
     310               3 : 
     311               3 :         if env::var(ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
     312               3 :             info!(
     313               3 :                 "`{}` env variable is not set, skipping the test",
     314               3 :                 ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME
     315               3 :             );
     316               3 :             return Self::Disabled;
     317 UBC           0 :         }
     318               0 : 
     319               0 :         Self::Enabled(EnabledAzure::setup(None).await)
     320 CBC           6 :     }
     321                 : }
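                          : 
                          : // Editorial note (no coverage data): this setup()/Disabled dance is the
                          : // runtime-skip workaround mentioned in the doc comments above:
                          : // AsyncTestContext::setup runs before each #[test_context] test, and when the
                          : // env var is unset, the test body matches on Disabled and returns Ok(())
                          : // immediately instead of touching real Azure.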
     322                 : 
     323                 : enum MaybeEnabledAzureWithTestBlobs {
     324                 :     Enabled(AzureWithTestBlobs),
     325                 :     Disabled,
     326                 :     UploadsFailed(anyhow::Error, AzureWithTestBlobs),
     327                 : }
     328                 : 
     329                 : struct AzureWithTestBlobs {
     330                 :     enabled: EnabledAzure,
     331                 :     remote_prefixes: HashSet<RemotePath>,
     332                 :     remote_blobs: HashSet<RemotePath>,
     333                 : }
     334                 : 
     335                 : #[async_trait::async_trait]
     336                 : impl AsyncTestContext for MaybeEnabledAzureWithTestBlobs {
     337               1 :     async fn setup() -> Self {
     338               1 :         ensure_logging_ready();
     339               1 :         if env::var(ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
     340               1 :             info!(
     341               1 :                 "`{}` env variable is not set, skipping the test",
     342               1 :                 ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME
     343               1 :             );
     344               1 :             return Self::Disabled;
     345 UBC           0 :         }
     346               0 : 
     347               0 :         let max_keys_in_list_response = 10;
     348               0 :         let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
     349                 : 
     350               0 :         let enabled = EnabledAzure::setup(Some(max_keys_in_list_response)).await;
     351                 : 
     352               0 :         match upload_azure_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
     353               0 :             ControlFlow::Continue(uploads) => {
     354               0 :                 info!("Remote objects created successfully");
     355                 : 
     356               0 :                 Self::Enabled(AzureWithTestBlobs {
     357               0 :                     enabled,
     358               0 :                     remote_prefixes: uploads.prefixes,
     359               0 :                     remote_blobs: uploads.blobs,
     360               0 :                 })
     361                 :             }
     362               0 :             ControlFlow::Break(uploads) => Self::UploadsFailed(
     363               0 :                 anyhow::anyhow!("One or multiple blobs failed to upload to Azure"),
     364               0 :                 AzureWithTestBlobs {
     365               0 :                     enabled,
     366               0 :                     remote_prefixes: uploads.prefixes,
     367               0 :                     remote_blobs: uploads.blobs,
     368               0 :                 },
     369               0 :             ),
     370                 :         }
     371 CBC           2 :     }
     372                 : 
     373               1 :     async fn teardown(self) {
     374               1 :         match self {
     375               1 :             Self::Disabled => {}
     376 UBC           0 :             Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
     377               0 :                 cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
     378                 :             }
     379                 :         }
     380 CBC           1 :     }
     381                 : }
     382                 : 
     383                 : // NOTE: the setups for the list_prefixes test and the list_files test are very similar
      384                 : // However, they are not identical. The list_prefixes function is concerned with listing prefixes,
     385                 : // whereas the list_files function is concerned with listing files.
     386                 : // See `RemoteStorage::list_files` documentation for more details
     387                 : enum MaybeEnabledAzureWithSimpleTestBlobs {
     388                 :     Enabled(AzureWithSimpleTestBlobs),
     389                 :     Disabled,
     390                 :     UploadsFailed(anyhow::Error, AzureWithSimpleTestBlobs),
     391                 : }
     392                 : struct AzureWithSimpleTestBlobs {
     393                 :     enabled: EnabledAzure,
     394                 :     remote_blobs: HashSet<RemotePath>,
     395                 : }
     396                 : 
     397                 : #[async_trait::async_trait]
     398                 : impl AsyncTestContext for MaybeEnabledAzureWithSimpleTestBlobs {
     399               1 :     async fn setup() -> Self {
     400               1 :         ensure_logging_ready();
     401               1 :         if env::var(ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
     402               1 :             info!(
     403               1 :                 "`{}` env variable is not set, skipping the test",
     404               1 :                 ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME
     405               1 :             );
     406               1 :             return Self::Disabled;
     407 UBC           0 :         }
     408               0 : 
     409               0 :         let max_keys_in_list_response = 10;
     410               0 :         let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
     411                 : 
     412               0 :         let enabled = EnabledAzure::setup(Some(max_keys_in_list_response)).await;
     413                 : 
     414               0 :         match upload_simple_azure_data(&enabled.client, upload_tasks_count).await {
     415               0 :             ControlFlow::Continue(uploads) => {
     416               0 :                 info!("Remote objects created successfully");
     417                 : 
     418               0 :                 Self::Enabled(AzureWithSimpleTestBlobs {
     419               0 :                     enabled,
     420               0 :                     remote_blobs: uploads,
     421               0 :                 })
     422                 :             }
     423               0 :             ControlFlow::Break(uploads) => Self::UploadsFailed(
     424               0 :                 anyhow::anyhow!("One or multiple blobs failed to upload to Azure"),
     425               0 :                 AzureWithSimpleTestBlobs {
     426               0 :                     enabled,
     427               0 :                     remote_blobs: uploads,
     428               0 :                 },
     429               0 :             ),
     430                 :         }
     431 CBC           2 :     }
     432                 : 
     433               1 :     async fn teardown(self) {
     434               1 :         match self {
     435               1 :             Self::Disabled => {}
     436 UBC           0 :             Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
     437               0 :                 cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
     438                 :             }
     439                 :         }
     440 CBC           1 :     }
     441                 : }
     442                 : 
     443 UBC           0 : fn create_azure_client(
     444               0 :     max_keys_per_list_response: Option<i32>,
     445               0 : ) -> anyhow::Result<Arc<GenericRemoteStorage>> {
     446                 :     use rand::Rng;
     447                 : 
     448               0 :     let remote_storage_azure_container = env::var("REMOTE_STORAGE_AZURE_CONTAINER").context(
     449               0 :         "`REMOTE_STORAGE_AZURE_CONTAINER` env var is not set, but real Azure tests are enabled",
     450               0 :     )?;
     451               0 :     let remote_storage_azure_region = env::var("REMOTE_STORAGE_AZURE_REGION").context(
     452               0 :         "`REMOTE_STORAGE_AZURE_REGION` env var is not set, but real Azure tests are enabled",
     453               0 :     )?;
     454                 : 
      455                 :     // due to how time works, we've had test runners end up using the same nanos as prefixes.
      456                 :     // millis is just a debugging aid, to make the prefix easier to find later.
     457               0 :     let millis = std::time::SystemTime::now()
     458               0 :         .duration_since(UNIX_EPOCH)
     459               0 :         .context("random Azure test prefix part calculation")?
     460               0 :         .as_millis();
     461               0 : 
      462               0 :     // because nanos can be the same for two threads (and so can millis), add randomness
     463               0 :     let random = rand::thread_rng().gen::<u32>();
     464               0 : 
     465               0 :     let remote_storage_config = RemoteStorageConfig {
     466               0 :         max_concurrent_syncs: NonZeroUsize::new(100).unwrap(),
     467               0 :         max_sync_errors: NonZeroU32::new(5).unwrap(),
     468               0 :         storage: RemoteStorageKind::AzureContainer(AzureConfig {
     469               0 :             container_name: remote_storage_azure_container,
     470               0 :             container_region: remote_storage_azure_region,
     471               0 :             prefix_in_container: Some(format!("test_{millis}_{random:08x}/")),
     472               0 :             concurrency_limit: NonZeroUsize::new(100).unwrap(),
     473               0 :             max_keys_per_list_response,
     474               0 :         }),
     475               0 :     };
     476               0 :     Ok(Arc::new(
     477               0 :         GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
     478                 :     ))
     479               0 : }
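                          : 
                          : // Editorial sketch (no coverage data): environment needed to run these tests
                          : // for real, per the checks above. The exact credential variables depend on
                          : // the Azure SDK configuration and are not shown in this file:
                          : //
                          : //   ENABLE_REAL_AZURE_REMOTE_STORAGE=1   (any value works, only presence is checked)
                          : //   REMOTE_STORAGE_AZURE_CONTAINER=<container name>
                          : //   REMOTE_STORAGE_AZURE_REGION=<region>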
     480                 : 
     481                 : struct Uploads {
     482                 :     prefixes: HashSet<RemotePath>,
     483                 :     blobs: HashSet<RemotePath>,
     484                 : }
     485                 : 
     486               0 : async fn upload_azure_data(
     487               0 :     client: &Arc<GenericRemoteStorage>,
     488               0 :     base_prefix_str: &'static str,
     489               0 :     upload_tasks_count: usize,
     490               0 : ) -> ControlFlow<Uploads, Uploads> {
     491               0 :     info!("Creating {upload_tasks_count} Azure files");
     492               0 :     let mut upload_tasks = JoinSet::new();
     493               0 :     for i in 1..upload_tasks_count + 1 {
     494               0 :         let task_client = Arc::clone(client);
     495               0 :         upload_tasks.spawn(async move {
     496               0 :             let prefix = format!("{base_prefix_str}/sub_prefix_{i}/");
     497               0 :             let blob_prefix = RemotePath::new(Utf8Path::new(&prefix))
     498               0 :                 .with_context(|| format!("{prefix:?} to RemotePath conversion"))?;
     499               0 :             let blob_path = blob_prefix.join(Utf8Path::new(&format!("blob_{i}")));
     500               0 :             debug!("Creating remote item {i} at path {blob_path:?}");
     501                 : 
     502               0 :             let data = format!("remote blob data {i}").into_bytes();
     503               0 :             let data_len = data.len();
     504               0 :             task_client
     505               0 :                 .upload(std::io::Cursor::new(data), data_len, &blob_path, None)
     506               0 :                 .await?;
     507                 : 
     508               0 :             Ok::<_, anyhow::Error>((blob_prefix, blob_path))
     509               0 :         });
     510               0 :     }
     511                 : 
     512               0 :     let mut upload_tasks_failed = false;
     513               0 :     let mut uploaded_prefixes = HashSet::with_capacity(upload_tasks_count);
     514               0 :     let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
     515               0 :     while let Some(task_run_result) = upload_tasks.join_next().await {
     516               0 :         match task_run_result
     517               0 :             .context("task join failed")
     518               0 :             .and_then(|task_result| task_result.context("upload task failed"))
     519                 :         {
     520               0 :             Ok((upload_prefix, upload_path)) => {
     521               0 :                 uploaded_prefixes.insert(upload_prefix);
     522               0 :                 uploaded_blobs.insert(upload_path);
     523               0 :             }
     524               0 :             Err(e) => {
     525               0 :                 error!("Upload task failed: {e:?}");
     526               0 :                 upload_tasks_failed = true;
     527                 :             }
     528                 :         }
     529                 :     }
     530                 : 
     531               0 :     let uploads = Uploads {
     532               0 :         prefixes: uploaded_prefixes,
     533               0 :         blobs: uploaded_blobs,
     534               0 :     };
     535               0 :     if upload_tasks_failed {
     536               0 :         ControlFlow::Break(uploads)
     537                 :     } else {
     538               0 :         ControlFlow::Continue(uploads)
     539                 :     }
     540               0 : }
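                          : 
                          : // Editorial note (no coverage data): returning ControlFlow<Uploads, Uploads>
                          : // instead of a plain Result lets the caller keep whatever *did* upload even on
                          : // failure: Continue(uploads) on full success, Break(uploads) when any task
                          : // failed, so teardown() can still clean up the partially uploaded blobs.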
     541                 : 
     542               0 : async fn cleanup(client: &Arc<GenericRemoteStorage>, objects_to_delete: HashSet<RemotePath>) {
     543               0 :     info!(
     544               0 :         "Removing {} objects from the remote storage during cleanup",
     545               0 :         objects_to_delete.len()
     546               0 :     );
     547               0 :     let mut delete_tasks = JoinSet::new();
     548               0 :     for object_to_delete in objects_to_delete {
     549               0 :         let task_client = Arc::clone(client);
     550               0 :         delete_tasks.spawn(async move {
     551               0 :             debug!("Deleting remote item at path {object_to_delete:?}");
     552               0 :             task_client
     553               0 :                 .delete(&object_to_delete)
     554               0 :                 .await
     555               0 :                 .with_context(|| format!("{object_to_delete:?} removal"))
     556               0 :         });
     557               0 :     }
     558                 : 
     559               0 :     while let Some(task_run_result) = delete_tasks.join_next().await {
     560               0 :         match task_run_result {
     561               0 :             Ok(task_result) => match task_result {
     562               0 :                 Ok(()) => {}
     563               0 :                 Err(e) => error!("Delete task failed: {e:?}"),
     564                 :             },
     565               0 :             Err(join_err) => error!("Delete task did not finish correctly: {join_err}"),
     566                 :         }
     567                 :     }
     568               0 : }
     569                 : 
      570                 : // Uploads files `folder{j}/blob_{i}.txt`. See the description of `azure_list_files_works` for more details.
     571               0 : async fn upload_simple_azure_data(
     572               0 :     client: &Arc<GenericRemoteStorage>,
     573               0 :     upload_tasks_count: usize,
     574               0 : ) -> ControlFlow<HashSet<RemotePath>, HashSet<RemotePath>> {
     575               0 :     info!("Creating {upload_tasks_count} Azure files");
     576               0 :     let mut upload_tasks = JoinSet::new();
     577               0 :     for i in 1..upload_tasks_count + 1 {
     578               0 :         let task_client = Arc::clone(client);
     579               0 :         upload_tasks.spawn(async move {
     580               0 :             let blob_path = PathBuf::from(format!("folder{}/blob_{}.txt", i / 7, i));
     581               0 :             let blob_path = RemotePath::new(
     582               0 :                 Utf8Path::from_path(blob_path.as_path()).expect("must be valid blob path"),
     583               0 :             )
     584               0 :             .with_context(|| format!("{blob_path:?} to RemotePath conversion"))?;
     585               0 :             debug!("Creating remote item {i} at path {blob_path:?}");
     586                 : 
     587               0 :             let data = format!("remote blob data {i}").into_bytes();
     588               0 :             let data_len = data.len();
     589               0 :             task_client
     590               0 :                 .upload(std::io::Cursor::new(data), data_len, &blob_path, None)
     591               0 :                 .await?;
     592                 : 
     593               0 :             Ok::<_, anyhow::Error>(blob_path)
     594               0 :         });
     595               0 :     }
     596                 : 
     597               0 :     let mut upload_tasks_failed = false;
     598               0 :     let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
     599               0 :     while let Some(task_run_result) = upload_tasks.join_next().await {
     600               0 :         match task_run_result
     601               0 :             .context("task join failed")
     602               0 :             .and_then(|task_result| task_result.context("upload task failed"))
     603                 :         {
     604               0 :             Ok(upload_path) => {
     605               0 :                 uploaded_blobs.insert(upload_path);
     606               0 :             }
     607               0 :             Err(e) => {
     608               0 :                 error!("Upload task failed: {e:?}");
     609               0 :                 upload_tasks_failed = true;
     610                 :             }
     611                 :         }
     612                 :     }
     613                 : 
     614               0 :     if upload_tasks_failed {
     615               0 :         ControlFlow::Break(uploaded_blobs)
     616                 :     } else {
     617               0 :         ControlFlow::Continue(uploaded_blobs)
     618                 :     }
     619               0 : }
        

Generated by: LCOV version 2.1-beta