LCOV - code coverage report
Current view: top level - libs/remote_storage/tests/common - tests.rs (source / functions) Coverage Total Hit
Test: 5fe7fa8d483b39476409aee736d6d5e32728bfac.info Lines: 61.8 % 34 21
Test Date: 2025-03-12 16:10:49 Functions: 64.1 % 78 50

            Line data    Source code
       1              : use std::collections::HashSet;
       2              : use std::num::NonZeroU32;
       3              : use std::ops::Bound;
       4              : use std::sync::Arc;
       5              : 
       6              : use anyhow::Context;
       7              : use camino::Utf8Path;
       8              : use futures::StreamExt;
       9              : use remote_storage::{DownloadError, DownloadOpts, ListingMode, ListingObject, RemotePath};
      10              : use test_context::test_context;
      11              : use tokio_util::sync::CancellationToken;
      12              : use tracing::debug;
      13              : 
      14              : use super::{
      15              :     MaybeEnabledStorage, MaybeEnabledStorageWithSimpleTestBlobs, MaybeEnabledStorageWithTestBlobs,
      16              : };
      17              : use crate::common::{download_to_vec, upload_stream, wrap_stream};
      18              : 
      19              : /// Tests that S3 client can list all prefixes, even if the response comes paginated and requires multiple S3 queries.
      20              : /// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars specified.
      21              : /// See the client creation in [`create_s3_client`] for details on the required env vars.
      22              : /// If real S3 tests are disabled, the test passes, skipping any real test run: currently, there's no way to mark the test ignored in runtime with the
      23              : /// default test framework, see https://github.com/rust-lang/rust/issues/68007 for details.
      24              : ///
      25              : /// First, the test creates a set of S3 objects with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_remote_data`]
      26              : /// where
      27              : /// * `random_prefix_part` is set for the entire S3 client during the S3 client creation in [`create_s3_client`], to avoid multiple test runs interference
      28              : /// * `base_prefix_str` is a common prefix to use in the client requests: we would want to ensure that the client is able to list nested prefixes inside the bucket
      29              : ///
      30              : /// Then, verifies that the client does return correct prefixes when queried:
      31              : /// * with no prefix, it lists everything after its `${random_prefix_part}/` — that should be `${base_prefix_str}` value only
      32              : /// * with `${base_prefix_str}/` prefix, it lists every `sub_prefix_${i}`
      33              : ///
      34              : /// In the `MaybeEnabledStorageWithTestBlobs::setup`, we set the `max_keys_in_list_response` param to limit the keys in a single response.
      35              : /// This way, we are able to test the pagination, by ensuring all results are returned from the remote storage and avoid uploading too many blobs to S3,
      36              : /// as the current default AWS S3 pagination limit is 1000.
      37              : /// (see <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>).
      38              : ///
      39              : /// Lastly, the test attempts to clean up and remove all uploaded S3 files.
      40              : /// If any errors appear during the clean up, they get logged, but the test is not failed or stopped until clean up is finished.
      41            6 : #[test_context(MaybeEnabledStorageWithTestBlobs)]
      42              : #[tokio::test]
      43              : async fn pagination_should_work(ctx: &mut MaybeEnabledStorageWithTestBlobs) -> anyhow::Result<()> {
      44              :     let ctx = match ctx {
      45              :         MaybeEnabledStorageWithTestBlobs::Enabled(ctx) => ctx,
      46              :         MaybeEnabledStorageWithTestBlobs::Disabled => return Ok(()),
      47              :         MaybeEnabledStorageWithTestBlobs::UploadsFailed(e, _) => {
      48              :             anyhow::bail!("S3 init failed: {e:?}")
      49              :         }
      50              :     };
      51              : 
      52              :     let cancel = CancellationToken::new();
      53              : 
      54              :     let test_client = Arc::clone(&ctx.enabled.client);
      55              :     let expected_remote_prefixes = ctx.remote_prefixes.clone();
      56              : 
      57              :     let base_prefix = RemotePath::new(Utf8Path::new(ctx.enabled.base_prefix))
      58              :         .context("common_prefix construction")?;
      59              :     let root_remote_prefixes = test_client
      60              :         .list(None, ListingMode::WithDelimiter, None, &cancel)
      61              :         .await?
      62              :         .prefixes
      63              :         .into_iter()
      64              :         .collect::<HashSet<_>>();
      65              :     assert_eq!(
      66              :         root_remote_prefixes,
      67              :         HashSet::from([base_prefix.clone()]),
      68              :         "remote storage root prefixes list mismatches with the uploads. Returned prefixes: {root_remote_prefixes:?}"
      69              :     );
      70              :     // Listing under the base prefix must return exactly the prefixes in ctx.remote_prefixes.
      71              :     let nested_remote_prefixes = test_client
      72              :         .list(
      73              :             Some(&base_prefix.add_trailing_slash()),
      74              :             ListingMode::WithDelimiter,
      75              :             None,
      76              :             &cancel,
      77              :         )
      78              :         .await?
      79              :         .prefixes
      80              :         .into_iter()
      81              :         .collect::<HashSet<_>>();
      82              :     let remote_only_prefixes = nested_remote_prefixes
      83              :         .difference(&expected_remote_prefixes)
      84              :         .collect::<HashSet<_>>();
      85              :     let missing_uploaded_prefixes = expected_remote_prefixes
      86              :         .difference(&nested_remote_prefixes)
      87              :         .collect::<HashSet<_>>();
      88              :     assert_eq!(
      89              :         remote_only_prefixes.len() + missing_uploaded_prefixes.len(),
      90              :         0,
      91              :         "remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
      92              :     );
      93              : 
      94              :     // Repeat the nested-prefix check via list_streaming, which yields the listing in segments.
      95              : 
      96              :     let prefix_with_slash = base_prefix.add_trailing_slash();
      97              :     let mut nested_remote_prefixes_st = test_client.list_streaming(
      98              :         Some(&prefix_with_slash),
      99              :         ListingMode::WithDelimiter,
     100              :         None,
     101              :         &cancel,
     102              :     );
     103              :     let mut nested_remote_prefixes_combined = HashSet::new();
     104              :     let mut segments = 0;
     105              :     let mut segment_max_size = 0;
     106              :     while let Some(st) = nested_remote_prefixes_st.next().await {
     107              :         let st = st?;
     108              :         segment_max_size = segment_max_size.max(st.prefixes.len());
     109              :         nested_remote_prefixes_combined.extend(st.prefixes.into_iter());
     110              :         segments += 1;
     111              :     }
     112              :     assert!(segments > 1, "less than 2 segments: {segments}");
     113              :     assert!(
     114              :         segment_max_size * 2 <= nested_remote_prefixes_combined.len(),
     115              :         "double of segment_max_size={segment_max_size} larger number of remote prefixes of {}",
     116              :         nested_remote_prefixes_combined.len()
     117              :     );
     118              :     let remote_only_prefixes = nested_remote_prefixes_combined
     119              :         .difference(&expected_remote_prefixes)
     120              :         .collect::<HashSet<_>>();
     121              :     let missing_uploaded_prefixes = expected_remote_prefixes
     122              :         .difference(&nested_remote_prefixes_combined)
     123              :         .collect::<HashSet<_>>();
     124              :     assert_eq!(
     125              :         remote_only_prefixes.len() + missing_uploaded_prefixes.len(),
     126              :         0,
     127              :         "remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
     128              :     );
     129              : 
     130              :     Ok(())
     131              : }
     132              : 
     133              : /// Tests that S3 client can list all files in a folder, even if the response comes paginated and requires multiple S3 queries.
     134              : /// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars specified. Test will skip real code and pass if env vars not set.
     135              : /// See [`pagination_should_work`] for more information.
     136              : ///
     137              : /// First, create a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_remote_data`]
     138              : /// Then performs the following queries:
     139              : ///    1. `list(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
     140              : ///    2. `list("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt`
     141            6 : #[test_context(MaybeEnabledStorageWithSimpleTestBlobs)]
     142              : #[tokio::test]
     143              : async fn list_no_delimiter_works(
     144              :     ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs,
     145              : ) -> anyhow::Result<()> {
     146              :     let ctx = match ctx {
     147              :         MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx,
     148              :         MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()),
     149              :         MaybeEnabledStorageWithSimpleTestBlobs::UploadsFailed(e, _) => {
     150              :             anyhow::bail!("S3 init failed: {e:?}")
     151              :         }
     152              :     };
     153              :     let cancel = CancellationToken::new();
     154              :     let test_client = Arc::clone(&ctx.enabled.client);
     155              :     let base_prefix =
     156              :         RemotePath::new(Utf8Path::new("folder1")).context("common_prefix construction")?;
     157              :     let root_files = test_client
     158              :         .list(None, ListingMode::NoDelimiter, None, &cancel)
     159              :         .await
     160              :         .context("client list root files failure")?
     161              :         .keys
     162              :         .into_iter()
     163           63 :         .map(|o| o.key)
     164              :         .collect::<HashSet<_>>();
     165              :     assert_eq!(
     166              :         root_files,
     167              :         ctx.remote_blobs.clone(),
     168              :         "remote storage list on root mismatches with the uploads."
     169              :     );
     170              : 
     171              :     // Test that max_keys limit works. In total there are about 21 files (see
     172              :     // upload_simple_remote_data call in test_real_s3.rs).
     173              :     let limited_root_files = test_client
     174              :         .list(
     175              :             None,
     176              :             ListingMode::NoDelimiter,
     177              :             Some(NonZeroU32::new(2).unwrap()),
     178              :             &cancel,
     179              :         )
     180              :         .await
     181              :         .context("client list root files failure")?;
     182              :     assert_eq!(limited_root_files.keys.len(), 2);
     183              :     // Listing under "folder1" must return only the blobs whose path starts with that folder.
     184              :     let nested_remote_files = test_client
     185              :         .list(Some(&base_prefix), ListingMode::NoDelimiter, None, &cancel)
     186              :         .await
     187              :         .context("client list nested files failure")?
     188              :         .keys
     189              :         .into_iter()
     190           21 :         .map(|o| o.key)
     191              :         .collect::<HashSet<_>>();
     192              :     let trim_remote_blobs: HashSet<_> = ctx
     193              :         .remote_blobs
     194              :         .iter()
     195           63 :         .map(|x| x.get_path())
     196           63 :         .filter(|x| x.starts_with("folder1"))
     197           21 :         .map(|x| RemotePath::new(x).expect("must be valid path"))
     198              :         .collect();
     199              :     assert_eq!(
     200              :         nested_remote_files, trim_remote_blobs,
     201              :         "remote storage list on subdirrectory mismatches with the uploads."
     202              :     );
     203              :     Ok(())
     204              : }
     205              : 
     206              : /// Tests that giving a partial prefix returns all matches (e.g. "/foo" yields "/foobar/baz"),
     207              : /// but only with NoDelimiter.
     208            6 : #[test_context(MaybeEnabledStorageWithSimpleTestBlobs)]
     209              : #[tokio::test]
     210              : async fn list_partial_prefix(
     211              :     ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs,
     212              : ) -> anyhow::Result<()> {
     213              :     let ctx = match ctx {
     214              :         MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx,
     215              :         MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()),
     216              :         MaybeEnabledStorageWithSimpleTestBlobs::UploadsFailed(e, _) => {
     217              :             anyhow::bail!("S3 init failed: {e:?}")
     218              :         }
     219              :     };
     220              : 
     221              :     let cancel = CancellationToken::new();
     222              :     let test_client = Arc::clone(&ctx.enabled.client);
     223              : 
     224              :     // Prefix "fold" should match all "folder{i}" directories with NoDelimiter.
     225              :     let objects: HashSet<_> = test_client
     226              :         .list(
     227              :             Some(&RemotePath::from_string("fold")?),
     228              :             ListingMode::NoDelimiter,
     229              :             None,
     230              :             &cancel,
     231              :         )
     232              :         .await?
     233              :         .keys
     234              :         .into_iter()
     235           63 :         .map(|o| o.key)
     236              :         .collect();
     237              :     assert_eq!(&objects, &ctx.remote_blobs);
     238              : 
     239              :     // Prefix "fold" yields no keys with WithDelimiter (only `.keys` is inspected, not `.prefixes`).
     240              :     let objects: HashSet<_> = test_client
     241              :         .list(
     242              :             Some(&RemotePath::from_string("fold")?),
     243              :             ListingMode::WithDelimiter,
     244              :             None,
     245              :             &cancel,
     246              :         )
     247              :         .await?
     248              :         .keys
     249              :         .into_iter()
     250            0 :         .map(|o| o.key)
     251              :         .collect();
     252              :     assert!(objects.is_empty());
     253              : 
     254              :     // Prefix "" matches everything with NoDelimiter.
     255              :     let objects: HashSet<_> = test_client
     256              :         .list(
     257              :             Some(&RemotePath::from_string("")?),
     258              :             ListingMode::NoDelimiter,
     259              :             None,
     260              :             &cancel,
     261              :         )
     262              :         .await?
     263              :         .keys
     264              :         .into_iter()
     265           63 :         .map(|o| o.key)
     266              :         .collect();
     267              :     assert_eq!(&objects, &ctx.remote_blobs);
     268              : 
     269              :     // Prefix "" yields no keys with WithDelimiter (only `.keys` is inspected, not `.prefixes`).
     270              :     let objects: HashSet<_> = test_client
     271              :         .list(
     272              :             Some(&RemotePath::from_string("")?),
     273              :             ListingMode::WithDelimiter,
     274              :             None,
     275              :             &cancel,
     276              :         )
     277              :         .await?
     278              :         .keys
     279              :         .into_iter()
     280            0 :         .map(|o| o.key)
     281              :         .collect();
     282              :     assert!(objects.is_empty());
     283              : 
     284              :     // Prefix "foo" matches nothing, even with NoDelimiter.
     285              :     let objects: HashSet<_> = test_client
     286              :         .list(
     287              :             Some(&RemotePath::from_string("foo")?),
     288              :             ListingMode::NoDelimiter,
     289              :             None,
     290              :             &cancel,
     291              :         )
     292              :         .await?
     293              :         .keys
     294              :         .into_iter()
     295            0 :         .map(|o| o.key)
     296              :         .collect();
     297              :     assert!(objects.is_empty());
     298              : 
     299              :     // Prefix "folder2/blob" matches every uploaded blob whose path starts with "folder2".
     300              :     let objects: HashSet<_> = test_client
     301              :         .list(
     302              :             Some(&RemotePath::from_string("folder2/blob")?),
     303              :             ListingMode::NoDelimiter,
     304              :             None,
     305              :             &cancel,
     306              :         )
     307              :         .await?
     308              :         .keys
     309              :         .into_iter()
     310           21 :         .map(|o| o.key)
     311              :         .collect();
     312              :     let expect: HashSet<_> = ctx
     313              :         .remote_blobs
     314              :         .iter()
     315           63 :         .filter(|o| o.get_path().starts_with("folder2"))
     316              :         .cloned()
     317              :         .collect();
     318              :     assert_eq!(&objects, &expect);
     319              : 
     320              :     // Prefix "folder2/foo" matches nothing.
     321              :     let objects: HashSet<_> = test_client
     322              :         .list(
     323              :             Some(&RemotePath::from_string("folder2/foo")?),
     324              :             ListingMode::NoDelimiter,
     325              :             None,
     326              :             &cancel,
     327              :         )
     328              :         .await?
     329              :         .keys
     330              :         .into_iter()
     331            0 :         .map(|o| o.key)
     332              :         .collect();
     333              :     assert!(objects.is_empty());
     334              : 
     335              :     Ok(())
     336              : }
     337              : /// Deleting a key that this test never uploaded is expected to succeed rather than return an error.
     338            6 : #[test_context(MaybeEnabledStorage)]
     339              : #[tokio::test]
     340              : async fn delete_non_exising_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
     341              :     let ctx = match ctx {
     342              :         MaybeEnabledStorage::Enabled(ctx) => ctx,
     343              :         MaybeEnabledStorage::Disabled => return Ok(()),
     344              :     };
     345              : 
     346              :     let cancel = CancellationToken::new();
     347              :     // Build a key under the test's base prefix; nothing is uploaded to it in this test.
     348              :     let path = RemotePath::new(Utf8Path::new(
     349              :         format!("{}/for_sure_there_is_nothing_there_really", ctx.base_prefix).as_str(),
     350              :     ))
     351            0 :     .with_context(|| "RemotePath conversion")?;
     352              :     // The delete must return Ok; an Err panics here through expect().
     353              :     ctx.client
     354              :         .delete(&path, &cancel)
     355              :         .await
     356              :         .expect("should succeed");
     357              : 
     358              :     Ok(())
     359              : }
     360              : /// Uploads three objects, batch-deletes two of them, checks the root listing, then deletes the third.
     361            6 : #[test_context(MaybeEnabledStorage)]
     362              : #[tokio::test]
     363              : async fn delete_objects_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
     364              :     let ctx = match ctx {
     365              :         MaybeEnabledStorage::Enabled(ctx) => ctx,
     366              :         MaybeEnabledStorage::Disabled => return Ok(()),
     367              :     };
     368              : 
     369              :     let cancel = CancellationToken::new();
     370              :     // Three sibling keys directly under the test's base prefix.
     371              :     let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
     372            0 :         .with_context(|| "RemotePath conversion")?;
     373              : 
     374              :     let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
     375            0 :         .with_context(|| "RemotePath conversion")?;
     376              : 
     377              :     let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
     378            0 :         .with_context(|| "RemotePath conversion")?;
     379              : 
     380              :     let (data, len) = upload_stream("remote blob data1".as_bytes().into());
     381              :     ctx.client.upload(data, len, &path1, None, &cancel).await?;
     382              : 
     383              :     let (data, len) = upload_stream("remote blob data2".as_bytes().into());
     384              :     ctx.client.upload(data, len, &path2, None, &cancel).await?;
     385              : 
     386              :     let (data, len) = upload_stream("remote blob data3".as_bytes().into());
     387              :     ctx.client.upload(data, len, &path3, None, &cancel).await?;
     388              :     // Batch-delete two of the three objects in a single call.
     389              :     ctx.client.delete_objects(&[path1, path2], &cancel).await?;
     390              :     // path3 was not deleted; presumably the one remaining prefix is the base prefix that holds it.
     391              :     let prefixes = ctx
     392              :         .client
     393              :         .list(None, ListingMode::WithDelimiter, None, &cancel)
     394              :         .await?
     395              :         .prefixes;
     396              : 
     397              :     assert_eq!(prefixes.len(), 1);
     398              :     // Remove the remaining object uploaded by this test.
     399              :     ctx.client.delete_objects(&[path3], &cancel).await?;
     400              : 
     401              :     Ok(())
     402              : }
     403              : 
     404              : /// Tests that delete_prefix() will delete all objects matching a prefix, including
     405              : /// partial prefixes (i.e. "/foo" matches "/foobar").
     406            6 : #[test_context(MaybeEnabledStorageWithSimpleTestBlobs)]
     407              : #[tokio::test]
     408              : async fn delete_prefix(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> anyhow::Result<()> {
     409              :     let ctx = match ctx {
     410              :         MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx,
     411              :         MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()),
     412              :         MaybeEnabledStorageWithSimpleTestBlobs::UploadsFailed(e, _) => {
     413              :             anyhow::bail!("S3 init failed: {e:?}")
     414              :         }
     415              :     };
     416              : 
     417              :     let cancel = CancellationToken::new();
     418              :     let test_client = Arc::clone(&ctx.enabled.client);
     419              : 
     420              :     /// Asserts that the full NoDelimiter listing from the root equals the given set of paths.
     421              :     macro_rules! assert_list {
     422              :         ($expect:expr) => {{
     423              :             let listing = test_client
     424              :                 .list(None, ListingMode::NoDelimiter, None, &cancel)
     425              :                 .await?
     426              :                 .keys
     427              :                 .into_iter()
     428          292 :                 .map(|o| o.key)
     429              :                 .collect();
     430              :             assert_eq!($expect, listing);
     431              :         }};
     432              :     }
     433              : 
     434              :     // We start with the full set of uploaded files.
     435              :     let mut expect = ctx.remote_blobs.clone();
     436              : 
     437              :     // Deleting a non-existing prefix should do nothing.
     438              :     test_client
     439              :         .delete_prefix(&RemotePath::from_string("xyz")?, &cancel)
     440              :         .await?;
     441              :     assert_list!(expect);
     442              : 
     443              :     // Prefixes are case-sensitive.
     444              :     test_client
     445              :         .delete_prefix(&RemotePath::from_string("Folder")?, &cancel)
     446              :         .await?;
     447              :     assert_list!(expect);
     448              : 
     449              :     // Deleting a path which extends an existing object's key should do nothing. We take whichever
     450              :     // path the set's iterator yields first (arbitrary order) and append "xyz" to it.
     451              :     let path = expect.iter().next().expect("empty set").clone().join("xyz");
     452              :     test_client.delete_prefix(&path, &cancel).await?;
     453              :     assert_list!(expect);
     454              : 
     455              :     // Deleting an exact path should work. We take whichever path the set's iterator yields first.
     456              :     let path = expect.iter().next().expect("empty set").clone();
     457              :     test_client.delete_prefix(&path, &cancel).await?;
     458              :     expect.remove(&path);
     459              :     assert_list!(expect);
     460              : 
     461              :     // Deleting a prefix should delete all matching objects. The expectation drops everything under
     462              :     // "folder0/", so presumably every key in that folder starts with "folder0/blob_".
     463              :     test_client
     464              :         .delete_prefix(&RemotePath::from_string("folder0/blob_")?, &cancel)
     465           60 :         .await?;
     466              :     expect.retain(|p| !p.get_path().as_str().starts_with("folder0/"));
     467              :     assert_list!(expect);
     468              : 
     469              :     // Deleting a common prefix should delete all objects.
     470              :     test_client
     471              :         .delete_prefix(&RemotePath::from_string("fold")?, &cancel)
     472              :         .await?;
     473              :     expect.clear();
     474              :     assert_list!(expect);
     475              : 
     476              :     Ok(())
     477              : }
     477              : 
     478            6 : #[test_context(MaybeEnabledStorage)]
     479              : #[tokio::test]
     480              : async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
     481              :     let MaybeEnabledStorage::Enabled(ctx) = ctx else {
     482              :         return Ok(());
     483              :     };
     484              : 
     485              :     let cancel = CancellationToken::new();
     486              : 
     487              :     let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))
     488            0 :         .with_context(|| "RemotePath conversion")?;
     489              : 
     490              :     let orig = bytes::Bytes::from_static("remote blob data here".as_bytes());
     491              : 
     492              :     let (data, len) = wrap_stream(orig.clone());
     493              : 
     494              :     ctx.client.upload(data, len, &path, None, &cancel).await?;
     495              : 
     496              :     // Normal download request
     497              :     let dl = ctx
     498              :         .client
     499              :         .download(&path, &DownloadOpts::default(), &cancel)
     500              :         .await?;
     501              :     let buf = download_to_vec(dl).await?;
     502              :     assert_eq!(&buf, &orig);
     503              : 
     504              :     // Full range (end specified)
     505              :     let dl = ctx
     506              :         .client
     507              :         .download(
     508              :             &path,
     509              :             &DownloadOpts {
     510              :                 byte_start: Bound::Included(0),
     511              :                 byte_end: Bound::Excluded(len as u64),
     512              :                 ..Default::default()
     513              :             },
     514              :             &cancel,
     515              :         )
     516              :         .await?;
     517              :     let buf = download_to_vec(dl).await?;
     518              :     assert_eq!(&buf, &orig);
     519              : 
     520              :     // partial range (end specified)
     521              :     let dl = ctx
     522              :         .client
     523              :         .download(
     524              :             &path,
     525              :             &DownloadOpts {
     526              :                 byte_start: Bound::Included(4),
     527              :                 byte_end: Bound::Excluded(10),
     528              :                 ..Default::default()
     529              :             },
     530              :             &cancel,
     531              :         )
     532              :         .await?;
     533              :     let buf = download_to_vec(dl).await?;
     534              :     assert_eq!(&buf, &orig[4..10]);
     535              : 
     536              :     // partial range (end beyond real end)
     537              :     let dl = ctx
     538              :         .client
     539              :         .download(
     540              :             &path,
     541              :             &DownloadOpts {
     542              :                 byte_start: Bound::Included(8),
     543              :                 byte_end: Bound::Excluded(len as u64 * 100),
     544              :                 ..Default::default()
     545              :             },
     546              :             &cancel,
     547              :         )
     548              :         .await?;
     549              :     let buf = download_to_vec(dl).await?;
     550              :     assert_eq!(&buf, &orig[8..]);
     551              : 
     552              :     // Partial range (end unspecified)
     553              :     let dl = ctx
     554              :         .client
     555              :         .download(
     556              :             &path,
     557              :             &DownloadOpts {
     558              :                 byte_start: Bound::Included(4),
     559              :                 ..Default::default()
     560              :             },
     561              :             &cancel,
     562              :         )
     563              :         .await?;
     564              :     let buf = download_to_vec(dl).await?;
     565              :     assert_eq!(&buf, &orig[4..]);
     566              : 
     567              :     // Full range (end unspecified)
     568              :     let dl = ctx
     569              :         .client
     570              :         .download(
     571              :             &path,
     572              :             &DownloadOpts {
     573              :                 byte_start: Bound::Included(0),
     574              :                 ..Default::default()
     575              :             },
     576              :             &cancel,
     577              :         )
     578              :         .await?;
     579              :     let buf = download_to_vec(dl).await?;
     580              :     assert_eq!(&buf, &orig);
     581              : 
     582              :     debug!("Cleanup: deleting file at path {path:?}");
     583              :     ctx.client
     584              :         .delete(&path, &cancel)
     585              :         .await
     586            0 :         .with_context(|| format!("{path:?} removal"))?;
     587              : 
     588              :     Ok(())
     589              : }
     590              : 
     591              : /// Tests that conditional downloads work: a request carrying the object's current ETag yields
     592              : /// [`DownloadError::Unmodified`], while a stale ETag yields a successful download with a new ETag.
     593            6 : #[test_context(MaybeEnabledStorage)]
     594              : #[tokio::test]
     595              : async fn download_conditional(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
     596              :     let MaybeEnabledStorage::Enabled(ctx) = ctx else {
     597              :         return Ok(()); // context is not the `Enabled` variant: pass without exercising anything
     598              :     };
     599              :     let cancel = CancellationToken::new();
     600              : 
     601              :     // Create a file containing "foo" under this context's base prefix.
     602              :     let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))?;
     603              :     let data = bytes::Bytes::from_static("foo".as_bytes());
     604              :     let (stream, len) = wrap_stream(data);
     605              :     ctx.client.upload(stream, len, &path, None, &cancel).await?;
     606              : 
     607              :     // Download it with default options, only to obtain its current etag.
     608              :     let mut opts = DownloadOpts::default();
     609              :     let download = ctx.client.download(&path, &opts, &cancel).await?;
     610              : 
     611              :     // A download carrying the current etag yields DownloadError::Unmodified.
     612              :     opts.etag = Some(download.etag);
     613              :     let result = ctx.client.download(&path, &opts, &cancel).await;
     614              :     assert!(
     615              :         matches!(result, Err(DownloadError::Unmodified)),
     616              :         "expected DownloadError::Unmodified, got {result:?}"
     617              :     );
     618              : 
     619              :     // Replace the file contents by uploading "bar" to the same path.
     620              :     let data = bytes::Bytes::from_static("bar".as_bytes());
     621              :     let (stream, len) = wrap_stream(data);
     622              :     ctx.client.upload(stream, len, &path, None, &cancel).await?;
     623              : 
     624              :     // `opts` still holds the pre-replacement etag, so this download should yield the new file.
     625              :     let download = ctx.client.download(&path, &opts, &cancel).await?;
     626              :     assert_ne!(download.etag, opts.etag.unwrap(), "ETag did not change");
     627              : 
     628              :     // A download with the new etag should yield Unmodified again.
     629              :     opts.etag = Some(download.etag);
     630              :     let result = ctx.client.download(&path, &opts, &cancel).await;
     631              :     assert!(
     632              :         matches!(result, Err(DownloadError::Unmodified)),
     633              :         "expected DownloadError::Unmodified, got {result:?}"
     634              :     );
     635              :     // NOTE(review): unlike copy_works, the uploaded file is not deleted here; presumably the test context's teardown cleans up the prefix (confirm).
     636              :     Ok(())
     637              : }
     638              : 
     639            6 : #[test_context(MaybeEnabledStorage)]
     640              : #[tokio::test]
     641              : async fn copy_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
     642              :     let MaybeEnabledStorage::Enabled(ctx) = ctx else {
     643              :         return Ok(());
     644              :     };
     645              :     // Test outline: upload a blob, copy it with `copy_object`, download the copy and compare it, then delete both.
     646              :     let cancel = CancellationToken::new();
     647              : 
     648              :     let path = RemotePath::new(Utf8Path::new(
     649              :         format!("{}/file_to_copy", ctx.base_prefix).as_str(),
     650              :     ))
     651            0 :     .with_context(|| "RemotePath conversion")?;
     652              :     let path_dest = RemotePath::new(Utf8Path::new(
     653              :         format!("{}/file_dest", ctx.base_prefix).as_str(),
     654              :     ))
     655            0 :     .with_context(|| "RemotePath conversion")?;
     656              : 
     657              :     let orig = bytes::Bytes::from_static("remote blob data content".as_bytes());
     658              : 
     659              :     let (data, len) = wrap_stream(orig.clone());
     660              : 
     661              :     ctx.client.upload(data, len, &path, None, &cancel).await?;
     662              : 
     663              :     // Copy the uploaded object from `path` to `path_dest`.
     664              :     ctx.client.copy_object(&path, &path_dest, &cancel).await?;
     665              :     // Download the destination with default options and check it matches the uploaded bytes.
     666              :     let dl = ctx
     667              :         .client
     668              :         .download(&path_dest, &DownloadOpts::default(), &cancel)
     669              :         .await?;
     670              :     let buf = download_to_vec(dl).await?;
     671              :     assert_eq!(&buf, &orig);
     672              :     // Cleanup deletes both the source and the destination, although the messages below only name `path`.
     673              :     debug!("Cleanup: deleting file at path {path:?}");
     674              :     ctx.client
     675              :         .delete_objects(&[path.clone(), path_dest.clone()], &cancel)
     676              :         .await
     677            0 :         .with_context(|| format!("{path:?} removal"))?;
     678              : 
     679              :     Ok(())
     680              : }
     681              : 
     682              : /// Tests head_object: NotFound for a missing key, key and size after upload, and last_modified advancing on overwrite.
     683            6 : #[test_context(MaybeEnabledStorage)]
     684              : #[tokio::test]
     685              : async fn head_object(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
     686              :     let MaybeEnabledStorage::Enabled(ctx) = ctx else {
     687              :         return Ok(());
     688              :     };
     689              :     let cancel = CancellationToken::new();
     690              : 
     691              :     let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))?;
     692              :     // NOTE(review): download_conditional uploads to the same `{base_prefix}/file` key and never deletes it; the NotFound check below presumably relies on base_prefix being unique per test context (confirm).
     693              :     // Errors on missing file.
     694              :     let result = ctx.client.head_object(&path, &cancel).await;
     695              :     assert!(
     696              :         matches!(result, Err(DownloadError::NotFound)),
     697              :         "expected NotFound, got {result:?}"
     698              :     );
     699              : 
     700              :     // Create the file with the 3-byte content "foo".
     701              :     let data = bytes::Bytes::from_static("foo".as_bytes());
     702              :     let (stream, len) = wrap_stream(data);
     703              :     ctx.client.upload(stream, len, &path, None, &cancel).await?;
     704              : 
     705              :     // Fetch the head metadata and check the key and size.
     706              :     let object = ctx.client.head_object(&path, &cancel).await?;
     707              :     assert_eq!(
     708              :         object,
     709              :         ListingObject {
     710              :             key: path.clone(),
     711              :             last_modified: object.last_modified, // compared against itself, i.e. excluded from the equality check
     712              :             size: 3
     713              :         }
     714              :     );
     715              : 
     716              :     // Wait for a couple of seconds, then overwrite the file so that the last modified
     717              :     // timestamp can advance (presumably to allow for coarse timestamp resolution; confirm).
     718              :     tokio::time::sleep(std::time::Duration::from_secs(2)).await;
     719              : 
     720              :     let data = bytes::Bytes::from_static("bar".as_bytes());
     721              :     let (stream, len) = wrap_stream(data);
     722              :     ctx.client.upload(stream, len, &path, None, &cancel).await?;
     723              :     let new = ctx.client.head_object(&path, &cancel).await?;
     724              :     // `?` fails the test if duration_since returns an error; a zero duration means the timestamp did not advance.
     725              :     assert!(
     726              :         !new.last_modified
     727              :             .duration_since(object.last_modified)?
     728              :             .is_zero(),
     729              :         "last_modified did not advance"
     730              :     );
     731              : 
     732              :     Ok(())
     733              : }
        

Generated by: LCOV version 2.1-beta