use std::collections::HashSet;
use std::num::NonZeroU32;
use std::ops::Bound;
use std::sync::Arc;

use anyhow::Context;
use camino::Utf8Path;
use futures::StreamExt;
use remote_storage::{DownloadError, DownloadOpts, ListingMode, ListingObject, RemotePath};
use test_context::test_context;
use tokio_util::sync::CancellationToken;
use tracing::debug;

use super::{
    MaybeEnabledStorage, MaybeEnabledStorageWithSimpleTestBlobs, MaybeEnabledStorageWithTestBlobs,
};
use crate::common::{download_to_vec, upload_stream, wrap_stream};

/// Tests that the S3 client can list all prefixes, even if the responses come paginated and require multiple S3 queries.
/// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and the related S3 credential env vars to be specified.
/// See the client creation in [`create_s3_client`] for details on the required env vars.
/// If real S3 tests are disabled, the test passes without running anything: currently, there's no way to mark a test as ignored at runtime with the
/// default test framework, see https://github.com/rust-lang/rust/issues/68007 for details.
///
/// First, the test creates a set of S3 objects with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_remote_data`]
/// where
/// * `random_prefix_part` is set for the entire S3 client during the S3 client creation in [`create_s3_client`], to avoid interference between test runs
/// * `base_prefix_str` is a common prefix to use in the client requests: we want to ensure that the client is able to list nested prefixes inside the bucket
///
/// Then, the test verifies that the client returns the correct prefixes when queried:
/// * with no prefix, it lists everything after its `${random_prefix_part}/`, which should be the `${base_prefix_str}` value only
/// * with the `${base_prefix_str}/` prefix, it lists every `sub_prefix_${i}`
///
/// In `MaybeEnabledStorageWithTestBlobs::setup`, we set the `max_keys_in_list_response` param to limit the number of keys in a single response.
/// This way, we are able to test pagination by ensuring all results are returned from the remote storage, without uploading too many blobs to S3,
/// as the current default AWS S3 pagination limit is 1000 keys
/// (see <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>).
///
/// Lastly, the test attempts to clean up and remove all uploaded S3 files.
/// If any errors appear during the cleanup, they are logged, but the test does not fail or stop until the cleanup is finished.
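///
/// As a schematic illustration (the real prefix values are randomized), with two sub-prefixes
/// the uploaded keys would look like:
///
/// ```text
/// /${random_prefix_part}/${base_prefix_str}/sub_prefix_1/blob_1
/// /${random_prefix_part}/${base_prefix_str}/sub_prefix_2/blob_2
/// ```
///
/// and a `WithDelimiter` listing under `${base_prefix_str}/` is expected to return the
/// `sub_prefix_${i}` entries, however many pages the server needs to deliver them.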
#[test_context(MaybeEnabledStorageWithTestBlobs)]
#[tokio::test]
async fn pagination_should_work(ctx: &mut MaybeEnabledStorageWithTestBlobs) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledStorageWithTestBlobs::Enabled(ctx) => ctx,
        MaybeEnabledStorageWithTestBlobs::Disabled => return Ok(()),
        MaybeEnabledStorageWithTestBlobs::UploadsFailed(e, _) => {
            anyhow::bail!("S3 init failed: {e:?}")
        }
    };

    let cancel = CancellationToken::new();

    let test_client = Arc::clone(&ctx.enabled.client);
    let expected_remote_prefixes = ctx.remote_prefixes.clone();

    let base_prefix = RemotePath::new(Utf8Path::new(ctx.enabled.base_prefix))
        .context("common_prefix construction")?;
    let root_remote_prefixes = test_client
        .list(None, ListingMode::WithDelimiter, None, &cancel)
        .await?
        .prefixes
        .into_iter()
        .collect::<HashSet<_>>();
    assert_eq!(
        root_remote_prefixes,
        HashSet::from([base_prefix.clone()]),
        "remote storage root prefixes list does not match the uploads. Returned prefixes: {root_remote_prefixes:?}"
    );

    let nested_remote_prefixes = test_client
        .list(
            Some(&base_prefix.add_trailing_slash()),
            ListingMode::WithDelimiter,
            None,
            &cancel,
        )
        .await?
        .prefixes
        .into_iter()
        .collect::<HashSet<_>>();
    let remote_only_prefixes = nested_remote_prefixes
        .difference(&expected_remote_prefixes)
        .collect::<HashSet<_>>();
    let missing_uploaded_prefixes = expected_remote_prefixes
        .difference(&nested_remote_prefixes)
        .collect::<HashSet<_>>();
    assert_eq!(
        remote_only_prefixes.len() + missing_uploaded_prefixes.len(),
        0,
        "remote storage nested prefixes list does not match the uploads. Remote-only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
    );

    // Repeat the nested listing via the streaming API (`list_streaming`), which yields one
    // listing segment per page of results.

    let prefix_with_slash = base_prefix.add_trailing_slash();
    let mut nested_remote_prefixes_st = test_client.list_streaming(
        Some(&prefix_with_slash),
        ListingMode::WithDelimiter,
        None,
        &cancel,
    );
    let mut nested_remote_prefixes_combined = HashSet::new();
    let mut segments = 0;
    let mut segment_max_size = 0;
    while let Some(st) = nested_remote_prefixes_st.next().await {
        let st = st?;
        segment_max_size = segment_max_size.max(st.prefixes.len());
        nested_remote_prefixes_combined.extend(st.prefixes.into_iter());
        segments += 1;
    }
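    // The setup caps `max_keys_in_list_response` below the number of uploaded prefixes, so a
    // correct client must have fetched more than one page. The size check additionally ensures
    // that no single segment held more than half of the combined results, i.e. the results
    // were genuinely spread across pages.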
    assert!(segments > 1, "less than 2 segments: {segments}");
    assert!(
        segment_max_size * 2 <= nested_remote_prefixes_combined.len(),
        "twice the max segment size ({segment_max_size}) is larger than the total number of remote prefixes ({})",
        nested_remote_prefixes_combined.len()
    );
    let remote_only_prefixes = nested_remote_prefixes_combined
        .difference(&expected_remote_prefixes)
        .collect::<HashSet<_>>();
    let missing_uploaded_prefixes = expected_remote_prefixes
        .difference(&nested_remote_prefixes_combined)
        .collect::<HashSet<_>>();
    assert_eq!(
        remote_only_prefixes.len() + missing_uploaded_prefixes.len(),
        0,
        "remote storage nested prefixes list does not match the uploads. Remote-only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
    );

    Ok(())
}

/// Tests that the S3 client can list all files in a folder, even if the responses come paginated and require multiple S3 queries.
/// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and the related S3 credential env vars to be specified. The test skips the real code and passes if the env vars are not set.
/// See `s3_pagination_should_work` for more information.
///
/// First, the test creates a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_remote_data`].
/// Then it performs the following queries:
/// 1. `list(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
/// 2. `list("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt`
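///
/// As a schematic illustration (the names and counts here are hypothetical; the real layout is
/// produced by the test setup), with two folders of two blobs each the bucket would contain:
///
/// ```text
/// random_prefix/folder1/blob_1.txt
/// random_prefix/folder1/blob_2.txt
/// random_prefix/folder2/blob_1.txt
/// random_prefix/folder2/blob_2.txt
/// ```
///
/// and query 2 above would be expected to return only the two `folder1` keys.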
#[test_context(MaybeEnabledStorageWithSimpleTestBlobs)]
#[tokio::test]
async fn list_no_delimiter_works(
    ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs,
) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx,
        MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()),
        MaybeEnabledStorageWithSimpleTestBlobs::UploadsFailed(e, _) => {
            anyhow::bail!("S3 init failed: {e:?}")
        }
    };
    let cancel = CancellationToken::new();
    let test_client = Arc::clone(&ctx.enabled.client);
    let base_prefix =
        RemotePath::new(Utf8Path::new("folder1")).context("common_prefix construction")?;
    let root_files = test_client
        .list(None, ListingMode::NoDelimiter, None, &cancel)
        .await
        .context("client list root files failure")?
        .keys
        .into_iter()
        .map(|o| o.key)
        .collect::<HashSet<_>>();
    assert_eq!(
        root_files,
        ctx.remote_blobs.clone(),
        "remote storage list on root does not match the uploads."
    );

    // Test that the max_keys limit works. In total there are about 21 files (see
    // the upload_simple_remote_data call in test_real_s3.rs).
    let limited_root_files = test_client
        .list(
            None,
            ListingMode::NoDelimiter,
            Some(NonZeroU32::new(2).unwrap()),
            &cancel,
        )
        .await
        .context("client list root files failure")?;
    assert_eq!(limited_root_files.keys.len(), 2);

    let nested_remote_files = test_client
        .list(Some(&base_prefix), ListingMode::NoDelimiter, None, &cancel)
        .await
        .context("client list nested files failure")?
        .keys
        .into_iter()
        .map(|o| o.key)
        .collect::<HashSet<_>>();
    let trim_remote_blobs: HashSet<_> = ctx
        .remote_blobs
        .iter()
        .map(|x| x.get_path())
        .filter(|x| x.starts_with("folder1"))
        .map(|x| RemotePath::new(x).expect("must be valid path"))
        .collect();
    assert_eq!(
        nested_remote_files, trim_remote_blobs,
        "remote storage list on a subdirectory does not match the uploads."
    );
    Ok(())
}

/// Tests that giving a partial prefix returns all matches (e.g. "/foo" yields "/foobar/baz"),
/// but only with NoDelimiter.
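///
/// A sketch of the expected semantics, assuming hypothetical objects `folder1/blob_1.txt` and
/// `folder2/blob_1.txt`:
///
/// ```text
/// list("fold", NoDelimiter)          -> folder1/blob_1.txt, folder2/blob_1.txt
/// list("fold", WithDelimiter)        -> no keys (matches roll up under the delimiter)
/// list("folder2/blob", NoDelimiter)  -> folder2/blob_1.txt
/// list("folder2/foo", NoDelimiter)   -> no keys
/// ```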
#[test_context(MaybeEnabledStorageWithSimpleTestBlobs)]
#[tokio::test]
async fn list_partial_prefix(
    ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs,
) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx,
        MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()),
        MaybeEnabledStorageWithSimpleTestBlobs::UploadsFailed(e, _) => {
            anyhow::bail!("S3 init failed: {e:?}")
        }
    };

    let cancel = CancellationToken::new();
    let test_client = Arc::clone(&ctx.enabled.client);

    // Prefix "fold" should match all "folder{i}" directories with NoDelimiter.
    let objects: HashSet<_> = test_client
        .list(
            Some(&RemotePath::from_string("fold")?),
            ListingMode::NoDelimiter,
            None,
            &cancel,
        )
        .await?
        .keys
        .into_iter()
        .map(|o| o.key)
        .collect();
    assert_eq!(&objects, &ctx.remote_blobs);

    // Prefix "fold" matches no keys with WithDelimiter.
    let objects: HashSet<_> = test_client
        .list(
            Some(&RemotePath::from_string("fold")?),
            ListingMode::WithDelimiter,
            None,
            &cancel,
        )
        .await?
        .keys
        .into_iter()
        .map(|o| o.key)
        .collect();
    assert!(objects.is_empty());

    // Prefix "" matches everything.
    let objects: HashSet<_> = test_client
        .list(
            Some(&RemotePath::from_string("")?),
            ListingMode::NoDelimiter,
            None,
            &cancel,
        )
        .await?
        .keys
        .into_iter()
        .map(|o| o.key)
        .collect();
    assert_eq!(&objects, &ctx.remote_blobs);

    // Prefix "" matches no keys with WithDelimiter.
    let objects: HashSet<_> = test_client
        .list(
            Some(&RemotePath::from_string("")?),
            ListingMode::WithDelimiter,
            None,
            &cancel,
        )
        .await?
        .keys
        .into_iter()
        .map(|o| o.key)
        .collect();
    assert!(objects.is_empty());

    // Prefix "foo" matches nothing.
    let objects: HashSet<_> = test_client
        .list(
            Some(&RemotePath::from_string("foo")?),
            ListingMode::NoDelimiter,
            None,
            &cancel,
        )
        .await?
        .keys
        .into_iter()
        .map(|o| o.key)
        .collect();
    assert!(objects.is_empty());

    // Prefix "folder2/blob" matches all blobs in folder2.
    let objects: HashSet<_> = test_client
        .list(
            Some(&RemotePath::from_string("folder2/blob")?),
            ListingMode::NoDelimiter,
            None,
            &cancel,
        )
        .await?
        .keys
        .into_iter()
        .map(|o| o.key)
        .collect();
    let expect: HashSet<_> = ctx
        .remote_blobs
        .iter()
        .filter(|o| o.get_path().starts_with("folder2"))
        .cloned()
        .collect();
    assert_eq!(&objects, &expect);

    // Prefix "folder2/foo" matches nothing.
    let objects: HashSet<_> = test_client
        .list(
            Some(&RemotePath::from_string("folder2/foo")?),
            ListingMode::NoDelimiter,
            None,
            &cancel,
        )
        .await?
        .keys
        .into_iter()
        .map(|o| o.key)
        .collect();
    assert!(objects.is_empty());

    Ok(())
}

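/// Tests that deleting a non-existing object succeeds: deletes are expected to be idempotent,
/// matching S3, where a DeleteObject call on a missing key is not an error.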
#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn delete_non_existing_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledStorage::Enabled(ctx) => ctx,
        MaybeEnabledStorage::Disabled => return Ok(()),
    };

    let cancel = CancellationToken::new();

    let path = RemotePath::new(Utf8Path::new(
        format!("{}/for_sure_there_is_nothing_there_really", ctx.base_prefix).as_str(),
    ))
    .with_context(|| "RemotePath conversion")?;

    ctx.client
        .delete(&path, &cancel)
        .await
        .expect("should succeed");

    Ok(())
}

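/// Tests that a batch delete removes exactly the given objects: after deleting two of the three
/// uploaded blobs, a delimiter listing should still show a common prefix for the remaining one.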
#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn delete_objects_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledStorage::Enabled(ctx) => ctx,
        MaybeEnabledStorage::Disabled => return Ok(()),
    };

    let cancel = CancellationToken::new();

    let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let (data, len) = upload_stream("remote blob data1".as_bytes().into());
    ctx.client.upload(data, len, &path1, None, &cancel).await?;

    let (data, len) = upload_stream("remote blob data2".as_bytes().into());
    ctx.client.upload(data, len, &path2, None, &cancel).await?;

    let (data, len) = upload_stream("remote blob data3".as_bytes().into());
    ctx.client.upload(data, len, &path3, None, &cancel).await?;

    ctx.client.delete_objects(&[path1, path2], &cancel).await?;

    let prefixes = ctx
        .client
        .list(None, ListingMode::WithDelimiter, None, &cancel)
        .await?
        .prefixes;

    assert_eq!(prefixes.len(), 1);

    ctx.client.delete_objects(&[path3], &cancel).await?;

    Ok(())
}

/// Tests that delete_prefix() will delete all objects matching a prefix, including
/// partial prefixes (i.e. "/foo" matches "/foobar").
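///
/// A sketch of the expected semantics, assuming hypothetical objects `folder1/blob_1.txt` and
/// `folder10/blob_1.txt`:
///
/// ```text
/// delete_prefix("xyz")                 -> deletes nothing
/// delete_prefix("folder1/blob_1.txt")  -> deletes that exact object only
/// delete_prefix("folder1")             -> deletes both (the partial prefix matches "folder10" too)
/// ```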
#[test_context(MaybeEnabledStorageWithSimpleTestBlobs)]
#[tokio::test]
async fn delete_prefix(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx,
        MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()),
        MaybeEnabledStorageWithSimpleTestBlobs::UploadsFailed(e, _) => {
            anyhow::bail!("S3 init failed: {e:?}")
        }
    };

    let cancel = CancellationToken::new();
    let test_client = Arc::clone(&ctx.enabled.client);

    /// Asserts that the S3 listing matches the given paths.
    macro_rules! assert_list {
        ($expect:expr) => {{
            let listing = test_client
                .list(None, ListingMode::NoDelimiter, None, &cancel)
                .await?
                .keys
                .into_iter()
                .map(|o| o.key)
                .collect();
            assert_eq!($expect, listing);
        }};
    }

    // We start with the full set of uploaded files.
    let mut expect = ctx.remote_blobs.clone();

    // Deleting a non-existing prefix should do nothing.
    test_client
        .delete_prefix(&RemotePath::from_string("xyz")?, &cancel)
        .await?;
    assert_list!(expect);

    // Prefixes are case-sensitive.
    test_client
        .delete_prefix(&RemotePath::from_string("Folder")?, &cancel)
        .await?;
    assert_list!(expect);

    // Deleting a path which overlaps with an existing object should do nothing. We pick the first
    // path in the set as our common prefix.
    let path = expect.iter().next().expect("empty set").clone().join("xyz");
    test_client.delete_prefix(&path, &cancel).await?;
    assert_list!(expect);

    // Deleting an exact path should work. We pick the first path in the set.
    let path = expect.iter().next().expect("empty set").clone();
    test_client.delete_prefix(&path, &cancel).await?;
    expect.remove(&path);
    assert_list!(expect);

    // Deleting a prefix should delete all matching objects.
    test_client
        .delete_prefix(&RemotePath::from_string("folder0/blob_")?, &cancel)
        .await?;
    expect.retain(|p| !p.get_path().as_str().starts_with("folder0/"));
    assert_list!(expect);

    // Deleting a common prefix should delete all objects.
    test_client
        .delete_prefix(&RemotePath::from_string("fold")?, &cancel)
        .await?;
    expect.clear();
    assert_list!(expect);

    Ok(())
}

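/// Tests an upload/download round-trip, including ranged downloads: the returned bytes must match
/// the uploaded blob for the full range, for partial ranges, and for a range whose end extends
/// past the object (which, as with HTTP range requests, truncates at the real end).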
#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
    let MaybeEnabledStorage::Enabled(ctx) = ctx else {
        return Ok(());
    };

    let cancel = CancellationToken::new();

    let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let orig = bytes::Bytes::from_static("remote blob data here".as_bytes());

    let (data, len) = wrap_stream(orig.clone());

    ctx.client.upload(data, len, &path, None, &cancel).await?;

    // Normal download request
    let dl = ctx
        .client
        .download(&path, &DownloadOpts::default(), &cancel)
        .await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig);

    // Full range (end specified)
    let dl = ctx
        .client
        .download(
            &path,
            &DownloadOpts {
                byte_start: Bound::Included(0),
                byte_end: Bound::Excluded(len as u64),
                ..Default::default()
            },
            &cancel,
        )
        .await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig);

    // Partial range (end specified)
    let dl = ctx
        .client
        .download(
            &path,
            &DownloadOpts {
                byte_start: Bound::Included(4),
                byte_end: Bound::Excluded(10),
                ..Default::default()
            },
            &cancel,
        )
        .await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig[4..10]);

    // Partial range (end beyond the real end)
    let dl = ctx
        .client
        .download(
            &path,
            &DownloadOpts {
                byte_start: Bound::Included(8),
                byte_end: Bound::Excluded(len as u64 * 100),
                ..Default::default()
            },
            &cancel,
        )
        .await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig[8..]);

    // Partial range (end unspecified)
    let dl = ctx
        .client
        .download(
            &path,
            &DownloadOpts {
                byte_start: Bound::Included(4),
                ..Default::default()
            },
            &cancel,
        )
        .await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig[4..]);

    // Full range (end unspecified)
    let dl = ctx
        .client
        .download(
            &path,
            &DownloadOpts {
                byte_start: Bound::Included(0),
                ..Default::default()
            },
            &cancel,
        )
        .await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig);

    debug!("Cleanup: deleting file at path {path:?}");
    ctx.client
        .delete(&path, &cancel)
        .await
        .with_context(|| format!("{path:?} removal"))?;

    Ok(())
}

/// Tests that conditional downloads work properly, by returning
/// DownloadError::Unmodified when the object ETag matches the given ETag.
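///
/// A minimal sketch of the intended caller-side usage (mirroring the calls below; error
/// handling elided):
///
/// ```rust,ignore
/// let download = client.download(&path, &DownloadOpts::default(), &cancel).await?;
/// let opts = DownloadOpts { etag: Some(download.etag), ..Default::default() };
/// match client.download(&path, &opts, &cancel).await {
///     Err(DownloadError::Unmodified) => { /* the cached copy is still valid */ }
///     Ok(new) => { /* the object changed; `new.etag` identifies the new version */ }
///     Err(err) => return Err(err.into()),
/// }
/// ```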
#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn download_conditional(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
    let MaybeEnabledStorage::Enabled(ctx) = ctx else {
        return Ok(());
    };
    let cancel = CancellationToken::new();

    // Create a file.
    let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))?;
    let data = bytes::Bytes::from_static("foo".as_bytes());
    let (stream, len) = wrap_stream(data);
    ctx.client.upload(stream, len, &path, None, &cancel).await?;

    // Download it to obtain its etag.
    let mut opts = DownloadOpts::default();
    let download = ctx.client.download(&path, &opts, &cancel).await?;

    // Download with the etag yields DownloadError::Unmodified.
    opts.etag = Some(download.etag);
    let result = ctx.client.download(&path, &opts, &cancel).await;
    assert!(
        matches!(result, Err(DownloadError::Unmodified)),
        "expected DownloadError::Unmodified, got {result:?}"
    );

    // Replace the file contents.
    let data = bytes::Bytes::from_static("bar".as_bytes());
    let (stream, len) = wrap_stream(data);
    ctx.client.upload(stream, len, &path, None, &cancel).await?;

    // A download with the old etag should yield the new file.
    let download = ctx.client.download(&path, &opts, &cancel).await?;
    assert_ne!(download.etag, opts.etag.unwrap(), "ETag did not change");

    // A download with the new etag should yield Unmodified again.
    opts.etag = Some(download.etag);
    let result = ctx.client.download(&path, &opts, &cancel).await;
    assert!(
        matches!(result, Err(DownloadError::Unmodified)),
        "expected DownloadError::Unmodified, got {result:?}"
    );

    Ok(())
}

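/// Tests that copy_object produces a readable copy: downloading the destination path must yield
/// the same bytes that were uploaded to the source path.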
#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn copy_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
    let MaybeEnabledStorage::Enabled(ctx) = ctx else {
        return Ok(());
    };

    let cancel = CancellationToken::new();

    let path = RemotePath::new(Utf8Path::new(
        format!("{}/file_to_copy", ctx.base_prefix).as_str(),
    ))
    .with_context(|| "RemotePath conversion")?;
    let path_dest = RemotePath::new(Utf8Path::new(
        format!("{}/file_dest", ctx.base_prefix).as_str(),
    ))
    .with_context(|| "RemotePath conversion")?;

    let orig = bytes::Bytes::from_static("remote blob data content".as_bytes());

    let (data, len) = wrap_stream(orig.clone());

    ctx.client.upload(data, len, &path, None, &cancel).await?;

    // Copy the object, then download the copy to verify its contents.
    ctx.client.copy_object(&path, &path_dest, &cancel).await?;

    let dl = ctx
        .client
        .download(&path_dest, &DownloadOpts::default(), &cancel)
        .await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig);

    debug!("Cleanup: deleting file at path {path:?}");
    ctx.client
        .delete_objects(&[path.clone(), path_dest.clone()], &cancel)
        .await
        .with_context(|| format!("{path:?} removal"))?;

    Ok(())
}

/// Tests that head_object works properly.
#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn head_object(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
    let MaybeEnabledStorage::Enabled(ctx) = ctx else {
        return Ok(());
    };
    let cancel = CancellationToken::new();

    let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))?;

    // Errors on missing file.
    let result = ctx.client.head_object(&path, &cancel).await;
    assert!(
        matches!(result, Err(DownloadError::NotFound)),
        "expected NotFound, got {result:?}"
    );

    // Create the file.
    let data = bytes::Bytes::from_static("foo".as_bytes());
    let (stream, len) = wrap_stream(data);
    ctx.client.upload(stream, len, &path, None, &cancel).await?;

    // Fetch the head metadata.
    let object = ctx.client.head_object(&path, &cancel).await?;
    assert_eq!(
        object,
        ListingObject {
            key: path.clone(),
            last_modified: object.last_modified, // ignore
            size: 3
        }
    );

    // Wait for a couple of seconds, and then update the file to check the last
    // modified timestamp.
    tokio::time::sleep(std::time::Duration::from_secs(2)).await;

    let data = bytes::Bytes::from_static("bar".as_bytes());
    let (stream, len) = wrap_stream(data);
    ctx.client.upload(stream, len, &path, None, &cancel).await?;
    let new = ctx.client.head_object(&path, &cancel).await?;

    assert!(
        !new.last_modified
            .duration_since(object.last_modified)?
            .is_zero(),
        "last_modified did not advance"
    );

    Ok(())
}