use std::collections::HashSet;
use std::env;
use std::num::{NonZeroU32, NonZeroUsize};
use std::ops::ControlFlow;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::UNIX_EPOCH;

use anyhow::Context;
use camino::Utf8Path;
use once_cell::sync::OnceCell;
use remote_storage::{
    AzureConfig, Download, GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind,
};
use test_context::{test_context, AsyncTestContext};
use tokio::task::JoinSet;
use tracing::{debug, error, info};

static LOGGING_DONE: OnceCell<()> = OnceCell::new();

const ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_AZURE_REMOTE_STORAGE";

const BASE_PREFIX: &str = "test";

/// Tests that the Azure client can list all prefixes, even if the response comes paginated and requires multiple HTTP queries.
/// Uses real Azure and requires [`ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME`] and the related Azure credential env vars to be specified.
/// See the client creation in [`create_azure_client`] for details on the required env vars.
/// If real Azure tests are disabled, the test passes, skipping any real test run: currently, there is no way to mark a test as ignored at runtime with the
/// default test framework, see https://github.com/rust-lang/rust/issues/68007 for details.
///
/// First, the test creates a set of Azure blobs with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_azure_data`]
/// (a concrete key example is given at the end of this comment), where
/// * `random_prefix_part` is set for the entire Azure client during the client creation in [`create_azure_client`], to avoid interference between multiple test runs
/// * `base_prefix_str` is a common prefix to use in the client requests: we want to ensure that the client is able to list nested prefixes inside the bucket
///
/// Then, the test verifies that the client returns the correct prefixes when queried:
/// * with no prefix, it lists everything after its `${random_prefix_part}/` — that should be the `${base_prefix_str}` value only
/// * with the `${base_prefix_str}/` prefix, it lists every `sub_prefix_${i}`
///
/// With real Azure enabled and the `#[cfg(test)]` Rust configuration used, the Azure client test adds a `max-keys` param to limit the response keys.
/// This way, we test the pagination implicitly, by ensuring all results are returned from the remote storage, while avoiding uploading too many blobs to Azure.
///
/// Lastly, the test attempts to clean up and remove all uploaded Azure files.
/// If any errors appear during the cleanup, they are logged, but the test is not failed or stopped until the cleanup is finished.
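///
/// For example, with a hypothetical `random_prefix_part` of `test_1700000000000_0000abcd` (the real
/// value is derived from the current time and a random number in [`create_azure_client`]), the
/// uploaded keys look like:
/// ```text
/// test_1700000000000_0000abcd/test/sub_prefix_1/blob_1
/// test_1700000000000_0000abcd/test/sub_prefix_2/blob_2
/// ```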
#[test_context(MaybeEnabledAzureWithTestBlobs)]
#[tokio::test]
async fn azure_pagination_should_work(
    ctx: &mut MaybeEnabledAzureWithTestBlobs,
) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledAzureWithTestBlobs::Enabled(ctx) => ctx,
        MaybeEnabledAzureWithTestBlobs::Disabled => return Ok(()),
        MaybeEnabledAzureWithTestBlobs::UploadsFailed(e, _) => {
            anyhow::bail!("Azure init failed: {e:?}")
        }
    };

    let test_client = Arc::clone(&ctx.enabled.client);
    let expected_remote_prefixes = ctx.remote_prefixes.clone();

    let base_prefix = RemotePath::new(Utf8Path::new(ctx.enabled.base_prefix))
        .context("common_prefix construction")?;
    let root_remote_prefixes = test_client
        .list_prefixes(None)
        .await
        .context("client list root prefixes failure")?
        .into_iter()
        .collect::<HashSet<_>>();
    assert_eq!(
        root_remote_prefixes, HashSet::from([base_prefix.clone()]),
        "remote storage root prefixes list mismatches with the uploads. Returned prefixes: {root_remote_prefixes:?}"
    );

    let nested_remote_prefixes = test_client
        .list_prefixes(Some(&base_prefix))
        .await
        .context("client list nested prefixes failure")?
        .into_iter()
        .collect::<HashSet<_>>();
    let remote_only_prefixes = nested_remote_prefixes
        .difference(&expected_remote_prefixes)
        .collect::<HashSet<_>>();
    let missing_uploaded_prefixes = expected_remote_prefixes
        .difference(&nested_remote_prefixes)
        .collect::<HashSet<_>>();
    assert_eq!(
        remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0,
        "remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
    );

    Ok(())
}

/// Tests that the Azure client can list all files in a folder, even if the response comes paginated and requires multiple Azure queries.
/// Uses real Azure and requires [`ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME`] and the related Azure credential env vars to be specified. The test skips the real code and passes if the env vars are not set.
/// See [`azure_pagination_should_work`] for more information.
///
/// First, the test creates a set of Azure objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_simple_azure_data`].
/// Then it performs the following queries:
/// 1. `list_files(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
/// 2. `list_files("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt`
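///
/// As a concrete illustration: the setup spawns 21 upload tasks and blob `i` lands in `folder{i / 7}`,
/// so the uploaded layout (under the client's random prefix) is:
/// ```text
/// folder0/blob_1.txt  ... folder0/blob_6.txt
/// folder1/blob_7.txt  ... folder1/blob_13.txt
/// folder2/blob_14.txt ... folder2/blob_20.txt
/// folder3/blob_21.txt
/// ```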
#[test_context(MaybeEnabledAzureWithSimpleTestBlobs)]
#[tokio::test]
async fn azure_list_files_works(
    ctx: &mut MaybeEnabledAzureWithSimpleTestBlobs,
) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledAzureWithSimpleTestBlobs::Enabled(ctx) => ctx,
        MaybeEnabledAzureWithSimpleTestBlobs::Disabled => return Ok(()),
        MaybeEnabledAzureWithSimpleTestBlobs::UploadsFailed(e, _) => {
            anyhow::bail!("Azure init failed: {e:?}")
        }
    };
    let test_client = Arc::clone(&ctx.enabled.client);
    let base_prefix =
        RemotePath::new(Utf8Path::new("folder1")).context("common_prefix construction")?;
    let root_files = test_client
        .list_files(None)
        .await
        .context("client list root files failure")?
        .into_iter()
        .collect::<HashSet<_>>();
    assert_eq!(
        root_files,
        ctx.remote_blobs.clone(),
        "remote storage list_files on root mismatches with the uploads."
    );
    let nested_remote_files = test_client
        .list_files(Some(&base_prefix))
        .await
        .context("client list nested files failure")?
        .into_iter()
        .collect::<HashSet<_>>();
    let trim_remote_blobs: HashSet<_> = ctx
        .remote_blobs
        .iter()
        .map(|x| x.get_path())
        .filter(|x| x.starts_with("folder1"))
        .map(|x| RemotePath::new(x).expect("must be valid path"))
        .collect();
    assert_eq!(
        nested_remote_files, trim_remote_blobs,
        "remote storage list_files on subdirectory mismatches with the uploads."
    );
    Ok(())
}

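/// Tests that deleting a path that was never uploaded completes without an error.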
#[test_context(MaybeEnabledAzure)]
#[tokio::test]
async fn azure_delete_non_existing_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledAzure::Enabled(ctx) => ctx,
        MaybeEnabledAzure::Disabled => return Ok(()),
    };

    let path = RemotePath::new(Utf8Path::new(
        format!("{}/for_sure_there_is_nothing_there_really", ctx.base_prefix).as_str(),
    ))
    .with_context(|| "RemotePath conversion")?;

    ctx.client.delete(&path).await.expect("should succeed");

    Ok(())
}

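/// Tests the batch deletion API: uploads three blobs, deletes two of them via `delete_objects`,
/// checks that listing the root still returns a single prefix, and finally deletes the remaining blob.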
#[test_context(MaybeEnabledAzure)]
#[tokio::test]
async fn azure_delete_objects_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledAzure::Enabled(ctx) => ctx,
        MaybeEnabledAzure::Disabled => return Ok(()),
    };

    let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let data1 = "remote blob data1".as_bytes();
    let data1_len = data1.len();
    let data2 = "remote blob data2".as_bytes();
    let data2_len = data2.len();
    let data3 = "remote blob data3".as_bytes();
    let data3_len = data3.len();
    ctx.client
        .upload(std::io::Cursor::new(data1), data1_len, &path1, None)
        .await?;

    ctx.client
        .upload(std::io::Cursor::new(data2), data2_len, &path2, None)
        .await?;

    ctx.client
        .upload(std::io::Cursor::new(data3), data3_len, &path3, None)
        .await?;

    ctx.client.delete_objects(&[path1, path2]).await?;

    let prefixes = ctx.client.list_prefixes(None).await?;

    assert_eq!(prefixes.len(), 1);

    ctx.client.delete_objects(&[path3]).await?;

    Ok(())
}

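/// Tests a full upload/download round trip: the blob is downloaded both in full via `download` and
/// via `download_byte_range` with various bounds (full, partial, open-ended, and an end past the
/// real end of the blob), and the returned bytes are compared against the uploaded data.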
#[test_context(MaybeEnabledAzure)]
#[tokio::test]
async fn azure_upload_download_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Result<()> {
    let MaybeEnabledAzure::Enabled(ctx) = ctx else {
        return Ok(());
    };

    let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let data = "remote blob data here".as_bytes();
    let data_len = data.len() as u64;

    ctx.client
        .upload(std::io::Cursor::new(data), data.len(), &path, None)
        .await?;

    // Reads the download stream to the end and returns the collected bytes for comparison.
    async fn download_and_compare(mut dl: Download) -> anyhow::Result<Vec<u8>> {
        let mut buf = Vec::new();
        tokio::io::copy(&mut dl.download_stream, &mut buf).await?;
        Ok(buf)
    }
    // Normal download request
    let dl = ctx.client.download(&path).await?;
    let buf = download_and_compare(dl).await?;
    assert_eq!(buf, data);

    // Full range (end specified)
    let dl = ctx
        .client
        .download_byte_range(&path, 0, Some(data_len))
        .await?;
    let buf = download_and_compare(dl).await?;
    assert_eq!(buf, data);

    // Partial range (end specified)
    let dl = ctx.client.download_byte_range(&path, 4, Some(10)).await?;
    let buf = download_and_compare(dl).await?;
    assert_eq!(buf, data[4..10]);

    // Partial range (end beyond the real end)
    let dl = ctx
        .client
        .download_byte_range(&path, 8, Some(data_len * 100))
        .await?;
    let buf = download_and_compare(dl).await?;
    assert_eq!(buf, data[8..]);

    // Partial range (end unspecified)
    let dl = ctx.client.download_byte_range(&path, 4, None).await?;
    let buf = download_and_compare(dl).await?;
    assert_eq!(buf, data[4..]);

    // Full range (end unspecified)
    let dl = ctx.client.download_byte_range(&path, 0, None).await?;
    let buf = download_and_compare(dl).await?;
    assert_eq!(buf, data);

    Ok(())
}

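// Logging must be initialized at most once per test binary: `get_or_init` runs the closure on the
// first call only and is a no-op afterwards, which matters because the test harness may run the
// setup of several tests concurrently.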
fn ensure_logging_ready() {
    LOGGING_DONE.get_or_init(|| {
        utils::logging::init(
            utils::logging::LogFormat::Test,
            utils::logging::TracingErrorLayerEnablement::Disabled,
        )
        .expect("logging init failed");
    });
}

struct EnabledAzure {
    client: Arc<GenericRemoteStorage>,
    base_prefix: &'static str,
}

impl EnabledAzure {
    async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
        let client = create_azure_client(max_keys_in_list_response)
            .context("Azure client creation")
            .expect("Azure client creation failed");

        EnabledAzure {
            client,
            base_prefix: BASE_PREFIX,
        }
    }
}

enum MaybeEnabledAzure {
    Enabled(EnabledAzure),
    Disabled,
}

#[async_trait::async_trait]
impl AsyncTestContext for MaybeEnabledAzure {
    async fn setup() -> Self {
        ensure_logging_ready();

        if env::var(ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
            info!(
                "`{}` env variable is not set, skipping the test",
                ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME
            );
            return Self::Disabled;
        }

        Self::Enabled(EnabledAzure::setup(None).await)
    }
}

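// Unlike the two-state `MaybeEnabledAzure`, this context has a third state: `UploadsFailed` keeps
// both the error and the partially-created test blobs, so that `teardown` can still clean up
// whatever made it to the remote storage before the failure.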
enum MaybeEnabledAzureWithTestBlobs {
    Enabled(AzureWithTestBlobs),
    Disabled,
    UploadsFailed(anyhow::Error, AzureWithTestBlobs),
}

struct AzureWithTestBlobs {
    enabled: EnabledAzure,
    remote_prefixes: HashSet<RemotePath>,
    remote_blobs: HashSet<RemotePath>,
}

#[async_trait::async_trait]
impl AsyncTestContext for MaybeEnabledAzureWithTestBlobs {
    async fn setup() -> Self {
        ensure_logging_ready();
        if env::var(ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
            info!(
                "`{}` env variable is not set, skipping the test",
                ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME
            );
            return Self::Disabled;
        }

        let max_keys_in_list_response = 10;
        let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());

        let enabled = EnabledAzure::setup(Some(max_keys_in_list_response)).await;

        match upload_azure_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
            ControlFlow::Continue(uploads) => {
                info!("Remote objects created successfully");

                Self::Enabled(AzureWithTestBlobs {
                    enabled,
                    remote_prefixes: uploads.prefixes,
                    remote_blobs: uploads.blobs,
                })
            }
            ControlFlow::Break(uploads) => Self::UploadsFailed(
                anyhow::anyhow!("One or multiple blobs failed to upload to Azure"),
                AzureWithTestBlobs {
                    enabled,
                    remote_prefixes: uploads.prefixes,
                    remote_blobs: uploads.blobs,
                },
            ),
        }
    }

    async fn teardown(self) {
        match self {
            Self::Disabled => {}
            Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
                cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
            }
        }
    }
}

// NOTE: the setups for the list_prefixes test and the list_files test are very similar.
// However, they are not identical. The list_prefixes function is concerned with listing prefixes,
// whereas the list_files function is concerned with listing files.
// See the `RemoteStorage::list_files` documentation for more details.
enum MaybeEnabledAzureWithSimpleTestBlobs {
    Enabled(AzureWithSimpleTestBlobs),
    Disabled,
    UploadsFailed(anyhow::Error, AzureWithSimpleTestBlobs),
}
struct AzureWithSimpleTestBlobs {
    enabled: EnabledAzure,
    remote_blobs: HashSet<RemotePath>,
}

#[async_trait::async_trait]
impl AsyncTestContext for MaybeEnabledAzureWithSimpleTestBlobs {
    async fn setup() -> Self {
        ensure_logging_ready();
        if env::var(ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
            info!(
                "`{}` env variable is not set, skipping the test",
                ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME
            );
            return Self::Disabled;
        }

        let max_keys_in_list_response = 10;
        let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());

        let enabled = EnabledAzure::setup(Some(max_keys_in_list_response)).await;

        match upload_simple_azure_data(&enabled.client, upload_tasks_count).await {
            ControlFlow::Continue(uploads) => {
                info!("Remote objects created successfully");

                Self::Enabled(AzureWithSimpleTestBlobs {
                    enabled,
                    remote_blobs: uploads,
                })
            }
            ControlFlow::Break(uploads) => Self::UploadsFailed(
                anyhow::anyhow!("One or multiple blobs failed to upload to Azure"),
                AzureWithSimpleTestBlobs {
                    enabled,
                    remote_blobs: uploads,
                },
            ),
        }
    }

    async fn teardown(self) {
        match self {
            Self::Disabled => {}
            Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
                cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
            }
        }
    }
}

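// A sketch of how to run these tests against real Azure. The exact cargo arguments are an
// assumption (they depend on the package and test-binary names), and the Azure credential env
// vars needed by the underlying SDK must be provided separately:
//
//   ENABLE_REAL_AZURE_REMOTE_STORAGE=1 \
//   REMOTE_STORAGE_AZURE_CONTAINER=some-container \
//   REMOTE_STORAGE_AZURE_REGION=some-region \
//   cargo test -p remote_storage --test test_real_azure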
fn create_azure_client(
    max_keys_per_list_response: Option<i32>,
) -> anyhow::Result<Arc<GenericRemoteStorage>> {
    use rand::Rng;

    let remote_storage_azure_container = env::var("REMOTE_STORAGE_AZURE_CONTAINER").context(
        "`REMOTE_STORAGE_AZURE_CONTAINER` env var is not set, but real Azure tests are enabled",
    )?;
    let remote_storage_azure_region = env::var("REMOTE_STORAGE_AZURE_REGION").context(
        "`REMOTE_STORAGE_AZURE_REGION` env var is not set, but real Azure tests are enabled",
    )?;

    // Timestamps alone are not unique enough: test runners have ended up sharing the same
    // nanosecond-based bucket prefixes. The millisecond part is kept only as a debugging aid to
    // make the prefix easier to find later.
    let millis = std::time::SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .context("random Azure test prefix part calculation")?
        .as_millis();

    // Just as two threads can observe the same nanos, they can observe the same millis, so add
    // randomness to make the prefix unique.
    let random = rand::thread_rng().gen::<u32>();

    let remote_storage_config = RemoteStorageConfig {
        max_concurrent_syncs: NonZeroUsize::new(100).unwrap(),
        max_sync_errors: NonZeroU32::new(5).unwrap(),
        storage: RemoteStorageKind::AzureContainer(AzureConfig {
            container_name: remote_storage_azure_container,
            container_region: remote_storage_azure_region,
            prefix_in_container: Some(format!("test_{millis}_{random:08x}/")),
            concurrency_limit: NonZeroUsize::new(100).unwrap(),
            max_keys_per_list_response,
        }),
    };
    Ok(Arc::new(
        GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
    ))
}

struct Uploads {
    prefixes: HashSet<RemotePath>,
    blobs: HashSet<RemotePath>,
}

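// Returns `ControlFlow::Continue(uploads)` when every upload task succeeded and
// `ControlFlow::Break(uploads)` when at least one failed; both variants carry whatever was
// actually uploaded, so the caller can clean it up in either case.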
async fn upload_azure_data(
    client: &Arc<GenericRemoteStorage>,
    base_prefix_str: &'static str,
    upload_tasks_count: usize,
) -> ControlFlow<Uploads, Uploads> {
    info!("Creating {upload_tasks_count} Azure files");
    let mut upload_tasks = JoinSet::new();
    for i in 1..upload_tasks_count + 1 {
        let task_client = Arc::clone(client);
        upload_tasks.spawn(async move {
            let prefix = format!("{base_prefix_str}/sub_prefix_{i}/");
            let blob_prefix = RemotePath::new(Utf8Path::new(&prefix))
                .with_context(|| format!("{prefix:?} to RemotePath conversion"))?;
            let blob_path = blob_prefix.join(Utf8Path::new(&format!("blob_{i}")));
            debug!("Creating remote item {i} at path {blob_path:?}");

            let data = format!("remote blob data {i}").into_bytes();
            let data_len = data.len();
            task_client
                .upload(std::io::Cursor::new(data), data_len, &blob_path, None)
                .await?;

            Ok::<_, anyhow::Error>((blob_prefix, blob_path))
        });
    }

    let mut upload_tasks_failed = false;
    let mut uploaded_prefixes = HashSet::with_capacity(upload_tasks_count);
    let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
    while let Some(task_run_result) = upload_tasks.join_next().await {
        match task_run_result
            .context("task join failed")
            .and_then(|task_result| task_result.context("upload task failed"))
        {
            Ok((upload_prefix, upload_path)) => {
                uploaded_prefixes.insert(upload_prefix);
                uploaded_blobs.insert(upload_path);
            }
            Err(e) => {
                error!("Upload task failed: {e:?}");
                upload_tasks_failed = true;
            }
        }
    }

    let uploads = Uploads {
        prefixes: uploaded_prefixes,
        blobs: uploaded_blobs,
    };
    if upload_tasks_failed {
        ControlFlow::Break(uploads)
    } else {
        ControlFlow::Continue(uploads)
    }
}

async fn cleanup(client: &Arc<GenericRemoteStorage>, objects_to_delete: HashSet<RemotePath>) {
    info!(
        "Removing {} objects from the remote storage during cleanup",
        objects_to_delete.len()
    );
    let mut delete_tasks = JoinSet::new();
    for object_to_delete in objects_to_delete {
        let task_client = Arc::clone(client);
        delete_tasks.spawn(async move {
            debug!("Deleting remote item at path {object_to_delete:?}");
            task_client
                .delete(&object_to_delete)
                .await
                .with_context(|| format!("{object_to_delete:?} removal"))
        });
    }

    while let Some(task_run_result) = delete_tasks.join_next().await {
        match task_run_result {
            Ok(task_result) => match task_result {
                Ok(()) => {}
                Err(e) => error!("Delete task failed: {e:?}"),
            },
            Err(join_err) => error!("Delete task did not finish correctly: {join_err}"),
        }
    }
}

// Uploads files `folder{j}/blob_{i}.txt`, where `j = i / 7`. See the test description for more details.
async fn upload_simple_azure_data(
    client: &Arc<GenericRemoteStorage>,
    upload_tasks_count: usize,
) -> ControlFlow<HashSet<RemotePath>, HashSet<RemotePath>> {
    info!("Creating {upload_tasks_count} Azure files");
    let mut upload_tasks = JoinSet::new();
    for i in 1..upload_tasks_count + 1 {
        let task_client = Arc::clone(client);
        upload_tasks.spawn(async move {
            let blob_path = PathBuf::from(format!("folder{}/blob_{}.txt", i / 7, i));
            let blob_path = RemotePath::new(
                Utf8Path::from_path(blob_path.as_path()).expect("must be valid blob path"),
            )
            .with_context(|| format!("{blob_path:?} to RemotePath conversion"))?;
            debug!("Creating remote item {i} at path {blob_path:?}");

            let data = format!("remote blob data {i}").into_bytes();
            let data_len = data.len();
            task_client
                .upload(std::io::Cursor::new(data), data_len, &blob_path, None)
                .await?;

            Ok::<_, anyhow::Error>(blob_path)
        });
    }

    let mut upload_tasks_failed = false;
    let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
    while let Some(task_run_result) = upload_tasks.join_next().await {
        match task_run_result
            .context("task join failed")
            .and_then(|task_result| task_result.context("upload task failed"))
        {
            Ok(upload_path) => {
                uploaded_blobs.insert(upload_path);
            }
            Err(e) => {
                error!("Upload task failed: {e:?}");
                upload_tasks_failed = true;
            }
        }
    }

    if upload_tasks_failed {
        ControlFlow::Break(uploaded_blobs)
    } else {
        ControlFlow::Continue(uploaded_blobs)
    }
}