LCOV - code coverage report
Current view: top level - libs/remote_storage/tests/common - tests.rs (source / functions)
Test:      322b88762cba8ea666f63cda880cccab6936bf37.info
Test Date: 2024-02-29 11:57:12
Coverage:  Lines: 17.2 % (36 of 209 hit)    Functions: 68.2 % (60 of 88 hit)

            Line data    Source code
       1              : use anyhow::Context;
       2              : use camino::Utf8Path;
       3              : use remote_storage::RemotePath;
       4              : use std::sync::Arc;
       5              : use std::{collections::HashSet, num::NonZeroU32};
       6              : use test_context::test_context;
       7              : use tokio_util::sync::CancellationToken;
       8              : use tracing::debug;
       9              : 
      10              : use crate::common::{download_to_vec, upload_stream, wrap_stream};
      11              : 
      12              : use super::{
      13              :     MaybeEnabledStorage, MaybeEnabledStorageWithSimpleTestBlobs, MaybeEnabledStorageWithTestBlobs,
      14              : };
      15              : 
       16              : /// Tests that the S3 client can list all prefixes, even if the response comes paginated and requires multiple S3 queries.
      17              : /// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars specified.
      18              : /// See the client creation in [`create_s3_client`] for details on the required env vars.
       19              : /// If real S3 tests are disabled, the test passes without running anything: currently there is no way to mark a test as ignored at runtime with the
       20              : /// default test framework, see https://github.com/rust-lang/rust/issues/68007 for details.
      21              : ///
      22              : /// First, the test creates a set of S3 objects with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_remote_data`]
      23              : /// where
       24              : /// * `random_prefix_part` is set for the entire S3 client during the S3 client creation in [`create_s3_client`], to avoid interference between multiple test runs
       25              : /// * `base_prefix_str` is a common prefix to use in the client requests: we want to ensure that the client is able to list nested prefixes inside the bucket
      26              : ///
       27              : /// Then, the test verifies that the client returns the correct prefixes when queried:
      28              : /// * with no prefix, it lists everything after its `${random_prefix_part}/` — that should be `${base_prefix_str}` value only
      29              : /// * with `${base_prefix_str}/` prefix, it lists every `sub_prefix_${i}`
      30              : ///
       31              : /// With real S3 enabled and the `#[cfg(test)]` Rust configuration used, the S3 client test adds a `max-keys` param to limit the number of keys per response.
       32              : /// This way, we test the pagination implicitly, by ensuring all results are returned from the remote storage without uploading too many blobs to S3,
       33              : /// since the current default AWS S3 pagination limit is 1000 keys per page.
      34              : /// (see https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax)
      35              : ///
      36              : /// Lastly, the test attempts to clean up and remove all uploaded S3 files.
       37              : /// Any errors that appear during the cleanup are logged, but they do not fail or stop the test until the cleanup is finished.
      38            4 : #[test_context(MaybeEnabledStorageWithTestBlobs)]
      39            4 : #[tokio::test]
      40            8 : async fn pagination_should_work(ctx: &mut MaybeEnabledStorageWithTestBlobs) -> anyhow::Result<()> {
      41            4 :     let ctx = match ctx {
      42            0 :         MaybeEnabledStorageWithTestBlobs::Enabled(ctx) => ctx,
      43            4 :         MaybeEnabledStorageWithTestBlobs::Disabled => return Ok(()),
      44            0 :         MaybeEnabledStorageWithTestBlobs::UploadsFailed(e, _) => {
      45            0 :             anyhow::bail!("S3 init failed: {e:?}")
      46              :         }
      47              :     };
      48              : 
      49            0 :     let cancel = CancellationToken::new();
      50            0 : 
      51            0 :     let test_client = Arc::clone(&ctx.enabled.client);
      52            0 :     let expected_remote_prefixes = ctx.remote_prefixes.clone();
      53              : 
      54            0 :     let base_prefix = RemotePath::new(Utf8Path::new(ctx.enabled.base_prefix))
      55            0 :         .context("common_prefix construction")?;
      56            0 :     let root_remote_prefixes = test_client
      57            0 :         .list_prefixes(None, &cancel)
      58            0 :         .await
      59            0 :         .context("client list root prefixes failure")?
      60            0 :         .into_iter()
      61            0 :         .collect::<HashSet<_>>();
      62            0 :     assert_eq!(
      63            0 :         root_remote_prefixes, HashSet::from([base_prefix.clone()]),
      64            0 :         "remote storage root prefixes list mismatches with the uploads. Returned prefixes: {root_remote_prefixes:?}"
      65              :     );
      66              : 
      67            0 :     let nested_remote_prefixes = test_client
      68            0 :         .list_prefixes(Some(&base_prefix), &cancel)
      69            0 :         .await
      70            0 :         .context("client list nested prefixes failure")?
      71            0 :         .into_iter()
      72            0 :         .collect::<HashSet<_>>();
      73            0 :     let remote_only_prefixes = nested_remote_prefixes
      74            0 :         .difference(&expected_remote_prefixes)
      75            0 :         .collect::<HashSet<_>>();
      76            0 :     let missing_uploaded_prefixes = expected_remote_prefixes
      77            0 :         .difference(&nested_remote_prefixes)
      78            0 :         .collect::<HashSet<_>>();
      79            0 :     assert_eq!(
      80            0 :         remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0,
      81            0 :         "remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
      82              :     );
      83              : 
      84            0 :     Ok(())
      85            4 : }
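
The doc comment above describes the key layout produced by `upload_remote_data`, which is defined elsewhere and not shown in this file. As an illustrative sketch only (the helper name and counts below are assumptions, not the crate's actual code), keys of that shape could be generated like this:

// Sketch: builds keys shaped like
// `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}`,
// mirroring the layout described in the doc comment above. Hypothetical helper,
// not part of the measured tests.rs.
fn sketch_pagination_keys(
    random_prefix_part: &str,
    base_prefix_str: &str,
    prefix_count: usize,
) -> Vec<String> {
    (1..=prefix_count)
        .map(|i| format!("/{random_prefix_part}/{base_prefix_str}/sub_prefix_{i}/blob_{i}"))
        .collect()
}

Listing with no prefix should then yield only `${base_prefix_str}`, and listing with the `${base_prefix_str}/` prefix should yield every `sub_prefix_${i}`, which is exactly what the two `list_prefixes` calls above assert.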
      86              : 
       87              : /// Tests that the S3 client can list all files in a folder, even if the response comes paginated and requires multiple S3 queries.
       88              : /// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars to be specified. The test skips the real code path and passes if the env vars are not set.
       89              : /// See `pagination_should_work` for more information.
      90              : ///
       91              : /// First, it creates a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_remote_data`].
       92              : /// Then it performs the following queries:
      93              : ///    1. `list_files(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
       94              : ///    2. `list_files("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt`
      95            4 : #[test_context(MaybeEnabledStorageWithSimpleTestBlobs)]
      96            4 : #[tokio::test]
      97            8 : async fn list_files_works(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> anyhow::Result<()> {
      98            4 :     let ctx = match ctx {
      99            0 :         MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx,
     100            4 :         MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()),
     101            0 :         MaybeEnabledStorageWithSimpleTestBlobs::UploadsFailed(e, _) => {
     102            0 :             anyhow::bail!("S3 init failed: {e:?}")
     103              :         }
     104              :     };
     105            0 :     let cancel = CancellationToken::new();
     106            0 :     let test_client = Arc::clone(&ctx.enabled.client);
     107            0 :     let base_prefix =
     108            0 :         RemotePath::new(Utf8Path::new("folder1")).context("common_prefix construction")?;
     109            0 :     let root_files = test_client
     110            0 :         .list_files(None, None, &cancel)
     111            0 :         .await
     112            0 :         .context("client list root files failure")?
     113            0 :         .into_iter()
     114            0 :         .collect::<HashSet<_>>();
     115            0 :     assert_eq!(
     116            0 :         root_files,
     117            0 :         ctx.remote_blobs.clone(),
     118            0 :         "remote storage list_files on root mismatches with the uploads."
     119              :     );
     120              : 
     121              :     // Test that max_keys limit works. In total there are about 21 files (see
     122              :     // upload_simple_remote_data call in test_real_s3.rs).
     123            0 :     let limited_root_files = test_client
     124            0 :         .list_files(None, Some(NonZeroU32::new(2).unwrap()), &cancel)
     125            0 :         .await
     126            0 :         .context("client list root files failure")?;
     127            0 :     assert_eq!(limited_root_files.len(), 2);
     128              : 
     129            0 :     let nested_remote_files = test_client
     130            0 :         .list_files(Some(&base_prefix), None, &cancel)
     131            0 :         .await
     132            0 :         .context("client list nested files failure")?
     133            0 :         .into_iter()
     134            0 :         .collect::<HashSet<_>>();
     135            0 :     let trim_remote_blobs: HashSet<_> = ctx
     136            0 :         .remote_blobs
     137            0 :         .iter()
     138            0 :         .map(|x| x.get_path())
     139            0 :         .filter(|x| x.starts_with("folder1"))
     140            0 :         .map(|x| RemotePath::new(x).expect("must be valid path"))
     141            0 :         .collect();
     142              :     assert_eq!(
     143              :         nested_remote_files, trim_remote_blobs,
     144            0 :         "remote storage list_files on subdirrectory mismatches with the uploads."
     145              :     );
     146            0 :     Ok(())
     147            4 : }
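
For the `random_prefix/folder{j}/blob_{i}.txt` layout described in the doc comment, here is a minimal sketch of the expected key sets; the folder and blob counts are parameters chosen for illustration, while the real numbers come from the `upload_simple_remote_data` call in test_real_s3.rs:

// Sketch: the full expected key set for the layout above, and the subset that a
// `list_files(Some("folder1"), ..)` call should return. Illustrative only.
use std::collections::HashSet;

fn sketch_expected_blobs(folders: usize, blobs_per_folder: usize) -> HashSet<String> {
    (1..=folders)
        .flat_map(|j| (1..=blobs_per_folder).map(move |i| format!("folder{j}/blob_{i}.txt")))
        .collect()
}

fn sketch_folder1_subset(all: &HashSet<String>) -> HashSet<String> {
    all.iter()
        .filter(|key| key.starts_with("folder1/"))
        .cloned()
        .collect()
}

The assertions above check exactly this: the root `list_files` result must equal the full uploaded set, and the `folder1` result must equal the filtered subset.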
     148              : 
     149            4 : #[test_context(MaybeEnabledStorage)]
     150            4 : #[tokio::test]
     151            8 : async fn delete_non_exising_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
     152            4 :     let ctx = match ctx {
     153            0 :         MaybeEnabledStorage::Enabled(ctx) => ctx,
     154            4 :         MaybeEnabledStorage::Disabled => return Ok(()),
     155              :     };
     156              : 
     157            0 :     let cancel = CancellationToken::new();
     158              : 
     159            0 :     let path = RemotePath::new(Utf8Path::new(
     160            0 :         format!("{}/for_sure_there_is_nothing_there_really", ctx.base_prefix).as_str(),
     161            0 :     ))
     162            0 :     .with_context(|| "RemotePath conversion")?;
     163              : 
     164            0 :     ctx.client
     165            0 :         .delete(&path, &cancel)
     166            0 :         .await
     167            0 :         .expect("should succeed");
     168            0 : 
     169            0 :     Ok(())
     170            4 : }
     171              : 
     172            4 : #[test_context(MaybeEnabledStorage)]
     173            4 : #[tokio::test]
     174            8 : async fn delete_objects_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
     175            4 :     let ctx = match ctx {
     176            0 :         MaybeEnabledStorage::Enabled(ctx) => ctx,
     177            4 :         MaybeEnabledStorage::Disabled => return Ok(()),
     178              :     };
     179              : 
     180            0 :     let cancel = CancellationToken::new();
     181              : 
     182            0 :     let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
     183            0 :         .with_context(|| "RemotePath conversion")?;
     184              : 
     185            0 :     let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
     186            0 :         .with_context(|| "RemotePath conversion")?;
     187              : 
     188            0 :     let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
     189            0 :         .with_context(|| "RemotePath conversion")?;
     190              : 
     191            0 :     let (data, len) = upload_stream("remote blob data1".as_bytes().into());
     192            0 :     ctx.client.upload(data, len, &path1, None, &cancel).await?;
     193              : 
     194            0 :     let (data, len) = upload_stream("remote blob data2".as_bytes().into());
     195            0 :     ctx.client.upload(data, len, &path2, None, &cancel).await?;
     196              : 
     197            0 :     let (data, len) = upload_stream("remote blob data3".as_bytes().into());
     198            0 :     ctx.client.upload(data, len, &path3, None, &cancel).await?;
     199              : 
     200            0 :     ctx.client.delete_objects(&[path1, path2], &cancel).await?;
     201              : 
     202            0 :     let prefixes = ctx.client.list_prefixes(None, &cancel).await?;
     203              : 
     204            0 :     assert_eq!(prefixes.len(), 1);
     205              : 
     206            0 :     ctx.client.delete_objects(&[path3], &cancel).await?;
     207              : 
     208            0 :     Ok(())
     209            4 : }
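
A minimal in-memory model of the batch-delete expectation above, covering only the key bookkeeping and not the storage client itself: three keys are uploaded, two are removed in a single `delete_objects` call, so exactly one remains.

// Sketch: models "upload N keys, batch-delete M of them, K remain" with plain sets.
// Hypothetical helper; the real test exercises the remote storage client instead.
use std::collections::HashSet;

fn sketch_remaining_after_delete<'a>(
    uploaded: &[&'a str],
    deleted: &[&'a str],
) -> HashSet<&'a str> {
    let deleted: HashSet<&str> = deleted.iter().copied().collect();
    uploaded
        .iter()
        .copied()
        .filter(|key| !deleted.contains(key))
        .collect()
}

// sketch_remaining_after_delete(&["path1", "path2", "path3"], &["path1", "path2"])
// yields {"path3"}: one key left, matching the `prefixes.len() == 1` assertion above.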
     210              : 
     211            4 : #[test_context(MaybeEnabledStorage)]
     212            4 : #[tokio::test]
     213            8 : async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
     214            4 :     let MaybeEnabledStorage::Enabled(ctx) = ctx else {
     215            4 :         return Ok(());
     216              :     };
     217              : 
     218            0 :     let cancel = CancellationToken::new();
     219              : 
     220            0 :     let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))
     221            0 :         .with_context(|| "RemotePath conversion")?;
     222              : 
     223            0 :     let orig = bytes::Bytes::from_static("remote blob data here".as_bytes());
     224            0 : 
     225            0 :     let (data, len) = wrap_stream(orig.clone());
     226            0 : 
     227            0 :     ctx.client.upload(data, len, &path, None, &cancel).await?;
     228              : 
     229              :     // Normal download request
     230            0 :     let dl = ctx.client.download(&path, &cancel).await?;
     231            0 :     let buf = download_to_vec(dl).await?;
     232            0 :     assert_eq!(&buf, &orig);
     233              : 
     234              :     // Full range (end specified)
     235            0 :     let dl = ctx
     236            0 :         .client
     237            0 :         .download_byte_range(&path, 0, Some(len as u64), &cancel)
     238            0 :         .await?;
     239            0 :     let buf = download_to_vec(dl).await?;
     240            0 :     assert_eq!(&buf, &orig);
     241              : 
     242              :     // partial range (end specified)
     243            0 :     let dl = ctx
     244            0 :         .client
     245            0 :         .download_byte_range(&path, 4, Some(10), &cancel)
     246            0 :         .await?;
     247            0 :     let buf = download_to_vec(dl).await?;
     248            0 :     assert_eq!(&buf, &orig[4..10]);
     249              : 
     250              :     // partial range (end beyond real end)
     251            0 :     let dl = ctx
     252            0 :         .client
     253            0 :         .download_byte_range(&path, 8, Some(len as u64 * 100), &cancel)
     254            0 :         .await?;
     255            0 :     let buf = download_to_vec(dl).await?;
     256            0 :     assert_eq!(&buf, &orig[8..]);
     257              : 
     258              :     // Partial range (end unspecified)
     259            0 :     let dl = ctx
     260            0 :         .client
     261            0 :         .download_byte_range(&path, 4, None, &cancel)
     262            0 :         .await?;
     263            0 :     let buf = download_to_vec(dl).await?;
     264            0 :     assert_eq!(&buf, &orig[4..]);
     265              : 
     266              :     // Full range (end unspecified)
     267            0 :     let dl = ctx
     268            0 :         .client
     269            0 :         .download_byte_range(&path, 0, None, &cancel)
     270            0 :         .await?;
     271            0 :     let buf = download_to_vec(dl).await?;
     272            0 :     assert_eq!(&buf, &orig);
     273              : 
     274            0 :     debug!("Cleanup: deleting file at path {path:?}");
     275            0 :     ctx.client
     276            0 :         .delete(&path, &cancel)
     277            0 :         .await
     278            0 :         .with_context(|| format!("{path:?} removal"))?;
     279              : 
     280            0 :     Ok(())
     281            4 : }
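
The range assertions above pin down the byte-range semantics the test expects from `download_byte_range`: the start offset is inclusive, the end offset is exclusive, an end past the real object length is clamped, and `None` means read to the end. Below is a small in-memory model of just those semantics, as a sketch rather than the client implementation:

// Sketch: the slice of `data` that the assertions above expect for a given
// (start, end) request. Illustrative model only.
fn sketch_expected_range(data: &[u8], start: u64, end_exclusive: Option<u64>) -> &[u8] {
    let len = data.len();
    let start = (start as usize).min(len);
    let end = end_exclusive
        .map(|e| (e as usize).min(len))
        .unwrap_or(len)
        .max(start);
    &data[start..end]
}

// sketch_expected_range(b"remote blob data here", 4, Some(10))
//     == &b"remote blob data here"[4..10]
// sketch_expected_range(b"remote blob data here", 8, Some(10_000))
//     == &b"remote blob data here"[8..]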
     282              : 
     283            4 : #[test_context(MaybeEnabledStorage)]
     284            4 : #[tokio::test]
     285            8 : async fn copy_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
     286            4 :     let MaybeEnabledStorage::Enabled(ctx) = ctx else {
     287            4 :         return Ok(());
     288              :     };
     289              : 
     290            0 :     let cancel = CancellationToken::new();
     291              : 
     292            0 :     let path = RemotePath::new(Utf8Path::new(
     293            0 :         format!("{}/file_to_copy", ctx.base_prefix).as_str(),
     294            0 :     ))
     295            0 :     .with_context(|| "RemotePath conversion")?;
     296            0 :     let path_dest = RemotePath::new(Utf8Path::new(
     297            0 :         format!("{}/file_dest", ctx.base_prefix).as_str(),
     298            0 :     ))
     299            0 :     .with_context(|| "RemotePath conversion")?;
     300              : 
     301            0 :     let orig = bytes::Bytes::from_static("remote blob data content".as_bytes());
     302            0 : 
     303            0 :     let (data, len) = wrap_stream(orig.clone());
     304            0 : 
     305            0 :     ctx.client.upload(data, len, &path, None, &cancel).await?;
     306              : 
     307              :     // Normal download request
     308            0 :     ctx.client.copy_object(&path, &path_dest, &cancel).await?;
     309              : 
     310            0 :     let dl = ctx.client.download(&path_dest, &cancel).await?;
     311            0 :     let buf = download_to_vec(dl).await?;
     312            0 :     assert_eq!(&buf, &orig);
     313              : 
     314            0 :     debug!("Cleanup: deleting file at path {path:?}");
     315            0 :     ctx.client
     316            0 :         .delete_objects(&[path.clone(), path_dest.clone()], &cancel)
     317            0 :         .await
     318            0 :         .with_context(|| format!("{path:?} removal"))?;
     319              : 
     320            0 :     Ok(())
     321            4 : }
        

Generated by: LCOV version 2.1-beta