use std::collections::HashSet;
use std::env;
use std::num::NonZeroUsize;
use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::UNIX_EPOCH;

use anyhow::Context;
use camino::Utf8Path;
use remote_storage::{
    AzureConfig, GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind,
};
use test_context::{test_context, AsyncTestContext};
use tracing::{debug, info};

mod common;

use common::{
    cleanup, download_to_vec, ensure_logging_ready, upload_remote_data, upload_simple_remote_data,
    upload_stream, wrap_stream,
};

const ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_AZURE_REMOTE_STORAGE";
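
// To run these tests against real Azure, set the gate above plus the credential env vars
// read in `create_azure_client`. An illustrative invocation (the package name and region
// value are assumptions, not taken from this file):
//
//   ENABLE_REAL_AZURE_REMOTE_STORAGE=1 \
//   REMOTE_STORAGE_AZURE_CONTAINER=my-test-container \
//   REMOTE_STORAGE_AZURE_REGION=eastus \
//   cargo test -p remote_storage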

const BASE_PREFIX: &str = "test";

/// Tests that the Azure client can list all prefixes, even if the response comes paginated and requires multiple HTTP queries.
/// Uses real Azure and requires [`ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME`] and the related Azure credential env vars to be set.
/// See the client creation in [`create_azure_client`] for details on the required env vars.
/// If real Azure tests are disabled, the test passes without running anything: currently, there is no way to mark a test as ignored at runtime with the
/// default test framework, see https://github.com/rust-lang/rust/issues/68007 for details.
///
/// First, the test creates a set of Azure blobs with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_remote_data`],
/// where
/// * `random_prefix_part` is set for the entire Azure client during its creation in [`create_azure_client`], to avoid interference between concurrent test runs
/// * `base_prefix_str` is a common prefix used in the client requests: we want to ensure that the client can list nested prefixes inside the container
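///
/// For illustration, one uploaded key could look like
/// `test_1706000000000_0badcafe/test/sub_prefix_3/blob_3` (a made-up `random_prefix_part`; `BASE_PREFIX` is `"test"`).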
///
/// Then, it verifies that the client returns the correct prefixes when queried:
/// * with no prefix, it lists everything after its `${random_prefix_part}/`, which should be the `${base_prefix_str}` value only
/// * with the `${base_prefix_str}/` prefix, it lists every `sub_prefix_${i}`
///
/// With real Azure enabled and the `#[cfg(test)]` Rust configuration used, the Azure client limits the number of keys per listing response (`max_keys_per_list_response`).
/// This tests pagination implicitly: all results must still be returned across multiple pages, without uploading too many blobs to Azure.
///
/// Lastly, the test attempts to clean up and remove all uploaded Azure files.
/// If any errors appear during the cleanup, they get logged, but the test is not failed or stopped until the cleanup is finished.
#[test_context(MaybeEnabledAzureWithTestBlobs)]
#[tokio::test]
async fn azure_pagination_should_work(
    ctx: &mut MaybeEnabledAzureWithTestBlobs,
) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledAzureWithTestBlobs::Enabled(ctx) => ctx,
        MaybeEnabledAzureWithTestBlobs::Disabled => return Ok(()),
        MaybeEnabledAzureWithTestBlobs::UploadsFailed(e, _) => {
            anyhow::bail!("Azure init failed: {e:?}")
        }
    };

    let test_client = Arc::clone(&ctx.enabled.client);
    let expected_remote_prefixes = ctx.remote_prefixes.clone();

    let base_prefix = RemotePath::new(Utf8Path::new(ctx.enabled.base_prefix))
        .context("common_prefix construction")?;
    let root_remote_prefixes = test_client
        .list_prefixes(None)
        .await
        .context("client list root prefixes failure")?
        .into_iter()
        .collect::<HashSet<_>>();
    assert_eq!(
        root_remote_prefixes, HashSet::from([base_prefix.clone()]),
        "remote storage root prefixes list mismatches with the uploads. Returned prefixes: {root_remote_prefixes:?}"
    );

    let nested_remote_prefixes = test_client
        .list_prefixes(Some(&base_prefix))
        .await
        .context("client list nested prefixes failure")?
        .into_iter()
        .collect::<HashSet<_>>();
    let remote_only_prefixes = nested_remote_prefixes
        .difference(&expected_remote_prefixes)
        .collect::<HashSet<_>>();
    let missing_uploaded_prefixes = expected_remote_prefixes
        .difference(&nested_remote_prefixes)
        .collect::<HashSet<_>>();
    assert_eq!(
        remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0,
        "remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
    );

    Ok(())
}

/// Tests that the Azure client can list all files in a folder, even if the response comes paginated and requires multiple Azure queries.
/// Uses real Azure and requires [`ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME`] and the related Azure credential env vars to be set. The test will skip the real code and pass if the env vars are not set.
/// See `azure_pagination_should_work` for more information.
///
/// First, the test creates a set of Azure objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_simple_remote_data`].
/// Then it performs the following queries:
/// 1. `list_files(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
/// 2. `list_files("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt`
#[test_context(MaybeEnabledAzureWithSimpleTestBlobs)]
#[tokio::test]
async fn azure_list_files_works(
    ctx: &mut MaybeEnabledAzureWithSimpleTestBlobs,
) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledAzureWithSimpleTestBlobs::Enabled(ctx) => ctx,
        MaybeEnabledAzureWithSimpleTestBlobs::Disabled => return Ok(()),
        MaybeEnabledAzureWithSimpleTestBlobs::UploadsFailed(e, _) => {
            anyhow::bail!("Azure init failed: {e:?}")
        }
    };
    let test_client = Arc::clone(&ctx.enabled.client);
    let base_prefix =
        RemotePath::new(Utf8Path::new("folder1")).context("common_prefix construction")?;
    let root_files = test_client
        .list_files(None)
        .await
        .context("client list root files failure")?
        .into_iter()
        .collect::<HashSet<_>>();
    assert_eq!(
        root_files,
        ctx.remote_blobs.clone(),
        "remote storage list_files on root mismatches with the uploads."
    );
    let nested_remote_files = test_client
        .list_files(Some(&base_prefix))
        .await
        .context("client list nested files failure")?
        .into_iter()
        .collect::<HashSet<_>>();
    let trim_remote_blobs: HashSet<_> = ctx
        .remote_blobs
        .iter()
        .map(|x| x.get_path())
        .filter(|x| x.starts_with("folder1"))
        .map(|x| RemotePath::new(x).expect("must be valid path"))
        .collect();
    assert_eq!(
        nested_remote_files, trim_remote_blobs,
        "remote storage list_files on subdirectory mismatches with the uploads."
    );
    Ok(())
}

#[test_context(MaybeEnabledAzure)]
#[tokio::test]
async fn azure_delete_non_existing_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledAzure::Enabled(ctx) => ctx,
        MaybeEnabledAzure::Disabled => return Ok(()),
    };

    let path = RemotePath::new(Utf8Path::new(
        format!("{}/for_sure_there_is_nothing_there_really", ctx.base_prefix).as_str(),
    ))
    .with_context(|| "RemotePath conversion")?;

    ctx.client.delete(&path).await.expect("should succeed");

    Ok(())
}

#[test_context(MaybeEnabledAzure)]
#[tokio::test]
async fn azure_delete_objects_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledAzure::Enabled(ctx) => ctx,
        MaybeEnabledAzure::Disabled => return Ok(()),
    };

    let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let (data, len) = upload_stream("remote blob data1".as_bytes().into());
    ctx.client.upload(data, len, &path1, None).await?;

    let (data, len) = upload_stream("remote blob data2".as_bytes().into());
    ctx.client.upload(data, len, &path2, None).await?;

    let (data, len) = upload_stream("remote blob data3".as_bytes().into());
    ctx.client.upload(data, len, &path3, None).await?;

    ctx.client.delete_objects(&[path1, path2]).await?;

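    // path1 and path2 are gone now; only path3's data should remain under the test
    // prefix, so the root listing below is expected to contain exactly one entry.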
    let prefixes = ctx.client.list_prefixes(None).await?;

    assert_eq!(prefixes.len(), 1);

    ctx.client.delete_objects(&[path3]).await?;

    Ok(())
}

#[test_context(MaybeEnabledAzure)]
#[tokio::test]
async fn azure_upload_download_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Result<()> {
    let MaybeEnabledAzure::Enabled(ctx) = ctx else {
        return Ok(());
    };

    let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let orig = bytes::Bytes::from_static("remote blob data here".as_bytes());

    let (data, len) = wrap_stream(orig.clone());

    ctx.client.upload(data, len, &path, None).await?;

    // Normal download request
    let dl = ctx.client.download(&path).await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig);

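    // The assertions below rely on `download_byte_range` treating `start` as inclusive
    // and `end` as exclusive, mirroring Rust slice indexing (cf. `&orig[4..10]`).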
    // Full range (end specified)
    let dl = ctx
        .client
        .download_byte_range(&path, 0, Some(len as u64))
        .await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig);

    // Partial range (end specified)
    let dl = ctx.client.download_byte_range(&path, 4, Some(10)).await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig[4..10]);

    // Partial range (end beyond real end)
    let dl = ctx
        .client
        .download_byte_range(&path, 8, Some(len as u64 * 100))
        .await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig[8..]);

    // Partial range (end unspecified)
    let dl = ctx.client.download_byte_range(&path, 4, None).await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig[4..]);

    // Full range (end unspecified)
    let dl = ctx.client.download_byte_range(&path, 0, None).await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig);

    debug!("Cleanup: deleting file at path {path:?}");
    ctx.client
        .delete(&path)
        .await
        .with_context(|| format!("{path:?} removal"))?;

    Ok(())
}

struct EnabledAzure {
    client: Arc<GenericRemoteStorage>,
    base_prefix: &'static str,
}

impl EnabledAzure {
    async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
        let client = create_azure_client(max_keys_in_list_response)
            .context("Azure client creation")
            .expect("Azure client creation failed");

        EnabledAzure {
            client,
            base_prefix: BASE_PREFIX,
        }
    }
}

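// Each test receives one of the `MaybeEnabled*` variants from the `test_context` setup:
// `Enabled` carries a live client, while `Disabled` turns the test into a no-op when the
// real-Azure env vars are absent (see the doc comments on the tests above).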
enum MaybeEnabledAzure {
    Enabled(EnabledAzure),
    Disabled,
}

#[async_trait::async_trait]
impl AsyncTestContext for MaybeEnabledAzure {
    async fn setup() -> Self {
        ensure_logging_ready();

        if env::var(ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
            info!(
                "`{}` env variable is not set, skipping the test",
                ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME
            );
            return Self::Disabled;
        }

        Self::Enabled(EnabledAzure::setup(None).await)
    }
}

enum MaybeEnabledAzureWithTestBlobs {
    Enabled(AzureWithTestBlobs),
    Disabled,
    UploadsFailed(anyhow::Error, AzureWithTestBlobs),
}

struct AzureWithTestBlobs {
    enabled: EnabledAzure,
    remote_prefixes: HashSet<RemotePath>,
    remote_blobs: HashSet<RemotePath>,
}

#[async_trait::async_trait]
impl AsyncTestContext for MaybeEnabledAzureWithTestBlobs {
    async fn setup() -> Self {
        ensure_logging_ready();
        if env::var(ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
            info!(
                "`{}` env variable is not set, skipping the test",
                ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME
            );
            return Self::Disabled;
        }

        let max_keys_in_list_response = 10;
        let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
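        // 21 uploads against a page size of 10 force at least three list responses,
        // exercising pagination without uploading an excessive number of blobs.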

        let enabled = EnabledAzure::setup(Some(max_keys_in_list_response)).await;

        match upload_remote_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
            ControlFlow::Continue(uploads) => {
                info!("Remote objects created successfully");

                Self::Enabled(AzureWithTestBlobs {
                    enabled,
                    remote_prefixes: uploads.prefixes,
                    remote_blobs: uploads.blobs,
                })
            }
            ControlFlow::Break(uploads) => Self::UploadsFailed(
                anyhow::anyhow!("One or multiple blobs failed to upload to Azure"),
                AzureWithTestBlobs {
                    enabled,
                    remote_prefixes: uploads.prefixes,
                    remote_blobs: uploads.blobs,
                },
            ),
        }
    }

    async fn teardown(self) {
        match self {
            Self::Disabled => {}
            Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
                cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
            }
        }
    }
}

// NOTE: the setups for the list_prefixes test and the list_files test are very similar.
// However, they are not identical: list_prefixes is concerned with listing prefixes,
// whereas list_files is concerned with listing files.
// See the `RemoteStorage::list_files` documentation for more details.
enum MaybeEnabledAzureWithSimpleTestBlobs {
    Enabled(AzureWithSimpleTestBlobs),
    Disabled,
    UploadsFailed(anyhow::Error, AzureWithSimpleTestBlobs),
}

struct AzureWithSimpleTestBlobs {
    enabled: EnabledAzure,
    remote_blobs: HashSet<RemotePath>,
}

#[async_trait::async_trait]
impl AsyncTestContext for MaybeEnabledAzureWithSimpleTestBlobs {
    async fn setup() -> Self {
        ensure_logging_ready();
        if env::var(ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
            info!(
                "`{}` env variable is not set, skipping the test",
                ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME
            );
            return Self::Disabled;
        }

        let max_keys_in_list_response = 10;
        let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
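        // As above: more uploads than fit in one listing page, to exercise pagination.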

        let enabled = EnabledAzure::setup(Some(max_keys_in_list_response)).await;

        match upload_simple_remote_data(&enabled.client, upload_tasks_count).await {
            ControlFlow::Continue(uploads) => {
                info!("Remote objects created successfully");

                Self::Enabled(AzureWithSimpleTestBlobs {
                    enabled,
                    remote_blobs: uploads,
                })
            }
            ControlFlow::Break(uploads) => Self::UploadsFailed(
                anyhow::anyhow!("One or multiple blobs failed to upload to Azure"),
                AzureWithSimpleTestBlobs {
                    enabled,
                    remote_blobs: uploads,
                },
            ),
        }
    }

    async fn teardown(self) {
        match self {
            Self::Disabled => {}
            Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
                cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
            }
        }
    }
}

fn create_azure_client(
    max_keys_per_list_response: Option<i32>,
) -> anyhow::Result<Arc<GenericRemoteStorage>> {
    use rand::Rng;

    let remote_storage_azure_container = env::var("REMOTE_STORAGE_AZURE_CONTAINER").context(
        "`REMOTE_STORAGE_AZURE_CONTAINER` env var is not set, but real Azure tests are enabled",
    )?;
    let remote_storage_azure_region = env::var("REMOTE_STORAGE_AZURE_REGION").context(
        "`REMOTE_STORAGE_AZURE_REGION` env var is not set, but real Azure tests are enabled",
    )?;

    // Due to how time works, test runners have ended up using the same nanos as container prefixes.
    // Millis are used only as a debugging aid, to make the prefix easier to find later.
    let millis = std::time::SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .context("random Azure test prefix part calculation")?
        .as_millis();

    // Two threads can observe the same millis just as they can the same nanos, so add randomness.
    let random = rand::thread_rng().gen::<u32>();

    let remote_storage_config = RemoteStorageConfig {
        storage: RemoteStorageKind::AzureContainer(AzureConfig {
            container_name: remote_storage_azure_container,
            container_region: remote_storage_azure_region,
            prefix_in_container: Some(format!("test_{millis}_{random:08x}/")),
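            // e.g. `test_1706000000000_0badcafe/` (illustrative values)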
            concurrency_limit: NonZeroUsize::new(100).unwrap(),
            max_keys_per_list_response,
        }),
    };
    Ok(Arc::new(
        GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
    ))
}