LCOV - code coverage report
Current view: top level - libs/remote_storage/tests/common - tests.rs (source / functions) Coverage Total Hit
Test: 49aa928ec5b4b510172d8b5c6d154da28e70a46c.info Lines: 61.8 % 34 21
Test Date: 2024-11-13 18:23:39 Functions: 64.1 % 78 50

            Line data    Source code
       1              : use anyhow::Context;
       2              : use camino::Utf8Path;
       3              : use futures::StreamExt;
       4              : use remote_storage::{DownloadError, DownloadOpts, ListingMode, ListingObject, RemotePath};
       5              : use std::ops::Bound;
       6              : use std::sync::Arc;
       7              : use std::{collections::HashSet, num::NonZeroU32};
       8              : use test_context::test_context;
       9              : use tokio_util::sync::CancellationToken;
      10              : use tracing::debug;
      11              : 
      12              : use crate::common::{download_to_vec, upload_stream, wrap_stream};
      13              : 
      14              : use super::{
      15              :     MaybeEnabledStorage, MaybeEnabledStorageWithSimpleTestBlobs, MaybeEnabledStorageWithTestBlobs,
      16              : };
      17              : 
      18              : /// Tests that S3 client can list all prefixes, even if the response come paginated and requires multiple S3 queries.
      19              : /// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars specified.
      20              : /// See the client creation in [`create_s3_client`] for details on the required env vars.
      21              : /// If real S3 tests are disabled, the test passes, skipping any real test run: currently, there's no way to mark the test ignored in runtime with the
      22              : /// deafult test framework, see https://github.com/rust-lang/rust/issues/68007 for details.
      23              : ///
      24              : /// First, the test creates a set of S3 objects with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_remote_data`]
      25              : /// where
      26              : /// * `random_prefix_part` is set for the entire S3 client during the S3 client creation in [`create_s3_client`], to avoid multiple test runs interference
      27              : /// * `base_prefix_str` is a common prefix to use in the client requests: we would want to ensure that the client is able to list nested prefixes inside the bucket
      28              : ///
      29              : /// Then, verifies that the client does return correct prefixes when queried:
      30              : /// * with no prefix, it lists everything after its `${random_prefix_part}/` — that should be `${base_prefix_str}` value only
      31              : /// * with `${base_prefix_str}/` prefix, it lists every `sub_prefix_${i}`
      32              : ///
      33              : /// In the `MaybeEnabledStorageWithTestBlobs::setup`, we set the `max_keys_in_list_response` param to limit the keys in a single response.
      34              : /// This way, we are able to test the pagination, by ensuring all results are returned from the remote storage and avoid uploading too many blobs to S3,
      35              : /// as the current default AWS S3 pagination limit is 1000.
      36              : /// (see <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>).
      37              : ///
      38              : /// Lastly, the test attempts to clean up and remove all uploaded S3 files.
      39              : /// If any errors appear during the clean up, they get logged, but the test is not failed or stopped until clean up is finished.
      40            6 : #[test_context(MaybeEnabledStorageWithTestBlobs)]
      41              : #[tokio::test]
      42              : async fn pagination_should_work(ctx: &mut MaybeEnabledStorageWithTestBlobs) -> anyhow::Result<()> {
      43              :     let ctx = match ctx {
      44              :         MaybeEnabledStorageWithTestBlobs::Enabled(ctx) => ctx,
      45              :         MaybeEnabledStorageWithTestBlobs::Disabled => return Ok(()),
      46              :         MaybeEnabledStorageWithTestBlobs::UploadsFailed(e, _) => {
      47              :             anyhow::bail!("S3 init failed: {e:?}")
      48              :         }
      49              :     };
      50              : 
      51              :     let cancel = CancellationToken::new();
      52              : 
      53              :     let test_client = Arc::clone(&ctx.enabled.client);
      54              :     let expected_remote_prefixes = ctx.remote_prefixes.clone();
      55              : 
      56              :     let base_prefix = RemotePath::new(Utf8Path::new(ctx.enabled.base_prefix))
      57              :         .context("common_prefix construction")?;
      58              :     let root_remote_prefixes = test_client
      59              :         .list(None, ListingMode::WithDelimiter, None, &cancel)
      60              :         .await?
      61              :         .prefixes
      62              :         .into_iter()
      63              :         .collect::<HashSet<_>>();
      64              :     assert_eq!(
      65              :         root_remote_prefixes, HashSet::from([base_prefix.clone()]),
      66              :         "remote storage root prefixes list mismatches with the uploads. Returned prefixes: {root_remote_prefixes:?}"
      67              :     );
      68              : 
      69              :     let nested_remote_prefixes = test_client
      70              :         .list(
      71              :             Some(&base_prefix.add_trailing_slash()),
      72              :             ListingMode::WithDelimiter,
      73              :             None,
      74              :             &cancel,
      75              :         )
      76              :         .await?
      77              :         .prefixes
      78              :         .into_iter()
      79              :         .collect::<HashSet<_>>();
      80              :     let remote_only_prefixes = nested_remote_prefixes
      81              :         .difference(&expected_remote_prefixes)
      82              :         .collect::<HashSet<_>>();
      83              :     let missing_uploaded_prefixes = expected_remote_prefixes
      84              :         .difference(&nested_remote_prefixes)
      85              :         .collect::<HashSet<_>>();
      86              :     assert_eq!(
      87              :         remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0,
      88              :         "remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
      89              :     );
      90              : 
      91              :     // list_streaming
      92              : 
      93              :     let prefix_with_slash = base_prefix.add_trailing_slash();
      94              :     let mut nested_remote_prefixes_st = test_client.list_streaming(
      95              :         Some(&prefix_with_slash),
      96              :         ListingMode::WithDelimiter,
      97              :         None,
      98              :         &cancel,
      99              :     );
     100              :     let mut nested_remote_prefixes_combined = HashSet::new();
     101              :     let mut segments = 0;
     102              :     let mut segment_max_size = 0;
     103              :     while let Some(st) = nested_remote_prefixes_st.next().await {
     104              :         let st = st?;
     105              :         segment_max_size = segment_max_size.max(st.prefixes.len());
     106              :         nested_remote_prefixes_combined.extend(st.prefixes.into_iter());
     107              :         segments += 1;
     108              :     }
     109              :     assert!(segments > 1, "less than 2 segments: {segments}");
     110              :     assert!(
     111              :         segment_max_size * 2 <= nested_remote_prefixes_combined.len(),
     112              :         "double of segment_max_size={segment_max_size} larger number of remote prefixes of {}",
     113              :         nested_remote_prefixes_combined.len()
     114              :     );
     115              :     let remote_only_prefixes = nested_remote_prefixes_combined
     116              :         .difference(&expected_remote_prefixes)
     117              :         .collect::<HashSet<_>>();
     118              :     let missing_uploaded_prefixes = expected_remote_prefixes
     119              :         .difference(&nested_remote_prefixes_combined)
     120              :         .collect::<HashSet<_>>();
     121              :     assert_eq!(
     122              :         remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0,
     123              :         "remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
     124              :     );
     125              : 
     126              :     Ok(())
     127              : }
     128              : 
     129              : /// Tests that S3 client can list all files in a folder, even if the response comes paginated and requirees multiple S3 queries.
     130              : /// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars specified. Test will skip real code and pass if env vars not set.
     131              : /// See `s3_pagination_should_work` for more information.
     132              : ///
     133              : /// First, create a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_remote_data`]
     134              : /// Then performs the following queries:
     135              : ///    1. `list(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
     136              : ///    2. `list("folder1")`.  This  should return all files `random_prefix/folder1/blob_{i}.txt`
     137            6 : #[test_context(MaybeEnabledStorageWithSimpleTestBlobs)]
     138              : #[tokio::test]
     139              : async fn list_no_delimiter_works(
     140              :     ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs,
     141              : ) -> anyhow::Result<()> {
     142              :     let ctx = match ctx {
     143              :         MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx,
     144              :         MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()),
     145              :         MaybeEnabledStorageWithSimpleTestBlobs::UploadsFailed(e, _) => {
     146              :             anyhow::bail!("S3 init failed: {e:?}")
     147              :         }
     148              :     };
     149              :     let cancel = CancellationToken::new();
     150              :     let test_client = Arc::clone(&ctx.enabled.client);
     151              :     let base_prefix =
     152              :         RemotePath::new(Utf8Path::new("folder1")).context("common_prefix construction")?;
     153              :     let root_files = test_client
     154              :         .list(None, ListingMode::NoDelimiter, None, &cancel)
     155              :         .await
     156              :         .context("client list root files failure")?
     157              :         .keys
     158              :         .into_iter()
     159           63 :         .map(|o| o.key)
     160              :         .collect::<HashSet<_>>();
     161              :     assert_eq!(
     162              :         root_files,
     163              :         ctx.remote_blobs.clone(),
     164              :         "remote storage list on root mismatches with the uploads."
     165              :     );
     166              : 
     167              :     // Test that max_keys limit works. In total there are about 21 files (see
     168              :     // upload_simple_remote_data call in test_real_s3.rs).
     169              :     let limited_root_files = test_client
     170              :         .list(
     171              :             None,
     172              :             ListingMode::NoDelimiter,
     173              :             Some(NonZeroU32::new(2).unwrap()),
     174              :             &cancel,
     175              :         )
     176              :         .await
     177              :         .context("client list root files failure")?;
     178              :     assert_eq!(limited_root_files.keys.len(), 2);
     179              : 
     180              :     let nested_remote_files = test_client
     181              :         .list(Some(&base_prefix), ListingMode::NoDelimiter, None, &cancel)
     182              :         .await
     183              :         .context("client list nested files failure")?
     184              :         .keys
     185              :         .into_iter()
     186           21 :         .map(|o| o.key)
     187              :         .collect::<HashSet<_>>();
     188              :     let trim_remote_blobs: HashSet<_> = ctx
     189              :         .remote_blobs
     190              :         .iter()
     191           63 :         .map(|x| x.get_path())
     192           63 :         .filter(|x| x.starts_with("folder1"))
     193           21 :         .map(|x| RemotePath::new(x).expect("must be valid path"))
     194              :         .collect();
     195              :     assert_eq!(
     196              :         nested_remote_files, trim_remote_blobs,
     197              :         "remote storage list on subdirrectory mismatches with the uploads."
     198              :     );
     199              :     Ok(())
     200              : }
     201              : 
     202              : /// Tests that giving a partial prefix returns all matches (e.g. "/foo" yields "/foobar/baz"),
     203              : /// but only with NoDelimiter.
     204            6 : #[test_context(MaybeEnabledStorageWithSimpleTestBlobs)]
     205              : #[tokio::test]
     206              : async fn list_partial_prefix(
     207              :     ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs,
     208              : ) -> anyhow::Result<()> {
     209              :     let ctx = match ctx {
     210              :         MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx,
     211              :         MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()),
     212              :         MaybeEnabledStorageWithSimpleTestBlobs::UploadsFailed(e, _) => {
     213              :             anyhow::bail!("S3 init failed: {e:?}")
     214              :         }
     215              :     };
     216              : 
     217              :     let cancel = CancellationToken::new();
     218              :     let test_client = Arc::clone(&ctx.enabled.client);
     219              : 
     220              :     // Prefix "fold" should match all "folder{i}" directories with NoDelimiter.
     221              :     let objects: HashSet<_> = test_client
     222              :         .list(
     223              :             Some(&RemotePath::from_string("fold")?),
     224              :             ListingMode::NoDelimiter,
     225              :             None,
     226              :             &cancel,
     227              :         )
     228              :         .await?
     229              :         .keys
     230              :         .into_iter()
     231           63 :         .map(|o| o.key)
     232              :         .collect();
     233              :     assert_eq!(&objects, &ctx.remote_blobs);
     234              : 
     235              :     // Prefix "fold" matches nothing with WithDelimiter.
     236              :     let objects: HashSet<_> = test_client
     237              :         .list(
     238              :             Some(&RemotePath::from_string("fold")?),
     239              :             ListingMode::WithDelimiter,
     240              :             None,
     241              :             &cancel,
     242              :         )
     243              :         .await?
     244              :         .keys
     245              :         .into_iter()
     246            0 :         .map(|o| o.key)
     247              :         .collect();
     248              :     assert!(objects.is_empty());
     249              : 
     250              :     // Prefix "" matches everything.
     251              :     let objects: HashSet<_> = test_client
     252              :         .list(
     253              :             Some(&RemotePath::from_string("")?),
     254              :             ListingMode::NoDelimiter,
     255              :             None,
     256              :             &cancel,
     257              :         )
     258              :         .await?
     259              :         .keys
     260              :         .into_iter()
     261           63 :         .map(|o| o.key)
     262              :         .collect();
     263              :     assert_eq!(&objects, &ctx.remote_blobs);
     264              : 
     265              :     // Prefix "" matches nothing with WithDelimiter.
     266              :     let objects: HashSet<_> = test_client
     267              :         .list(
     268              :             Some(&RemotePath::from_string("")?),
     269              :             ListingMode::WithDelimiter,
     270              :             None,
     271              :             &cancel,
     272              :         )
     273              :         .await?
     274              :         .keys
     275              :         .into_iter()
     276            0 :         .map(|o| o.key)
     277              :         .collect();
     278              :     assert!(objects.is_empty());
     279              : 
     280              :     // Prefix "foo" matches nothing.
     281              :     let objects: HashSet<_> = test_client
     282              :         .list(
     283              :             Some(&RemotePath::from_string("foo")?),
     284              :             ListingMode::NoDelimiter,
     285              :             None,
     286              :             &cancel,
     287              :         )
     288              :         .await?
     289              :         .keys
     290              :         .into_iter()
     291            0 :         .map(|o| o.key)
     292              :         .collect();
     293              :     assert!(objects.is_empty());
     294              : 
     295              :     // Prefix "folder2/blob" matches.
     296              :     let objects: HashSet<_> = test_client
     297              :         .list(
     298              :             Some(&RemotePath::from_string("folder2/blob")?),
     299              :             ListingMode::NoDelimiter,
     300              :             None,
     301              :             &cancel,
     302              :         )
     303              :         .await?
     304              :         .keys
     305              :         .into_iter()
     306           21 :         .map(|o| o.key)
     307              :         .collect();
     308              :     let expect: HashSet<_> = ctx
     309              :         .remote_blobs
     310              :         .iter()
     311           63 :         .filter(|o| o.get_path().starts_with("folder2"))
     312              :         .cloned()
     313              :         .collect();
     314              :     assert_eq!(&objects, &expect);
     315              : 
     316              :     // Prefix "folder2/foo" matches nothing.
     317              :     let objects: HashSet<_> = test_client
     318              :         .list(
     319              :             Some(&RemotePath::from_string("folder2/foo")?),
     320              :             ListingMode::NoDelimiter,
     321              :             None,
     322              :             &cancel,
     323              :         )
     324              :         .await?
     325              :         .keys
     326              :         .into_iter()
     327            0 :         .map(|o| o.key)
     328              :         .collect();
     329              :     assert!(objects.is_empty());
     330              : 
     331              :     Ok(())
     332              : }
     333              : 
     334            6 : #[test_context(MaybeEnabledStorage)]
     335              : #[tokio::test]
     336              : async fn delete_non_exising_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
     337              :     let ctx = match ctx {
     338              :         MaybeEnabledStorage::Enabled(ctx) => ctx,
     339              :         MaybeEnabledStorage::Disabled => return Ok(()),
     340              :     };
     341              : 
     342              :     let cancel = CancellationToken::new();
     343              : 
     344              :     let path = RemotePath::new(Utf8Path::new(
     345              :         format!("{}/for_sure_there_is_nothing_there_really", ctx.base_prefix).as_str(),
     346              :     ))
     347            0 :     .with_context(|| "RemotePath conversion")?;
     348              : 
     349              :     ctx.client
     350              :         .delete(&path, &cancel)
     351              :         .await
     352              :         .expect("should succeed");
     353              : 
     354              :     Ok(())
     355              : }
     356              : 
     357            6 : #[test_context(MaybeEnabledStorage)]
     358              : #[tokio::test]
     359              : async fn delete_objects_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
     360              :     let ctx = match ctx {
     361              :         MaybeEnabledStorage::Enabled(ctx) => ctx,
     362              :         MaybeEnabledStorage::Disabled => return Ok(()),
     363              :     };
     364              : 
     365              :     let cancel = CancellationToken::new();
     366              : 
     367              :     let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
     368            0 :         .with_context(|| "RemotePath conversion")?;
     369              : 
     370              :     let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
     371            0 :         .with_context(|| "RemotePath conversion")?;
     372              : 
     373              :     let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
     374            0 :         .with_context(|| "RemotePath conversion")?;
     375              : 
     376              :     let (data, len) = upload_stream("remote blob data1".as_bytes().into());
     377              :     ctx.client.upload(data, len, &path1, None, &cancel).await?;
     378              : 
     379              :     let (data, len) = upload_stream("remote blob data2".as_bytes().into());
     380              :     ctx.client.upload(data, len, &path2, None, &cancel).await?;
     381              : 
     382              :     let (data, len) = upload_stream("remote blob data3".as_bytes().into());
     383              :     ctx.client.upload(data, len, &path3, None, &cancel).await?;
     384              : 
     385              :     ctx.client.delete_objects(&[path1, path2], &cancel).await?;
     386              : 
     387              :     let prefixes = ctx
     388              :         .client
     389              :         .list(None, ListingMode::WithDelimiter, None, &cancel)
     390              :         .await?
     391              :         .prefixes;
     392              : 
     393              :     assert_eq!(prefixes.len(), 1);
     394              : 
     395              :     ctx.client.delete_objects(&[path3], &cancel).await?;
     396              : 
     397              :     Ok(())
     398              : }
     399              : 
     400              : /// Tests that delete_prefix() will delete all objects matching a prefix, including
     401              : /// partial prefixes (i.e. "/foo" matches "/foobar").
     402            6 : #[test_context(MaybeEnabledStorageWithSimpleTestBlobs)]
     403              : #[tokio::test]
     404              : async fn delete_prefix(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> anyhow::Result<()> {
     405              :     let ctx = match ctx {
     406              :         MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx,
     407              :         MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()),
     408              :         MaybeEnabledStorageWithSimpleTestBlobs::UploadsFailed(e, _) => {
     409              :             anyhow::bail!("S3 init failed: {e:?}")
     410              :         }
     411              :     };
     412              : 
     413              :     let cancel = CancellationToken::new();
     414              :     let test_client = Arc::clone(&ctx.enabled.client);
     415              : 
     416              :     /// Asserts that the S3 listing matches the given paths.
     417              :     macro_rules! assert_list {
     418              :         ($expect:expr) => {{
     419              :             let listing = test_client
     420              :                 .list(None, ListingMode::NoDelimiter, None, &cancel)
     421              :                 .await?
     422              :                 .keys
     423              :                 .into_iter()
     424          293 :                 .map(|o| o.key)
     425              :                 .collect();
     426              :             assert_eq!($expect, listing);
     427              :         }};
     428              :     }
     429              : 
     430              :     // We start with the full set of uploaded files.
     431              :     let mut expect = ctx.remote_blobs.clone();
     432              : 
     433              :     // Deleting a non-existing prefix should do nothing.
     434              :     test_client
     435              :         .delete_prefix(&RemotePath::from_string("xyz")?, &cancel)
     436              :         .await?;
     437              :     assert_list!(expect);
     438              : 
     439              :     // Prefixes are case-sensitive.
     440              :     test_client
     441              :         .delete_prefix(&RemotePath::from_string("Folder")?, &cancel)
     442              :         .await?;
     443              :     assert_list!(expect);
     444              : 
     445              :     // Deleting a path which overlaps with an existing object should do nothing. We pick the first
     446              :     // path in the set as our common prefix.
     447              :     let path = expect.iter().next().expect("empty set").clone().join("xyz");
     448              :     test_client.delete_prefix(&path, &cancel).await?;
     449              :     assert_list!(expect);
     450              : 
     451              :     // Deleting an exact path should work. We pick the first path in the set.
     452              :     let path = expect.iter().next().expect("empty set").clone();
     453              :     test_client.delete_prefix(&path, &cancel).await?;
     454              :     expect.remove(&path);
     455              :     assert_list!(expect);
     456              : 
     457              :     // Deleting a prefix should delete all matching objects.
     458              :     test_client
     459              :         .delete_prefix(&RemotePath::from_string("folder0/blob_")?, &cancel)
     460              :         .await?;
     461           60 :     expect.retain(|p| !p.get_path().as_str().starts_with("folder0/"));
     462              :     assert_list!(expect);
     463              : 
     464              :     // Deleting a common prefix should delete all objects.
     465              :     test_client
     466              :         .delete_prefix(&RemotePath::from_string("fold")?, &cancel)
     467              :         .await?;
     468              :     expect.clear();
     469              :     assert_list!(expect);
     470              : 
     471              :     Ok(())
     472              : }
     473              : 
     474            6 : #[test_context(MaybeEnabledStorage)]
     475              : #[tokio::test]
     476              : async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
     477              :     let MaybeEnabledStorage::Enabled(ctx) = ctx else {
     478              :         return Ok(());
     479              :     };
     480              : 
     481              :     let cancel = CancellationToken::new();
     482              : 
     483              :     let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))
     484            0 :         .with_context(|| "RemotePath conversion")?;
     485              : 
     486              :     let orig = bytes::Bytes::from_static("remote blob data here".as_bytes());
     487              : 
     488              :     let (data, len) = wrap_stream(orig.clone());
     489              : 
     490              :     ctx.client.upload(data, len, &path, None, &cancel).await?;
     491              : 
     492              :     // Normal download request
     493              :     let dl = ctx
     494              :         .client
     495              :         .download(&path, &DownloadOpts::default(), &cancel)
     496              :         .await?;
     497              :     let buf = download_to_vec(dl).await?;
     498              :     assert_eq!(&buf, &orig);
     499              : 
     500              :     // Full range (end specified)
     501              :     let dl = ctx
     502              :         .client
     503              :         .download(
     504              :             &path,
     505              :             &DownloadOpts {
     506              :                 byte_start: Bound::Included(0),
     507              :                 byte_end: Bound::Excluded(len as u64),
     508              :                 ..Default::default()
     509              :             },
     510              :             &cancel,
     511              :         )
     512              :         .await?;
     513              :     let buf = download_to_vec(dl).await?;
     514              :     assert_eq!(&buf, &orig);
     515              : 
     516              :     // partial range (end specified)
     517              :     let dl = ctx
     518              :         .client
     519              :         .download(
     520              :             &path,
     521              :             &DownloadOpts {
     522              :                 byte_start: Bound::Included(4),
     523              :                 byte_end: Bound::Excluded(10),
     524              :                 ..Default::default()
     525              :             },
     526              :             &cancel,
     527              :         )
     528              :         .await?;
     529              :     let buf = download_to_vec(dl).await?;
     530              :     assert_eq!(&buf, &orig[4..10]);
     531              : 
     532              :     // partial range (end beyond real end)
     533              :     let dl = ctx
     534              :         .client
     535              :         .download(
     536              :             &path,
     537              :             &DownloadOpts {
     538              :                 byte_start: Bound::Included(8),
     539              :                 byte_end: Bound::Excluded(len as u64 * 100),
     540              :                 ..Default::default()
     541              :             },
     542              :             &cancel,
     543              :         )
     544              :         .await?;
     545              :     let buf = download_to_vec(dl).await?;
     546              :     assert_eq!(&buf, &orig[8..]);
     547              : 
     548              :     // Partial range (end unspecified)
     549              :     let dl = ctx
     550              :         .client
     551              :         .download(
     552              :             &path,
     553              :             &DownloadOpts {
     554              :                 byte_start: Bound::Included(4),
     555              :                 ..Default::default()
     556              :             },
     557              :             &cancel,
     558              :         )
     559              :         .await?;
     560              :     let buf = download_to_vec(dl).await?;
     561              :     assert_eq!(&buf, &orig[4..]);
     562              : 
     563              :     // Full range (end unspecified)
     564              :     let dl = ctx
     565              :         .client
     566              :         .download(
     567              :             &path,
     568              :             &DownloadOpts {
     569              :                 byte_start: Bound::Included(0),
     570              :                 ..Default::default()
     571              :             },
     572              :             &cancel,
     573              :         )
     574              :         .await?;
     575              :     let buf = download_to_vec(dl).await?;
     576              :     assert_eq!(&buf, &orig);
     577              : 
     578              :     debug!("Cleanup: deleting file at path {path:?}");
     579              :     ctx.client
     580              :         .delete(&path, &cancel)
     581              :         .await
     582            0 :         .with_context(|| format!("{path:?} removal"))?;
     583              : 
     584              :     Ok(())
     585              : }
     586              : 
     587              : /// Tests that conditional downloads work properly, by returning
     588              : /// DownloadError::Unmodified when the object ETag matches the given ETag.
     589            6 : #[test_context(MaybeEnabledStorage)]
     590              : #[tokio::test]
     591              : async fn download_conditional(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
     592              :     let MaybeEnabledStorage::Enabled(ctx) = ctx else {
     593              :         return Ok(());
     594              :     };
     595              :     let cancel = CancellationToken::new();
     596              : 
     597              :     // Create a file.
     598              :     let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))?;
     599              :     let data = bytes::Bytes::from_static("foo".as_bytes());
     600              :     let (stream, len) = wrap_stream(data);
     601              :     ctx.client.upload(stream, len, &path, None, &cancel).await?;
     602              : 
     603              :     // Download it to obtain its etag.
     604              :     let mut opts = DownloadOpts::default();
     605              :     let download = ctx.client.download(&path, &opts, &cancel).await?;
     606              : 
     607              :     // Download with the etag yields DownloadError::Unmodified.
     608              :     opts.etag = Some(download.etag);
     609              :     let result = ctx.client.download(&path, &opts, &cancel).await;
     610              :     assert!(
     611              :         matches!(result, Err(DownloadError::Unmodified)),
     612              :         "expected DownloadError::Unmodified, got {result:?}"
     613              :     );
     614              : 
     615              :     // Replace the file contents.
     616              :     let data = bytes::Bytes::from_static("bar".as_bytes());
     617              :     let (stream, len) = wrap_stream(data);
     618              :     ctx.client.upload(stream, len, &path, None, &cancel).await?;
     619              : 
     620              :     // A download with the old etag should yield the new file.
     621              :     let download = ctx.client.download(&path, &opts, &cancel).await?;
     622              :     assert_ne!(download.etag, opts.etag.unwrap(), "ETag did not change");
     623              : 
     624              :     // A download with the new etag should yield Unmodified again.
     625              :     opts.etag = Some(download.etag);
     626              :     let result = ctx.client.download(&path, &opts, &cancel).await;
     627              :     assert!(
     628              :         matches!(result, Err(DownloadError::Unmodified)),
     629              :         "expected DownloadError::Unmodified, got {result:?}"
     630              :     );
     631              : 
     632              :     Ok(())
     633              : }
     634              : 
     635            6 : #[test_context(MaybeEnabledStorage)]
     636              : #[tokio::test]
     637              : async fn copy_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
     638              :     let MaybeEnabledStorage::Enabled(ctx) = ctx else {
     639              :         return Ok(());
     640              :     };
     641              : 
     642              :     let cancel = CancellationToken::new();
     643              : 
     644              :     let path = RemotePath::new(Utf8Path::new(
     645              :         format!("{}/file_to_copy", ctx.base_prefix).as_str(),
     646              :     ))
     647            0 :     .with_context(|| "RemotePath conversion")?;
     648              :     let path_dest = RemotePath::new(Utf8Path::new(
     649              :         format!("{}/file_dest", ctx.base_prefix).as_str(),
     650              :     ))
     651            0 :     .with_context(|| "RemotePath conversion")?;
     652              : 
     653              :     let orig = bytes::Bytes::from_static("remote blob data content".as_bytes());
     654              : 
     655              :     let (data, len) = wrap_stream(orig.clone());
     656              : 
     657              :     ctx.client.upload(data, len, &path, None, &cancel).await?;
     658              : 
     659              :     // Normal download request
     660              :     ctx.client.copy_object(&path, &path_dest, &cancel).await?;
     661              : 
     662              :     let dl = ctx
     663              :         .client
     664              :         .download(&path_dest, &DownloadOpts::default(), &cancel)
     665              :         .await?;
     666              :     let buf = download_to_vec(dl).await?;
     667              :     assert_eq!(&buf, &orig);
     668              : 
     669              :     debug!("Cleanup: deleting file at path {path:?}");
     670              :     ctx.client
     671              :         .delete_objects(&[path.clone(), path_dest.clone()], &cancel)
     672              :         .await
     673            0 :         .with_context(|| format!("{path:?} removal"))?;
     674              : 
     675              :     Ok(())
     676              : }
     677              : 
     678              : /// Tests that head_object works properly.
     679            6 : #[test_context(MaybeEnabledStorage)]
     680              : #[tokio::test]
     681              : async fn head_object(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
     682              :     let MaybeEnabledStorage::Enabled(ctx) = ctx else {
     683              :         return Ok(());
     684              :     };
     685              :     let cancel = CancellationToken::new();
     686              : 
     687              :     let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))?;
     688              : 
     689              :     // Errors on missing file.
     690              :     let result = ctx.client.head_object(&path, &cancel).await;
     691              :     assert!(
     692              :         matches!(result, Err(DownloadError::NotFound)),
     693              :         "expected NotFound, got {result:?}"
     694              :     );
     695              : 
     696              :     // Create the file.
     697              :     let data = bytes::Bytes::from_static("foo".as_bytes());
     698              :     let (stream, len) = wrap_stream(data);
     699              :     ctx.client.upload(stream, len, &path, None, &cancel).await?;
     700              : 
     701              :     // Fetch the head metadata.
     702              :     let object = ctx.client.head_object(&path, &cancel).await?;
     703              :     assert_eq!(
     704              :         object,
     705              :         ListingObject {
     706              :             key: path.clone(),
     707              :             last_modified: object.last_modified, // ignore
     708              :             size: 3
     709              :         }
     710              :     );
     711              : 
     712              :     // Wait for a couple of seconds, and then update the file to check the last
     713              :     // modified timestamp.
     714              :     tokio::time::sleep(std::time::Duration::from_secs(2)).await;
     715              : 
     716              :     let data = bytes::Bytes::from_static("bar".as_bytes());
     717              :     let (stream, len) = wrap_stream(data);
     718              :     ctx.client.upload(stream, len, &path, None, &cancel).await?;
     719              :     let new = ctx.client.head_object(&path, &cancel).await?;
     720              : 
     721              :     assert!(
     722              :         !new.last_modified
     723              :             .duration_since(object.last_modified)?
     724              :             .is_zero(),
     725              :         "last_modified did not advance"
     726              :     );
     727              : 
     728              :     Ok(())
     729              : }
        

Generated by: LCOV version 2.1-beta