Line data Source code
1 : use std::env;
2 : use std::num::NonZeroUsize;
3 : use std::ops::ControlFlow;
4 : use std::sync::Arc;
5 : use std::time::UNIX_EPOCH;
6 : use std::{collections::HashSet, time::Duration};
7 :
8 : use anyhow::Context;
9 : use remote_storage::{
10 : AzureConfig, GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind,
11 : };
12 : use test_context::AsyncTestContext;
13 : use tracing::info;
14 :
15 : mod common;
16 :
17 : #[path = "common/tests.rs"]
18 : mod tests_azure;
19 :
20 : use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data};
21 :
// Name of the environment variable that gates the tests in this file. The test contexts
// below only check whether `env::var` succeeds for it; the value itself is never inspected.
22 : const ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_AZURE_REMOTE_STORAGE";
23 :
// Value stored in `EnabledAzure::base_prefix`, which is later handed to `upload_remote_data`.
24 : const BASE_PREFIX: &str = "test";
25 :
/// State shared by the test contexts when the real-Azure tests are enabled.
26 : struct EnabledAzure {
// Storage client produced by `create_azure_client`.
27 : client: Arc<GenericRemoteStorage>,
// Prefix passed to `upload_remote_data`; `setup` always initialises it to `BASE_PREFIX`.
28 : base_prefix: &'static str,
29 : }
30 :
31 : impl EnabledAzure {
/// Builds the Azure-backed client and wraps it together with `BASE_PREFIX`.
///
/// `max_keys_in_list_response` is forwarded unchanged to `create_azure_client`.
///
/// # Panics
///
/// Panics with "Azure client creation failed" if `create_azure_client` returns an error.
32 10 : async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
33 10 : let client = create_azure_client(max_keys_in_list_response)
34 0 : .await
35 10 : .context("Azure client creation")
36 10 : .expect("Azure client creation failed");
37 10 :
38 10 : EnabledAzure {
39 10 : client,
40 10 : base_prefix: BASE_PREFIX,
41 10 : }
42 10 : }
43 :
/// Overwrites the `timeout` field of the wrapped Azure client.
///
/// # Panics
///
/// Panics if either the outer or the inner `Arc` cannot be borrowed mutably
/// (`Arc::get_mut` returns `None` when other pointers to the same allocation exist),
/// and hits `unreachable!()` if the client is not the `AzureBlob` variant.
44 : #[allow(unused)] // this will be needed when moving the timeout integration tests back
45 0 : fn configure_request_timeout(&mut self, timeout: Duration) {
46 0 : match Arc::get_mut(&mut self.client).expect("outer Arc::get_mut") {
47 0 : GenericRemoteStorage::AzureBlob(azure) => {
48 0 : let azure = Arc::get_mut(azure).expect("inner Arc::get_mut");
49 0 : azure.timeout = timeout;
50 0 : }
// `create_azure_client` in this file configures `RemoteStorageKind::AzureContainer`;
// presumably that always yields the `AzureBlob` variant (confirm against `from_config`).
51 0 : _ => unreachable!(),
52 : }
53 0 : }
54 : }
55 :
/// Test context that is either a ready Azure client or a marker that the tests are switched off.
56 : enum MaybeEnabledStorage {
// The gating environment variable was readable; carries the client.
57 : Enabled(EnabledAzure),
// The gating environment variable could not be read; no client is created.
58 : Disabled,
59 : }
60 :
61 : impl AsyncTestContext for MaybeEnabledStorage {
/// Returns `Disabled` when the gating environment variable cannot be read, otherwise
/// creates a client with no explicit list-response key limit (`None`).
62 18 : async fn setup() -> Self {
63 18 : ensure_logging_ready();
64 18 :
// `env::var` also returns `Err` for a value that is not valid Unicode, so such a
// value disables the tests just like an unset variable.
65 18 : if env::var(ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
66 12 : info!(
67 0 : "`{}` env variable is not set, skipping the test",
68 : ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME
69 : );
70 12 : return Self::Disabled;
71 6 : }
72 6 :
73 6 : Self::Enabled(EnabledAzure::setup(None).await)
74 18 : }
75 : }
76 :
/// Test context for tests that need blobs uploaded via `upload_remote_data` before they run.
77 : enum MaybeEnabledStorageWithTestBlobs {
// Uploads completed; carries the client and the uploaded paths.
78 : Enabled(AzureWithTestBlobs),
// The gating environment variable could not be read; nothing was uploaded.
79 : Disabled,
// Uploading stopped early. The context is kept next to the error so that `teardown`
// can still clean up the blobs recorded in it.
80 : UploadsFailed(anyhow::Error, AzureWithTestBlobs),
81 : }
82 :
/// Azure client plus the remote paths reported by `upload_remote_data`.
83 : struct AzureWithTestBlobs {
84 : enabled: EnabledAzure,
// Filled from the `prefixes` field of the upload result.
85 : remote_prefixes: HashSet<RemotePath>,
// Filled from the `blobs` field of the upload result; passed to `cleanup` during teardown.
86 : remote_blobs: HashSet<RemotePath>,
87 : }
88 :
89 : impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs {
/// Creates a client limited to 10 keys per list response and uploads the test data.
///
/// Returns `Disabled` when the gating environment variable cannot be read,
/// `Enabled` when `upload_remote_data` yields `ControlFlow::Continue`, and
/// `UploadsFailed` when it yields `ControlFlow::Break`.
90 3 : async fn setup() -> Self {
91 3 : ensure_logging_ready();
92 3 : if env::var(ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
93 2 : info!(
94 0 : "`{}` env variable is not set, skipping the test",
95 : ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME
96 : );
97 2 : return Self::Disabled;
98 1 : }
99 1 :
100 1 : let max_keys_in_list_response = 10;
// 1 + 2 * 10 = 21 upload tasks, i.e. more than twice the per-response key limit;
// presumably chosen so that listing has to span several responses (confirm with the tests).
101 1 : let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
102 :
103 1 : let enabled = EnabledAzure::setup(Some(max_keys_in_list_response)).await;
104 :
105 17 : match upload_remote_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
106 1 : ControlFlow::Continue(uploads) => {
107 1 : info!("Remote objects created successfully");
108 :
109 1 : Self::Enabled(AzureWithTestBlobs {
110 1 : enabled,
111 1 : remote_prefixes: uploads.prefixes,
112 1 : remote_blobs: uploads.blobs,
113 1 : })
114 : }
// The `Break` payload is stored the same way as on success, so teardown sees
// whatever paths the upload helper reported before stopping.
115 0 : ControlFlow::Break(uploads) => Self::UploadsFailed(
116 0 : anyhow::anyhow!("One or multiple blobs failed to upload to Azure"),
117 0 : AzureWithTestBlobs {
118 0 : enabled,
119 0 : remote_prefixes: uploads.prefixes,
120 0 : remote_blobs: uploads.blobs,
121 0 : },
122 0 : ),
123 : }
124 3 : }
125 :
/// Passes the recorded `remote_blobs` to `cleanup` for both the `Enabled` and the
/// `UploadsFailed` variants; does nothing for `Disabled`.
126 3 : async fn teardown(self) {
127 3 : match self {
128 2 : Self::Disabled => {}
129 1 : Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
130 20 : cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
131 : }
132 : }
133 3 : }
134 : }
135 :
/// Test context for tests that need blobs uploaded via `upload_simple_remote_data` before they run.
136 : enum MaybeEnabledStorageWithSimpleTestBlobs {
// Uploads completed; carries the client and the uploaded blob paths.
137 : Enabled(AzureWithSimpleTestBlobs),
// The gating environment variable could not be read; nothing was uploaded.
138 : Disabled,
// Uploading stopped early. The context is kept next to the error so that `teardown`
// can still clean up the blobs recorded in it.
139 : UploadsFailed(anyhow::Error, AzureWithSimpleTestBlobs),
140 : }
/// Azure client plus the set of blob paths returned by `upload_simple_remote_data`.
141 : struct AzureWithSimpleTestBlobs {
142 : enabled: EnabledAzure,
// Passed to `cleanup` during teardown.
143 : remote_blobs: HashSet<RemotePath>,
144 : }
145 :
146 : impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {
/// Creates a client limited to 10 keys per list response and uploads the simple test data.
///
/// Returns `Disabled` when the gating environment variable cannot be read,
/// `Enabled` when `upload_simple_remote_data` yields `ControlFlow::Continue`, and
/// `UploadsFailed` when it yields `ControlFlow::Break`.
147 9 : async fn setup() -> Self {
148 9 : ensure_logging_ready();
149 9 : if env::var(ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
150 6 : info!(
151 0 : "`{}` env variable is not set, skipping the test",
152 : ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME
153 : );
154 6 : return Self::Disabled;
155 3 : }
156 3 :
157 3 : let max_keys_in_list_response = 10;
// 1 + 2 * 10 = 21 upload tasks, i.e. more than twice the per-response key limit;
// presumably chosen so that listing has to span several responses (confirm with the tests).
158 3 : let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
159 :
160 3 : let enabled = EnabledAzure::setup(Some(max_keys_in_list_response)).await;
161 :
162 57 : match upload_simple_remote_data(&enabled.client, upload_tasks_count).await {
163 3 : ControlFlow::Continue(uploads) => {
164 3 : info!("Remote objects created successfully");
165 :
166 3 : Self::Enabled(AzureWithSimpleTestBlobs {
167 3 : enabled,
168 3 : remote_blobs: uploads,
169 3 : })
170 : }
// The `Break` payload is stored the same way as on success, so teardown sees
// whatever paths the upload helper reported before stopping.
171 0 : ControlFlow::Break(uploads) => Self::UploadsFailed(
172 0 : anyhow::anyhow!("One or multiple blobs failed to upload to Azure"),
173 0 : AzureWithSimpleTestBlobs {
174 0 : enabled,
175 0 : remote_blobs: uploads,
176 0 : },
177 0 : ),
178 : }
179 9 : }
180 :
/// Passes the recorded `remote_blobs` to `cleanup` for both the `Enabled` and the
/// `UploadsFailed` variants; does nothing for `Disabled`.
181 9 : async fn teardown(self) {
182 9 : match self {
183 6 : Self::Disabled => {}
184 3 : Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
185 59 : cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
186 : }
187 : }
188 9 : }
189 : }
190 :
/// Builds a `GenericRemoteStorage` for the Azure container named by the environment.
///
/// Reads `REMOTE_STORAGE_AZURE_CONTAINER` and `REMOTE_STORAGE_AZURE_REGION`, picks a
/// per-call prefix of the form `test_{millis}_{random:08x}/`, and initialises the storage
/// with a concurrency limit of 100 and a 120-second timeout.
/// `max_keys_per_list_response` is copied into the Azure config as-is.
///
/// # Errors
///
/// Returns an error if either environment variable cannot be read, if the system clock
/// reports a time before `UNIX_EPOCH`, or if `GenericRemoteStorage::from_config` fails.
191 10 : async fn create_azure_client(
192 10 : max_keys_per_list_response: Option<i32>,
193 10 : ) -> anyhow::Result<Arc<GenericRemoteStorage>> {
194 : use rand::Rng;
195 :
196 10 : let remote_storage_azure_container = env::var("REMOTE_STORAGE_AZURE_CONTAINER").context(
197 10 : "`REMOTE_STORAGE_AZURE_CONTAINER` env var is not set, but real Azure tests are enabled",
198 10 : )?;
199 10 : let remote_storage_azure_region = env::var("REMOTE_STORAGE_AZURE_REGION").context(
200 10 : "`REMOTE_STORAGE_AZURE_REGION` env var is not set, but real Azure tests are enabled",
201 10 : )?;
202 :
203 : // due to how time works, we've had test runners use the same nanos as bucket prefixes.
204 : // millis is just a debugging aid for easier finding the prefix later.
205 10 : let millis = std::time::SystemTime::now()
206 10 : .duration_since(UNIX_EPOCH)
207 10 : .context("random Azure test prefix part calculation")?
208 10 : .as_millis();
209 10 :
210 10 : // because nanos can be the same for two threads so can millis, add randomness
211 10 : let random = rand::thread_rng().gen::<u32>();
212 10 :
213 10 : let remote_storage_config = RemoteStorageConfig {
214 10 : storage: RemoteStorageKind::AzureContainer(AzureConfig {
215 10 : container_name: remote_storage_azure_container,
216 10 : storage_account: None,
217 10 : container_region: remote_storage_azure_region,
// The `u32` is rendered as zero-padded lowercase hex, at least 8 digits wide.
218 10 : prefix_in_container: Some(format!("test_{millis}_{random:08x}/")),
219 10 : concurrency_limit: NonZeroUsize::new(100).unwrap(),
220 10 : max_keys_per_list_response,
221 10 : }),
222 10 : timeout: Duration::from_secs(120),
223 10 : };
224 10 : Ok(Arc::new(
225 10 : GenericRemoteStorage::from_config(&remote_storage_config)
226 0 : .await
227 10 : .context("remote storage init")?,
228 : ))
229 10 : }
|