Line data Source code
1 : use std::env;
2 : use std::num::NonZeroUsize;
3 : use std::ops::ControlFlow;
4 : use std::sync::Arc;
5 : use std::time::UNIX_EPOCH;
6 : use std::{collections::HashSet, time::Duration};
7 :
8 : use anyhow::Context;
9 : use remote_storage::{
10 : AzureConfig, GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind,
11 : };
12 : use test_context::AsyncTestContext;
13 : use tracing::info;
14 :
// Shared helpers for the remote storage integration tests; `cleanup`, `ensure_logging_ready`,
// `upload_remote_data` and `upload_simple_remote_data` are imported from it below.
mod common;

// Compiles the shared test cases in `common/tests.rs` into this test crate as `tests_azure`.
#[path = "common/tests.rs"]
mod tests_azure;
19 :
20 : use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data};
21 :
/// Opt-in switch for running these tests against real Azure storage. When `env::var` cannot
/// read it (unset, or not valid unicode), the test contexts in this file set up as `Disabled`.
const ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_AZURE_REMOTE_STORAGE";

/// Value stored in `EnabledAzure::base_prefix` for every client built by `EnabledAzure::setup`.
const BASE_PREFIX: &str = "test";
25 :
/// A live Azure-backed storage client used by the test contexts below.
struct EnabledAzure {
    /// Client created by `create_azure_client`. `configure_request_timeout` needs this `Arc`
    /// (and the inner Azure `Arc`) to be uniquely owned, since it uses `Arc::get_mut`.
    client: Arc<GenericRemoteStorage>,
    /// Prefix handed to `upload_remote_data` when seeding test blobs; always `BASE_PREFIX`.
    base_prefix: &'static str,
}
30 :
31 : impl EnabledAzure {
32 0 : async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
33 0 : let client = create_azure_client(max_keys_in_list_response)
34 0 : .context("Azure client creation")
35 0 : .expect("Azure client creation failed");
36 0 :
37 0 : EnabledAzure {
38 0 : client,
39 0 : base_prefix: BASE_PREFIX,
40 0 : }
41 0 : }
42 :
43 : #[allow(unused)] // this will be needed when moving the timeout integration tests back
44 0 : fn configure_request_timeout(&mut self, timeout: Duration) {
45 0 : match Arc::get_mut(&mut self.client).expect("outer Arc::get_mut") {
46 0 : GenericRemoteStorage::AzureBlob(azure) => {
47 0 : let azure = Arc::get_mut(azure).expect("inner Arc::get_mut");
48 0 : azure.timeout = timeout;
49 0 : }
50 0 : _ => unreachable!(),
51 : }
52 0 : }
53 : }
54 :
/// Test context that is either a live Azure client, or `Disabled` when real Azure tests
/// have not been opted into via `ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME`.
enum MaybeEnabledStorage {
    Enabled(EnabledAzure),
    Disabled,
}
59 :
60 : #[async_trait::async_trait]
61 : impl AsyncTestContext for MaybeEnabledStorage {
62 8 : async fn setup() -> Self {
63 8 : ensure_logging_ready();
64 8 :
65 8 : if env::var(ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
66 8 : info!(
67 8 : "`{}` env variable is not set, skipping the test",
68 8 : ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME
69 8 : );
70 8 : return Self::Disabled;
71 0 : }
72 0 :
73 0 : Self::Enabled(EnabledAzure::setup(None).await)
74 24 : }
75 : }
76 :
/// Test context whose setup seeds the container with blobs via `upload_remote_data`.
enum MaybeEnabledStorageWithTestBlobs {
    /// Every upload succeeded.
    Enabled(AzureWithTestBlobs),
    /// Real Azure tests are not enabled in the environment; nothing was uploaded.
    Disabled,
    /// At least one upload failed. The context still carries the paths reported by
    /// `upload_remote_data`, so teardown can pass them to `cleanup`.
    UploadsFailed(anyhow::Error, AzureWithTestBlobs),
}
82 :
/// A live Azure client plus the remote paths produced by `upload_remote_data`.
struct AzureWithTestBlobs {
    enabled: EnabledAzure,
    /// Prefixes reported by `upload_remote_data`.
    remote_prefixes: HashSet<RemotePath>,
    /// Blob paths reported by `upload_remote_data`; passed to `cleanup` on teardown.
    remote_blobs: HashSet<RemotePath>,
}
88 :
89 : #[async_trait::async_trait]
90 : impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs {
91 2 : async fn setup() -> Self {
92 2 : ensure_logging_ready();
93 2 : if env::var(ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
94 2 : info!(
95 2 : "`{}` env variable is not set, skipping the test",
96 2 : ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME
97 2 : );
98 2 : return Self::Disabled;
99 0 : }
100 0 :
101 0 : let max_keys_in_list_response = 10;
102 0 : let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
103 :
104 0 : let enabled = EnabledAzure::setup(Some(max_keys_in_list_response)).await;
105 :
106 0 : match upload_remote_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
107 0 : ControlFlow::Continue(uploads) => {
108 0 : info!("Remote objects created successfully");
109 :
110 0 : Self::Enabled(AzureWithTestBlobs {
111 0 : enabled,
112 0 : remote_prefixes: uploads.prefixes,
113 0 : remote_blobs: uploads.blobs,
114 0 : })
115 : }
116 0 : ControlFlow::Break(uploads) => Self::UploadsFailed(
117 0 : anyhow::anyhow!("One or multiple blobs failed to upload to Azure"),
118 0 : AzureWithTestBlobs {
119 0 : enabled,
120 0 : remote_prefixes: uploads.prefixes,
121 0 : remote_blobs: uploads.blobs,
122 0 : },
123 0 : ),
124 : }
125 6 : }
126 :
127 2 : async fn teardown(self) {
128 2 : match self {
129 2 : Self::Disabled => {}
130 0 : Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
131 0 : cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
132 : }
133 : }
134 4 : }
135 : }
136 :
// NOTE: the setups for the list_prefixes test and the list_files test are very similar.
// However, they are not identical: the list_prefixes function is concerned with listing
// prefixes, whereas the list_files function is concerned with listing files.
// See the `RemoteStorage::list_files` documentation for more details.
/// Test context whose setup seeds the container with blobs via `upload_simple_remote_data`,
/// tracking only the resulting blob paths.
enum MaybeEnabledStorageWithSimpleTestBlobs {
    /// Every upload succeeded.
    Enabled(AzureWithSimpleTestBlobs),
    /// Real Azure tests are not enabled in the environment; nothing was uploaded.
    Disabled,
    /// At least one upload failed. The context still carries the paths reported by
    /// `upload_simple_remote_data`, so teardown can pass them to `cleanup`.
    UploadsFailed(anyhow::Error, AzureWithSimpleTestBlobs),
}
/// A live Azure client plus the blob paths produced by `upload_simple_remote_data`.
struct AzureWithSimpleTestBlobs {
    enabled: EnabledAzure,
    /// Blob paths reported by `upload_simple_remote_data`; passed to `cleanup` on teardown.
    remote_blobs: HashSet<RemotePath>,
}
150 :
151 : #[async_trait::async_trait]
152 : impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {
153 2 : async fn setup() -> Self {
154 2 : ensure_logging_ready();
155 2 : if env::var(ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
156 2 : info!(
157 2 : "`{}` env variable is not set, skipping the test",
158 2 : ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME
159 2 : );
160 2 : return Self::Disabled;
161 0 : }
162 0 :
163 0 : let max_keys_in_list_response = 10;
164 0 : let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
165 :
166 0 : let enabled = EnabledAzure::setup(Some(max_keys_in_list_response)).await;
167 :
168 0 : match upload_simple_remote_data(&enabled.client, upload_tasks_count).await {
169 0 : ControlFlow::Continue(uploads) => {
170 0 : info!("Remote objects created successfully");
171 :
172 0 : Self::Enabled(AzureWithSimpleTestBlobs {
173 0 : enabled,
174 0 : remote_blobs: uploads,
175 0 : })
176 : }
177 0 : ControlFlow::Break(uploads) => Self::UploadsFailed(
178 0 : anyhow::anyhow!("One or multiple blobs failed to upload to Azure"),
179 0 : AzureWithSimpleTestBlobs {
180 0 : enabled,
181 0 : remote_blobs: uploads,
182 0 : },
183 0 : ),
184 : }
185 6 : }
186 :
187 2 : async fn teardown(self) {
188 2 : match self {
189 2 : Self::Disabled => {}
190 0 : Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
191 0 : cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
192 : }
193 : }
194 4 : }
195 : }
196 :
197 0 : fn create_azure_client(
198 0 : max_keys_per_list_response: Option<i32>,
199 0 : ) -> anyhow::Result<Arc<GenericRemoteStorage>> {
200 : use rand::Rng;
201 :
202 0 : let remote_storage_azure_container = env::var("REMOTE_STORAGE_AZURE_CONTAINER").context(
203 0 : "`REMOTE_STORAGE_AZURE_CONTAINER` env var is not set, but real Azure tests are enabled",
204 0 : )?;
205 0 : let remote_storage_azure_region = env::var("REMOTE_STORAGE_AZURE_REGION").context(
206 0 : "`REMOTE_STORAGE_AZURE_REGION` env var is not set, but real Azure tests are enabled",
207 0 : )?;
208 :
209 : // due to how time works, we've had test runners use the same nanos as bucket prefixes.
210 : // millis is just a debugging aid for easier finding the prefix later.
211 0 : let millis = std::time::SystemTime::now()
212 0 : .duration_since(UNIX_EPOCH)
213 0 : .context("random Azure test prefix part calculation")?
214 0 : .as_millis();
215 0 :
216 0 : // because nanos can be the same for two threads so can millis, add randomness
217 0 : let random = rand::thread_rng().gen::<u32>();
218 0 :
219 0 : let remote_storage_config = RemoteStorageConfig {
220 0 : storage: RemoteStorageKind::AzureContainer(AzureConfig {
221 0 : container_name: remote_storage_azure_container,
222 0 : container_region: remote_storage_azure_region,
223 0 : prefix_in_container: Some(format!("test_{millis}_{random:08x}/")),
224 0 : concurrency_limit: NonZeroUsize::new(100).unwrap(),
225 0 : max_keys_per_list_response,
226 0 : }),
227 0 : timeout: Duration::from_secs(120),
228 0 : };
229 0 : Ok(Arc::new(
230 0 : GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
231 : ))
232 0 : }
|