Line data Source code
1 : use anyhow::Context;
2 : use camino::Utf8Path;
3 : use futures::StreamExt;
4 : use remote_storage::{DownloadError, DownloadOpts, ListingMode, ListingObject, RemotePath};
5 : use std::ops::Bound;
6 : use std::sync::Arc;
7 : use std::{collections::HashSet, num::NonZeroU32};
8 : use test_context::test_context;
9 : use tokio_util::sync::CancellationToken;
10 : use tracing::debug;
11 :
12 : use crate::common::{download_to_vec, upload_stream, wrap_stream};
13 :
14 : use super::{
15 : MaybeEnabledStorage, MaybeEnabledStorageWithSimpleTestBlobs, MaybeEnabledStorageWithTestBlobs,
16 : };
17 :
18 : /// Tests that S3 client can list all prefixes, even if the response come paginated and requires multiple S3 queries.
19 : /// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars specified.
20 : /// See the client creation in [`create_s3_client`] for details on the required env vars.
21 : /// If real S3 tests are disabled, the test passes, skipping any real test run: currently, there's no way to mark the test ignored in runtime with the
22 : /// deafult test framework, see https://github.com/rust-lang/rust/issues/68007 for details.
23 : ///
24 : /// First, the test creates a set of S3 objects with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_remote_data`]
25 : /// where
26 : /// * `random_prefix_part` is set for the entire S3 client during the S3 client creation in [`create_s3_client`], to avoid multiple test runs interference
27 : /// * `base_prefix_str` is a common prefix to use in the client requests: we would want to ensure that the client is able to list nested prefixes inside the bucket
28 : ///
29 : /// Then, verifies that the client does return correct prefixes when queried:
30 : /// * with no prefix, it lists everything after its `${random_prefix_part}/` — that should be `${base_prefix_str}` value only
31 : /// * with `${base_prefix_str}/` prefix, it lists every `sub_prefix_${i}`
32 : ///
33 : /// In the `MaybeEnabledStorageWithTestBlobs::setup`, we set the `max_keys_in_list_response` param to limit the keys in a single response.
34 : /// This way, we are able to test the pagination, by ensuring all results are returned from the remote storage and avoid uploading too many blobs to S3,
35 : /// as the current default AWS S3 pagination limit is 1000.
36 : /// (see <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>).
37 : ///
38 : /// Lastly, the test attempts to clean up and remove all uploaded S3 files.
39 : /// If any errors appear during the clean up, they get logged, but the test is not failed or stopped until clean up is finished.
40 6 : #[test_context(MaybeEnabledStorageWithTestBlobs)]
41 : #[tokio::test]
42 : async fn pagination_should_work(ctx: &mut MaybeEnabledStorageWithTestBlobs) -> anyhow::Result<()> {
43 : let ctx = match ctx {
44 : MaybeEnabledStorageWithTestBlobs::Enabled(ctx) => ctx,
45 : MaybeEnabledStorageWithTestBlobs::Disabled => return Ok(()),
46 : MaybeEnabledStorageWithTestBlobs::UploadsFailed(e, _) => {
47 : anyhow::bail!("S3 init failed: {e:?}")
48 : }
49 : };
50 :
51 : let cancel = CancellationToken::new();
52 :
53 : let test_client = Arc::clone(&ctx.enabled.client);
54 : let expected_remote_prefixes = ctx.remote_prefixes.clone();
55 :
56 : let base_prefix = RemotePath::new(Utf8Path::new(ctx.enabled.base_prefix))
57 : .context("common_prefix construction")?;
58 : let root_remote_prefixes = test_client
59 : .list(None, ListingMode::WithDelimiter, None, &cancel)
60 : .await?
61 : .prefixes
62 : .into_iter()
63 : .collect::<HashSet<_>>();
64 : assert_eq!(
65 : root_remote_prefixes, HashSet::from([base_prefix.clone()]),
66 : "remote storage root prefixes list mismatches with the uploads. Returned prefixes: {root_remote_prefixes:?}"
67 : );
68 :
69 : let nested_remote_prefixes = test_client
70 : .list(
71 : Some(&base_prefix.add_trailing_slash()),
72 : ListingMode::WithDelimiter,
73 : None,
74 : &cancel,
75 : )
76 : .await?
77 : .prefixes
78 : .into_iter()
79 : .collect::<HashSet<_>>();
80 : let remote_only_prefixes = nested_remote_prefixes
81 : .difference(&expected_remote_prefixes)
82 : .collect::<HashSet<_>>();
83 : let missing_uploaded_prefixes = expected_remote_prefixes
84 : .difference(&nested_remote_prefixes)
85 : .collect::<HashSet<_>>();
86 : assert_eq!(
87 : remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0,
88 : "remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
89 : );
90 :
91 : // list_streaming
92 :
93 : let prefix_with_slash = base_prefix.add_trailing_slash();
94 : let mut nested_remote_prefixes_st = test_client.list_streaming(
95 : Some(&prefix_with_slash),
96 : ListingMode::WithDelimiter,
97 : None,
98 : &cancel,
99 : );
100 : let mut nested_remote_prefixes_combined = HashSet::new();
101 : let mut segments = 0;
102 : let mut segment_max_size = 0;
103 : while let Some(st) = nested_remote_prefixes_st.next().await {
104 : let st = st?;
105 : segment_max_size = segment_max_size.max(st.prefixes.len());
106 : nested_remote_prefixes_combined.extend(st.prefixes.into_iter());
107 : segments += 1;
108 : }
109 : assert!(segments > 1, "less than 2 segments: {segments}");
110 : assert!(
111 : segment_max_size * 2 <= nested_remote_prefixes_combined.len(),
112 : "double of segment_max_size={segment_max_size} larger number of remote prefixes of {}",
113 : nested_remote_prefixes_combined.len()
114 : );
115 : let remote_only_prefixes = nested_remote_prefixes_combined
116 : .difference(&expected_remote_prefixes)
117 : .collect::<HashSet<_>>();
118 : let missing_uploaded_prefixes = expected_remote_prefixes
119 : .difference(&nested_remote_prefixes_combined)
120 : .collect::<HashSet<_>>();
121 : assert_eq!(
122 : remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0,
123 : "remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
124 : );
125 :
126 : Ok(())
127 : }
128 :
129 : /// Tests that S3 client can list all files in a folder, even if the response comes paginated and requirees multiple S3 queries.
130 : /// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars specified. Test will skip real code and pass if env vars not set.
131 : /// See `s3_pagination_should_work` for more information.
132 : ///
133 : /// First, create a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_remote_data`]
134 : /// Then performs the following queries:
135 : /// 1. `list(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
136 : /// 2. `list("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt`
137 6 : #[test_context(MaybeEnabledStorageWithSimpleTestBlobs)]
138 : #[tokio::test]
139 : async fn list_no_delimiter_works(
140 : ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs,
141 : ) -> anyhow::Result<()> {
142 : let ctx = match ctx {
143 : MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx,
144 : MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()),
145 : MaybeEnabledStorageWithSimpleTestBlobs::UploadsFailed(e, _) => {
146 : anyhow::bail!("S3 init failed: {e:?}")
147 : }
148 : };
149 : let cancel = CancellationToken::new();
150 : let test_client = Arc::clone(&ctx.enabled.client);
151 : let base_prefix =
152 : RemotePath::new(Utf8Path::new("folder1")).context("common_prefix construction")?;
153 : let root_files = test_client
154 : .list(None, ListingMode::NoDelimiter, None, &cancel)
155 : .await
156 : .context("client list root files failure")?
157 : .keys
158 : .into_iter()
159 63 : .map(|o| o.key)
160 : .collect::<HashSet<_>>();
161 : assert_eq!(
162 : root_files,
163 : ctx.remote_blobs.clone(),
164 : "remote storage list on root mismatches with the uploads."
165 : );
166 :
167 : // Test that max_keys limit works. In total there are about 21 files (see
168 : // upload_simple_remote_data call in test_real_s3.rs).
169 : let limited_root_files = test_client
170 : .list(
171 : None,
172 : ListingMode::NoDelimiter,
173 : Some(NonZeroU32::new(2).unwrap()),
174 : &cancel,
175 : )
176 : .await
177 : .context("client list root files failure")?;
178 : assert_eq!(limited_root_files.keys.len(), 2);
179 :
180 : let nested_remote_files = test_client
181 : .list(Some(&base_prefix), ListingMode::NoDelimiter, None, &cancel)
182 : .await
183 : .context("client list nested files failure")?
184 : .keys
185 : .into_iter()
186 21 : .map(|o| o.key)
187 : .collect::<HashSet<_>>();
188 : let trim_remote_blobs: HashSet<_> = ctx
189 : .remote_blobs
190 : .iter()
191 63 : .map(|x| x.get_path())
192 63 : .filter(|x| x.starts_with("folder1"))
193 21 : .map(|x| RemotePath::new(x).expect("must be valid path"))
194 : .collect();
195 : assert_eq!(
196 : nested_remote_files, trim_remote_blobs,
197 : "remote storage list on subdirrectory mismatches with the uploads."
198 : );
199 : Ok(())
200 : }
201 :
202 : /// Tests that giving a partial prefix returns all matches (e.g. "/foo" yields "/foobar/baz"),
203 : /// but only with NoDelimiter.
204 6 : #[test_context(MaybeEnabledStorageWithSimpleTestBlobs)]
205 : #[tokio::test]
206 : async fn list_partial_prefix(
207 : ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs,
208 : ) -> anyhow::Result<()> {
209 : let ctx = match ctx {
210 : MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx,
211 : MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()),
212 : MaybeEnabledStorageWithSimpleTestBlobs::UploadsFailed(e, _) => {
213 : anyhow::bail!("S3 init failed: {e:?}")
214 : }
215 : };
216 :
217 : let cancel = CancellationToken::new();
218 : let test_client = Arc::clone(&ctx.enabled.client);
219 :
220 : // Prefix "fold" should match all "folder{i}" directories with NoDelimiter.
221 : let objects: HashSet<_> = test_client
222 : .list(
223 : Some(&RemotePath::from_string("fold")?),
224 : ListingMode::NoDelimiter,
225 : None,
226 : &cancel,
227 : )
228 : .await?
229 : .keys
230 : .into_iter()
231 63 : .map(|o| o.key)
232 : .collect();
233 : assert_eq!(&objects, &ctx.remote_blobs);
234 :
235 : // Prefix "fold" matches nothing with WithDelimiter.
236 : let objects: HashSet<_> = test_client
237 : .list(
238 : Some(&RemotePath::from_string("fold")?),
239 : ListingMode::WithDelimiter,
240 : None,
241 : &cancel,
242 : )
243 : .await?
244 : .keys
245 : .into_iter()
246 0 : .map(|o| o.key)
247 : .collect();
248 : assert!(objects.is_empty());
249 :
250 : // Prefix "" matches everything.
251 : let objects: HashSet<_> = test_client
252 : .list(
253 : Some(&RemotePath::from_string("")?),
254 : ListingMode::NoDelimiter,
255 : None,
256 : &cancel,
257 : )
258 : .await?
259 : .keys
260 : .into_iter()
261 63 : .map(|o| o.key)
262 : .collect();
263 : assert_eq!(&objects, &ctx.remote_blobs);
264 :
265 : // Prefix "" matches nothing with WithDelimiter.
266 : let objects: HashSet<_> = test_client
267 : .list(
268 : Some(&RemotePath::from_string("")?),
269 : ListingMode::WithDelimiter,
270 : None,
271 : &cancel,
272 : )
273 : .await?
274 : .keys
275 : .into_iter()
276 0 : .map(|o| o.key)
277 : .collect();
278 : assert!(objects.is_empty());
279 :
280 : // Prefix "foo" matches nothing.
281 : let objects: HashSet<_> = test_client
282 : .list(
283 : Some(&RemotePath::from_string("foo")?),
284 : ListingMode::NoDelimiter,
285 : None,
286 : &cancel,
287 : )
288 : .await?
289 : .keys
290 : .into_iter()
291 0 : .map(|o| o.key)
292 : .collect();
293 : assert!(objects.is_empty());
294 :
295 : // Prefix "folder2/blob" matches.
296 : let objects: HashSet<_> = test_client
297 : .list(
298 : Some(&RemotePath::from_string("folder2/blob")?),
299 : ListingMode::NoDelimiter,
300 : None,
301 : &cancel,
302 : )
303 : .await?
304 : .keys
305 : .into_iter()
306 21 : .map(|o| o.key)
307 : .collect();
308 : let expect: HashSet<_> = ctx
309 : .remote_blobs
310 : .iter()
311 63 : .filter(|o| o.get_path().starts_with("folder2"))
312 : .cloned()
313 : .collect();
314 : assert_eq!(&objects, &expect);
315 :
316 : // Prefix "folder2/foo" matches nothing.
317 : let objects: HashSet<_> = test_client
318 : .list(
319 : Some(&RemotePath::from_string("folder2/foo")?),
320 : ListingMode::NoDelimiter,
321 : None,
322 : &cancel,
323 : )
324 : .await?
325 : .keys
326 : .into_iter()
327 0 : .map(|o| o.key)
328 : .collect();
329 : assert!(objects.is_empty());
330 :
331 : Ok(())
332 : }
333 :
334 6 : #[test_context(MaybeEnabledStorage)]
335 : #[tokio::test]
336 : async fn delete_non_exising_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
337 : let ctx = match ctx {
338 : MaybeEnabledStorage::Enabled(ctx) => ctx,
339 : MaybeEnabledStorage::Disabled => return Ok(()),
340 : };
341 :
342 : let cancel = CancellationToken::new();
343 :
344 : let path = RemotePath::new(Utf8Path::new(
345 : format!("{}/for_sure_there_is_nothing_there_really", ctx.base_prefix).as_str(),
346 : ))
347 0 : .with_context(|| "RemotePath conversion")?;
348 :
349 : ctx.client
350 : .delete(&path, &cancel)
351 : .await
352 : .expect("should succeed");
353 :
354 : Ok(())
355 : }
356 :
357 6 : #[test_context(MaybeEnabledStorage)]
358 : #[tokio::test]
359 : async fn delete_objects_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
360 : let ctx = match ctx {
361 : MaybeEnabledStorage::Enabled(ctx) => ctx,
362 : MaybeEnabledStorage::Disabled => return Ok(()),
363 : };
364 :
365 : let cancel = CancellationToken::new();
366 :
367 : let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
368 0 : .with_context(|| "RemotePath conversion")?;
369 :
370 : let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
371 0 : .with_context(|| "RemotePath conversion")?;
372 :
373 : let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
374 0 : .with_context(|| "RemotePath conversion")?;
375 :
376 : let (data, len) = upload_stream("remote blob data1".as_bytes().into());
377 : ctx.client.upload(data, len, &path1, None, &cancel).await?;
378 :
379 : let (data, len) = upload_stream("remote blob data2".as_bytes().into());
380 : ctx.client.upload(data, len, &path2, None, &cancel).await?;
381 :
382 : let (data, len) = upload_stream("remote blob data3".as_bytes().into());
383 : ctx.client.upload(data, len, &path3, None, &cancel).await?;
384 :
385 : ctx.client.delete_objects(&[path1, path2], &cancel).await?;
386 :
387 : let prefixes = ctx
388 : .client
389 : .list(None, ListingMode::WithDelimiter, None, &cancel)
390 : .await?
391 : .prefixes;
392 :
393 : assert_eq!(prefixes.len(), 1);
394 :
395 : ctx.client.delete_objects(&[path3], &cancel).await?;
396 :
397 : Ok(())
398 : }
399 :
400 : /// Tests that delete_prefix() will delete all objects matching a prefix, including
401 : /// partial prefixes (i.e. "/foo" matches "/foobar").
402 6 : #[test_context(MaybeEnabledStorageWithSimpleTestBlobs)]
403 : #[tokio::test]
404 : async fn delete_prefix(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> anyhow::Result<()> {
405 : let ctx = match ctx {
406 : MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx,
407 : MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()),
408 : MaybeEnabledStorageWithSimpleTestBlobs::UploadsFailed(e, _) => {
409 : anyhow::bail!("S3 init failed: {e:?}")
410 : }
411 : };
412 :
413 : let cancel = CancellationToken::new();
414 : let test_client = Arc::clone(&ctx.enabled.client);
415 :
416 : /// Asserts that the S3 listing matches the given paths.
417 : macro_rules! assert_list {
418 : ($expect:expr) => {{
419 : let listing = test_client
420 : .list(None, ListingMode::NoDelimiter, None, &cancel)
421 : .await?
422 : .keys
423 : .into_iter()
424 292 : .map(|o| o.key)
425 : .collect();
426 : assert_eq!($expect, listing);
427 : }};
428 : }
429 :
430 : // We start with the full set of uploaded files.
431 : let mut expect = ctx.remote_blobs.clone();
432 :
433 : // Deleting a non-existing prefix should do nothing.
434 : test_client
435 : .delete_prefix(&RemotePath::from_string("xyz")?, &cancel)
436 : .await?;
437 : assert_list!(expect);
438 :
439 : // Prefixes are case-sensitive.
440 : test_client
441 : .delete_prefix(&RemotePath::from_string("Folder")?, &cancel)
442 : .await?;
443 : assert_list!(expect);
444 :
445 : // Deleting a path which overlaps with an existing object should do nothing. We pick the first
446 : // path in the set as our common prefix.
447 : let path = expect.iter().next().expect("empty set").clone().join("xyz");
448 : test_client.delete_prefix(&path, &cancel).await?;
449 : assert_list!(expect);
450 :
451 : // Deleting an exact path should work. We pick the first path in the set.
452 : let path = expect.iter().next().expect("empty set").clone();
453 : test_client.delete_prefix(&path, &cancel).await?;
454 : expect.remove(&path);
455 : assert_list!(expect);
456 :
457 : // Deleting a prefix should delete all matching objects.
458 : test_client
459 : .delete_prefix(&RemotePath::from_string("folder0/blob_")?, &cancel)
460 : .await?;
461 60 : expect.retain(|p| !p.get_path().as_str().starts_with("folder0/"));
462 : assert_list!(expect);
463 :
464 : // Deleting a common prefix should delete all objects.
465 : test_client
466 : .delete_prefix(&RemotePath::from_string("fold")?, &cancel)
467 : .await?;
468 : expect.clear();
469 : assert_list!(expect);
470 :
471 : Ok(())
472 : }
473 :
474 6 : #[test_context(MaybeEnabledStorage)]
475 : #[tokio::test]
476 : async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
477 : let MaybeEnabledStorage::Enabled(ctx) = ctx else {
478 : return Ok(());
479 : };
480 :
481 : let cancel = CancellationToken::new();
482 :
483 : let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))
484 0 : .with_context(|| "RemotePath conversion")?;
485 :
486 : let orig = bytes::Bytes::from_static("remote blob data here".as_bytes());
487 :
488 : let (data, len) = wrap_stream(orig.clone());
489 :
490 : ctx.client.upload(data, len, &path, None, &cancel).await?;
491 :
492 : // Normal download request
493 : let dl = ctx
494 : .client
495 : .download(&path, &DownloadOpts::default(), &cancel)
496 : .await?;
497 : let buf = download_to_vec(dl).await?;
498 : assert_eq!(&buf, &orig);
499 :
500 : // Full range (end specified)
501 : let dl = ctx
502 : .client
503 : .download(
504 : &path,
505 : &DownloadOpts {
506 : byte_start: Bound::Included(0),
507 : byte_end: Bound::Excluded(len as u64),
508 : ..Default::default()
509 : },
510 : &cancel,
511 : )
512 : .await?;
513 : let buf = download_to_vec(dl).await?;
514 : assert_eq!(&buf, &orig);
515 :
516 : // partial range (end specified)
517 : let dl = ctx
518 : .client
519 : .download(
520 : &path,
521 : &DownloadOpts {
522 : byte_start: Bound::Included(4),
523 : byte_end: Bound::Excluded(10),
524 : ..Default::default()
525 : },
526 : &cancel,
527 : )
528 : .await?;
529 : let buf = download_to_vec(dl).await?;
530 : assert_eq!(&buf, &orig[4..10]);
531 :
532 : // partial range (end beyond real end)
533 : let dl = ctx
534 : .client
535 : .download(
536 : &path,
537 : &DownloadOpts {
538 : byte_start: Bound::Included(8),
539 : byte_end: Bound::Excluded(len as u64 * 100),
540 : ..Default::default()
541 : },
542 : &cancel,
543 : )
544 : .await?;
545 : let buf = download_to_vec(dl).await?;
546 : assert_eq!(&buf, &orig[8..]);
547 :
548 : // Partial range (end unspecified)
549 : let dl = ctx
550 : .client
551 : .download(
552 : &path,
553 : &DownloadOpts {
554 : byte_start: Bound::Included(4),
555 : ..Default::default()
556 : },
557 : &cancel,
558 : )
559 : .await?;
560 : let buf = download_to_vec(dl).await?;
561 : assert_eq!(&buf, &orig[4..]);
562 :
563 : // Full range (end unspecified)
564 : let dl = ctx
565 : .client
566 : .download(
567 : &path,
568 : &DownloadOpts {
569 : byte_start: Bound::Included(0),
570 : ..Default::default()
571 : },
572 : &cancel,
573 : )
574 : .await?;
575 : let buf = download_to_vec(dl).await?;
576 : assert_eq!(&buf, &orig);
577 :
578 : debug!("Cleanup: deleting file at path {path:?}");
579 : ctx.client
580 : .delete(&path, &cancel)
581 : .await
582 0 : .with_context(|| format!("{path:?} removal"))?;
583 :
584 : Ok(())
585 : }
586 :
587 : /// Tests that conditional downloads work properly, by returning
588 : /// DownloadError::Unmodified when the object ETag matches the given ETag.
589 6 : #[test_context(MaybeEnabledStorage)]
590 : #[tokio::test]
591 : async fn download_conditional(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
592 : let MaybeEnabledStorage::Enabled(ctx) = ctx else {
593 : return Ok(());
594 : };
595 : let cancel = CancellationToken::new();
596 :
597 : // Create a file.
598 : let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))?;
599 : let data = bytes::Bytes::from_static("foo".as_bytes());
600 : let (stream, len) = wrap_stream(data);
601 : ctx.client.upload(stream, len, &path, None, &cancel).await?;
602 :
603 : // Download it to obtain its etag.
604 : let mut opts = DownloadOpts::default();
605 : let download = ctx.client.download(&path, &opts, &cancel).await?;
606 :
607 : // Download with the etag yields DownloadError::Unmodified.
608 : opts.etag = Some(download.etag);
609 : let result = ctx.client.download(&path, &opts, &cancel).await;
610 : assert!(
611 : matches!(result, Err(DownloadError::Unmodified)),
612 : "expected DownloadError::Unmodified, got {result:?}"
613 : );
614 :
615 : // Replace the file contents.
616 : let data = bytes::Bytes::from_static("bar".as_bytes());
617 : let (stream, len) = wrap_stream(data);
618 : ctx.client.upload(stream, len, &path, None, &cancel).await?;
619 :
620 : // A download with the old etag should yield the new file.
621 : let download = ctx.client.download(&path, &opts, &cancel).await?;
622 : assert_ne!(download.etag, opts.etag.unwrap(), "ETag did not change");
623 :
624 : // A download with the new etag should yield Unmodified again.
625 : opts.etag = Some(download.etag);
626 : let result = ctx.client.download(&path, &opts, &cancel).await;
627 : assert!(
628 : matches!(result, Err(DownloadError::Unmodified)),
629 : "expected DownloadError::Unmodified, got {result:?}"
630 : );
631 :
632 : Ok(())
633 : }
634 :
635 6 : #[test_context(MaybeEnabledStorage)]
636 : #[tokio::test]
637 : async fn copy_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
638 : let MaybeEnabledStorage::Enabled(ctx) = ctx else {
639 : return Ok(());
640 : };
641 :
642 : let cancel = CancellationToken::new();
643 :
644 : let path = RemotePath::new(Utf8Path::new(
645 : format!("{}/file_to_copy", ctx.base_prefix).as_str(),
646 : ))
647 0 : .with_context(|| "RemotePath conversion")?;
648 : let path_dest = RemotePath::new(Utf8Path::new(
649 : format!("{}/file_dest", ctx.base_prefix).as_str(),
650 : ))
651 0 : .with_context(|| "RemotePath conversion")?;
652 :
653 : let orig = bytes::Bytes::from_static("remote blob data content".as_bytes());
654 :
655 : let (data, len) = wrap_stream(orig.clone());
656 :
657 : ctx.client.upload(data, len, &path, None, &cancel).await?;
658 :
659 : // Normal download request
660 : ctx.client.copy_object(&path, &path_dest, &cancel).await?;
661 :
662 : let dl = ctx
663 : .client
664 : .download(&path_dest, &DownloadOpts::default(), &cancel)
665 : .await?;
666 : let buf = download_to_vec(dl).await?;
667 : assert_eq!(&buf, &orig);
668 :
669 : debug!("Cleanup: deleting file at path {path:?}");
670 : ctx.client
671 : .delete_objects(&[path.clone(), path_dest.clone()], &cancel)
672 : .await
673 0 : .with_context(|| format!("{path:?} removal"))?;
674 :
675 : Ok(())
676 : }
677 :
678 : /// Tests that head_object works properly.
679 6 : #[test_context(MaybeEnabledStorage)]
680 : #[tokio::test]
681 : async fn head_object(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
682 : let MaybeEnabledStorage::Enabled(ctx) = ctx else {
683 : return Ok(());
684 : };
685 : let cancel = CancellationToken::new();
686 :
687 : let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))?;
688 :
689 : // Errors on missing file.
690 : let result = ctx.client.head_object(&path, &cancel).await;
691 : assert!(
692 : matches!(result, Err(DownloadError::NotFound)),
693 : "expected NotFound, got {result:?}"
694 : );
695 :
696 : // Create the file.
697 : let data = bytes::Bytes::from_static("foo".as_bytes());
698 : let (stream, len) = wrap_stream(data);
699 : ctx.client.upload(stream, len, &path, None, &cancel).await?;
700 :
701 : // Fetch the head metadata.
702 : let object = ctx.client.head_object(&path, &cancel).await?;
703 : assert_eq!(
704 : object,
705 : ListingObject {
706 : key: path.clone(),
707 : last_modified: object.last_modified, // ignore
708 : size: 3
709 : }
710 : );
711 :
712 : // Wait for a couple of seconds, and then update the file to check the last
713 : // modified timestamp.
714 : tokio::time::sleep(std::time::Duration::from_secs(2)).await;
715 :
716 : let data = bytes::Bytes::from_static("bar".as_bytes());
717 : let (stream, len) = wrap_stream(data);
718 : ctx.client.upload(stream, len, &path, None, &cancel).await?;
719 : let new = ctx.client.head_object(&path, &cancel).await?;
720 :
721 : assert!(
722 : !new.last_modified
723 : .duration_since(object.last_modified)?
724 : .is_zero(),
725 : "last_modified did not advance"
726 : );
727 :
728 : Ok(())
729 : }
|