|             Line data    Source code 
       1              : use std::collections::HashSet;
       2              : use std::env;
       3              : use std::num::NonZeroUsize;
       4              : use std::ops::ControlFlow;
       5              : use std::sync::Arc;
       6              : use std::time::{Duration, UNIX_EPOCH};
       7              : 
       8              : use anyhow::Context;
       9              : use remote_storage::{
      10              :     AzureConfig, GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind,
      11              : };
      12              : use test_context::AsyncTestContext;
      13              : use tracing::info;
      14              : 
      15              : mod common;
      16              : 
      17              : #[path = "common/tests.rs"]
      18              : mod tests_azure;
      19              : 
      20              : use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data};
      21              : 
                        /// Name of the environment variable that gates the tests in this file: each `setup`
                        /// below returns its `Disabled` variant when reading this variable fails.
      22              : const ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_AZURE_REMOTE_STORAGE";
      23              : 
                        /// Value stored in `EnabledAzure::base_prefix` and later passed to `upload_remote_data`.
      24              : const BASE_PREFIX: &str = "test";
      25              : 
                        /// Fixture for tests that run against a real Azure container: the storage client
                        /// built by `create_azure_client` plus the base prefix used when uploading test data.
      26              : struct EnabledAzure {
                            /// Shared handle to the storage client created in `EnabledAzure::setup`.
      27              :     client: Arc<GenericRemoteStorage>,
                            /// Set to `BASE_PREFIX` by `EnabledAzure::setup`.
      28              :     base_prefix: &'static str,
      29              : }
      30              : 
      31              : impl EnabledAzure {
                            /// Builds the fixture by creating a storage client with `create_azure_client`.
                            ///
                            /// `max_keys_in_list_response` is forwarded unchanged to `create_azure_client`.
                            ///
                            /// # Panics
                            ///
                            /// Panics with "Azure client creation failed" if the client cannot be created.
      32           10 :     async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
      33           10 :         let client = create_azure_client(max_keys_in_list_response)
      34           10 :             .await
      35           10 :             .context("Azure client creation")
      36           10 :             .expect("Azure client creation failed");
      37              : 
      38           10 :         EnabledAzure {
      39           10 :             client,
      40           10 :             base_prefix: BASE_PREFIX,
      41           10 :         }
      42           10 :     }
      43              : 
                            /// Overwrites the `timeout` field of the wrapped Azure backend with `timeout`.
                            ///
                            /// # Panics
                            ///
                            /// Panics if either the outer or the inner `Arc` is shared, because `Arc::get_mut`
                            /// returns `None` in that case, and hits `unreachable!()` if the client is not the
                            /// `GenericRemoteStorage::AzureBlob` variant.
      44              :     #[allow(unused)] // this will be needed when moving the timeout integration tests back
      45            0 :     fn configure_request_timeout(&mut self, timeout: Duration) {
      46            0 :         match Arc::get_mut(&mut self.client).expect("outer Arc::get_mut") {
      47            0 :             GenericRemoteStorage::AzureBlob(azure) => {
      48            0 :                 let azure = Arc::get_mut(azure).expect("inner Arc::get_mut");
      49            0 :                 azure.timeout = timeout;
      50            0 :             }
                                    // `create_azure_client` always configures `RemoteStorageKind::AzureContainer`.
                                    // NOTE(review): presumably that always yields `AzureBlob` - confirm in `from_config`.
      51            0 :             _ => unreachable!(),
      52              :         }
      53            0 :     }
      54              : }
      55              : 
                        /// Test context that carries a ready-to-use Azure fixture, or nothing when the
                        /// enabling environment variable could not be read.
      56              : enum MaybeEnabledStorage {
                            /// Real-Azure tests are enabled; holds the client fixture.
      57              :     Enabled(EnabledAzure),
                            /// `ENABLE_REAL_AZURE_REMOTE_STORAGE` could not be read; no client was created.
      58              :     Disabled,
      59              : }
      60              : 
      61              : impl AsyncTestContext for MaybeEnabledStorage {
                            /// Returns `Disabled` when `ENABLE_REAL_AZURE_REMOTE_STORAGE` cannot be read from the
                            /// environment; otherwise builds an `EnabledAzure`, passing `None` as
                            /// `max_keys_in_list_response`.
      62           18 :     async fn setup() -> Self {
                                // Helper from `common`; NOTE(review): presumably initializes logging once - confirm.
      63           18 :         ensure_logging_ready();
      64              : 
                                // `env::var` also fails on non-Unicode values, so those count as "not set" here.
      65           18 :         if env::var(ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
      66           12 :             info!(
      67            0 :                 "`{}` env variable is not set, skipping the test",
      68              :                 ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME
      69              :             );
                                    // Only signals "disabled"; any actual skipping happens outside this block.
      70           12 :             return Self::Disabled;
      71            6 :         }
      72              : 
      73            6 :         Self::Enabled(EnabledAzure::setup(None).await)
      74           18 :     }
      75              : }
      76              : 
                        /// Test context for tests that need blobs and prefixes uploaded via `upload_remote_data`.
      77              : enum MaybeEnabledStorageWithTestBlobs {
                            /// `upload_remote_data` returned `ControlFlow::Continue`.
      78              :     Enabled(AzureWithTestBlobs),
                            /// `ENABLE_REAL_AZURE_REMOTE_STORAGE` could not be read; nothing was uploaded.
      79              :     Disabled,
                            /// `upload_remote_data` returned `ControlFlow::Break`. The fixture is kept alongside
                            /// the error so that `teardown` can still pass the recorded blobs to `cleanup`.
      80              :     UploadsFailed(anyhow::Error, AzureWithTestBlobs),
      81              : }
      82              : 
                        /// Azure fixture plus the remote paths reported by `upload_remote_data` during setup.
      83              : struct AzureWithTestBlobs {
                            /// Client fixture the uploads were made with.
      84              :     enabled: EnabledAzure,
                            /// Taken from the `prefixes` field of the upload result.
      85              :     remote_prefixes: HashSet<RemotePath>,
                            /// Taken from the `blobs` field of the upload result; handed to `cleanup` in `teardown`.
      86              :     remote_blobs: HashSet<RemotePath>,
      87              : }
      88              : 
      89              : impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs {
                            /// Returns `Disabled` when `ENABLE_REAL_AZURE_REMOTE_STORAGE` cannot be read from the
                            /// environment. Otherwise creates a client limited to 10 keys per list response and
                            /// runs `upload_remote_data` with 21 upload tasks, yielding `Enabled` when it returns
                            /// `ControlFlow::Continue` and `UploadsFailed` when it returns `ControlFlow::Break`.
      90            3 :     async fn setup() -> Self {
      91            3 :         ensure_logging_ready();
      92            3 :         if env::var(ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
      93            2 :             info!(
      94            0 :                 "`{}` env variable is not set, skipping the test",
      95              :                 ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME
      96              :             );
      97            2 :             return Self::Disabled;
      98            1 :         }
      99              : 
     100            1 :         let max_keys_in_list_response = 10;
                                // 1 + 2 * 10 = 21 uploads; the `unwrap` cannot fail for the literal 10.
                                // NOTE(review): presumably chosen so that listing spans more than two full pages -
                                // confirm against the shared tests in `common/tests.rs`.
     101            1 :         let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
     102              : 
     103            1 :         let enabled = EnabledAzure::setup(Some(max_keys_in_list_response)).await;
     104              : 
     105            1 :         match upload_remote_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
     106            1 :             ControlFlow::Continue(uploads) => {
     107            1 :                 info!("Remote objects created successfully");
     108              : 
     109            1 :                 Self::Enabled(AzureWithTestBlobs {
     110            1 :                     enabled,
     111            1 :                     remote_prefixes: uploads.prefixes,
     112            1 :                     remote_blobs: uploads.blobs,
     113            1 :                 })
     114              :             }
                                    // The `Break` payload has the same shape as the success payload; it is stored
                                    // so that `teardown` can pass its blobs to `cleanup`.
     115            0 :             ControlFlow::Break(uploads) => Self::UploadsFailed(
     116            0 :                 anyhow::anyhow!("One or multiple blobs failed to upload to Azure"),
     117            0 :                 AzureWithTestBlobs {
     118            0 :                     enabled,
     119            0 :                     remote_prefixes: uploads.prefixes,
     120            0 :                     remote_blobs: uploads.blobs,
     121            0 :                 },
     122            0 :             ),
     123              :         }
     124            3 :     }
     125              : 
                            /// Passes every blob path recorded during `setup` to `cleanup`, for both the
                            /// `Enabled` and the `UploadsFailed` case; does nothing when `Disabled`.
                            /// `remote_prefixes` is not passed to `cleanup`.
     126            3 :     async fn teardown(self) {
     127            3 :         match self {
     128            2 :             Self::Disabled => {}
     129            1 :             Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
     130            1 :                 cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
     131              :             }
     132              :         }
     133            3 :     }
     134              : }
     135              : 
                        /// Test context for tests that need blobs uploaded via `upload_simple_remote_data`.
     136              : enum MaybeEnabledStorageWithSimpleTestBlobs {
                            /// `upload_simple_remote_data` returned `ControlFlow::Continue`.
     137              :     Enabled(AzureWithSimpleTestBlobs),
                            /// `ENABLE_REAL_AZURE_REMOTE_STORAGE` could not be read; nothing was uploaded.
     138              :     Disabled,
                            /// `upload_simple_remote_data` returned `ControlFlow::Break`. The fixture is kept
                            /// alongside the error so that `teardown` can still pass the blobs to `cleanup`.
     139              :     UploadsFailed(anyhow::Error, AzureWithSimpleTestBlobs),
     140              : }
                        /// Azure fixture plus the set of blob paths reported by `upload_simple_remote_data`.
     141              : struct AzureWithSimpleTestBlobs {
                            /// Client fixture the uploads were made with.
     142              :     enabled: EnabledAzure,
                            /// Blob paths returned by the upload helper; handed to `cleanup` in `teardown`.
     143              :     remote_blobs: HashSet<RemotePath>,
     144              : }
     145              : 
     146              : impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {
                            /// Returns `Disabled` when `ENABLE_REAL_AZURE_REMOTE_STORAGE` cannot be read from the
                            /// environment. Otherwise creates a client limited to 10 keys per list response and
                            /// runs `upload_simple_remote_data` with 21 upload tasks, yielding `Enabled` when it
                            /// returns `ControlFlow::Continue` and `UploadsFailed` when it returns
                            /// `ControlFlow::Break`.
     147            9 :     async fn setup() -> Self {
     148            9 :         ensure_logging_ready();
     149            9 :         if env::var(ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
     150            6 :             info!(
     151            0 :                 "`{}` env variable is not set, skipping the test",
     152              :                 ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME
     153              :             );
     154            6 :             return Self::Disabled;
     155            3 :         }
     156              : 
     157            3 :         let max_keys_in_list_response = 10;
                                // 1 + 2 * 10 = 21 uploads; the `unwrap` cannot fail for the literal 10.
     158            3 :         let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
     159              : 
     160            3 :         let enabled = EnabledAzure::setup(Some(max_keys_in_list_response)).await;
     161              : 
                                // Unlike `upload_remote_data`, this helper is not given `enabled.base_prefix`.
     162            3 :         match upload_simple_remote_data(&enabled.client, upload_tasks_count).await {
     163            3 :             ControlFlow::Continue(uploads) => {
     164            3 :                 info!("Remote objects created successfully");
     165              : 
     166            3 :                 Self::Enabled(AzureWithSimpleTestBlobs {
     167            3 :                     enabled,
     168            3 :                     remote_blobs: uploads,
     169            3 :                 })
     170              :             }
                                    // The `Break` payload is stored so that `teardown` can pass it to `cleanup`.
     171            0 :             ControlFlow::Break(uploads) => Self::UploadsFailed(
     172            0 :                 anyhow::anyhow!("One or multiple blobs failed to upload to Azure"),
     173            0 :                 AzureWithSimpleTestBlobs {
     174            0 :                     enabled,
     175            0 :                     remote_blobs: uploads,
     176            0 :                 },
     177            0 :             ),
     178              :         }
     179            9 :     }
     180              : 
                            /// Passes every blob path recorded during `setup` to `cleanup`, for both the
                            /// `Enabled` and the `UploadsFailed` case; does nothing when `Disabled`.
     181            9 :     async fn teardown(self) {
     182            9 :         match self {
     183            6 :             Self::Disabled => {}
     184            3 :             Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
     185            3 :                 cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
     186              :             }
     187              :         }
     188            9 :     }
     189              : }
     190              : 
                        /// Builds a `GenericRemoteStorage` configured for an Azure container.
                        ///
                        /// The container name and region are read from the `REMOTE_STORAGE_AZURE_CONTAINER` and
                        /// `REMOTE_STORAGE_AZURE_REGION` environment variables. Each call uses its own
                        /// `test_{millis}_{random:08x}/` prefix inside the container, and
                        /// `max_keys_per_list_response` is copied into the Azure configuration as-is.
                        ///
                        /// # Errors
                        ///
                        /// Returns an error if either environment variable cannot be read, if the system clock
                        /// reports a time before `UNIX_EPOCH`, or if `GenericRemoteStorage::from_config` fails.
     191           10 : async fn create_azure_client(
     192           10 :     max_keys_per_list_response: Option<i32>,
     193           10 : ) -> anyhow::Result<Arc<GenericRemoteStorage>> {
                            // Brings the `random` method used below into scope.
     194              :     use rand::Rng;
     195              : 
     196           10 :     let remote_storage_azure_container = env::var("REMOTE_STORAGE_AZURE_CONTAINER").context(
     197              :         "`REMOTE_STORAGE_AZURE_CONTAINER` env var is not set, but real Azure tests are enabled",
     198            0 :     )?;
     199           10 :     let remote_storage_azure_region = env::var("REMOTE_STORAGE_AZURE_REGION").context(
     200              :         "`REMOTE_STORAGE_AZURE_REGION` env var is not set, but real Azure tests are enabled",
     201            0 :     )?;
     202              : 
     203              :     // A timestamp alone is not unique: test runners have ended up with identical nanos as bucket prefixes.
     204              :     // The millis value is only a debugging aid that makes the prefix easier to find later.
     205           10 :     let millis = std::time::SystemTime::now()
     206           10 :         .duration_since(UNIX_EPOCH)
     207           10 :         .context("random Azure test prefix part calculation")?
     208           10 :         .as_millis();
     209              : 
     210              :     // Millis can collide between two threads just like nanos can, so add randomness as well.
     211           10 :     let random = rand::rng().random::<u32>();
     212              : 
     213           10 :     let remote_storage_config = RemoteStorageConfig {
     214           10 :         storage: RemoteStorageKind::AzureContainer(AzureConfig {
     215           10 :             container_name: remote_storage_azure_container,
     216           10 :             storage_account: None,
     217           10 :             container_region: remote_storage_azure_region,
                                    // The random part is zero-padded to eight hex digits; the trailing `/` is
                                    // part of the prefix string itself.
     218           10 :             prefix_in_container: Some(format!("test_{millis}_{random:08x}/")),
                                    // `NonZeroUsize::new` only returns `None` for 0, so this `unwrap` cannot fail.
     219           10 :             concurrency_limit: NonZeroUsize::new(100).unwrap(),
     220           10 :             max_keys_per_list_response,
     221           10 :             conn_pool_size: 8,
     222           10 :             /* BEGIN_HADRON */
     223           10 :             put_block_size_mb: Some(1),
     224           10 :             /* END_HADRON */
     225           10 :         }),
     226           10 :         timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
     227           10 :         small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
     228           10 :     };
     229           10 :     Ok(Arc::new(
     230           10 :         GenericRemoteStorage::from_config(&remote_storage_config)
     231           10 :             .await
     232           10 :             .context("remote storage init")?,
     233              :     ))
     234           10 : }
         |