LCOV - code coverage report
Current view: top level - libs/remote_storage/tests/common - tests.rs (source / functions) Coverage Total Hit
Test: 42f947419473a288706e86ecdf7c2863d760d5d7.info Lines: 95.5 % 246 235
Test Date: 2024-08-02 21:34:27 Functions: 65.4 % 52 34

            Line data    Source code
       1              : use anyhow::Context;
       2              : use camino::Utf8Path;
       3              : use futures::StreamExt;
       4              : use remote_storage::ListingMode;
       5              : use remote_storage::RemotePath;
       6              : use std::sync::Arc;
       7              : use std::{collections::HashSet, num::NonZeroU32};
       8              : use test_context::test_context;
       9              : use tokio_util::sync::CancellationToken;
      10              : use tracing::debug;
      11              : 
      12              : use crate::common::{download_to_vec, upload_stream, wrap_stream};
      13              : 
      14              : use super::{
      15              :     MaybeEnabledStorage, MaybeEnabledStorageWithSimpleTestBlobs, MaybeEnabledStorageWithTestBlobs,
      16              : };
      17              : 
      18              : /// Tests that S3 client can list all prefixes, even if the response come paginated and requires multiple S3 queries.
      19              : /// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars specified.
      20              : /// See the client creation in [`create_s3_client`] for details on the required env vars.
      21              : /// If real S3 tests are disabled, the test passes, skipping any real test run: currently, there's no way to mark the test ignored in runtime with the
      22              : /// deafult test framework, see https://github.com/rust-lang/rust/issues/68007 for details.
      23              : ///
      24              : /// First, the test creates a set of S3 objects with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_remote_data`]
      25              : /// where
      26              : /// * `random_prefix_part` is set for the entire S3 client during the S3 client creation in [`create_s3_client`], to avoid multiple test runs interference
      27              : /// * `base_prefix_str` is a common prefix to use in the client requests: we would want to ensure that the client is able to list nested prefixes inside the bucket
      28              : ///
      29              : /// Then, verifies that the client does return correct prefixes when queried:
      30              : /// * with no prefix, it lists everything after its `${random_prefix_part}/` — that should be `${base_prefix_str}` value only
      31              : /// * with `${base_prefix_str}/` prefix, it lists every `sub_prefix_${i}`
      32              : ///
      33              : /// In the `MaybeEnabledStorageWithTestBlobs::setup`, we set the `max_keys_in_list_response` param to limit the keys in a single response.
      34              : /// This way, we are able to test the pagination, by ensuring all results are returned from the remote storage and avoid uploading too many blobs to S3,
      35              : /// as the current default AWS S3 pagination limit is 1000.
      36              : /// (see <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>).
      37              : ///
      38              : /// Lastly, the test attempts to clean up and remove all uploaded S3 files.
      39              : /// If any errors appear during the clean up, they get logged, but the test is not failed or stopped until clean up is finished.
      40            8 : #[test_context(MaybeEnabledStorageWithTestBlobs)]
      41              : #[tokio::test]
      42            8 : async fn pagination_should_work(ctx: &mut MaybeEnabledStorageWithTestBlobs) -> anyhow::Result<()> {
      43            8 :     let ctx = match ctx {
      44            3 :         MaybeEnabledStorageWithTestBlobs::Enabled(ctx) => ctx,
      45            5 :         MaybeEnabledStorageWithTestBlobs::Disabled => return Ok(()),
      46            0 :         MaybeEnabledStorageWithTestBlobs::UploadsFailed(e, _) => {
      47            0 :             anyhow::bail!("S3 init failed: {e:?}")
      48              :         }
      49              :     };
      50              : 
      51            3 :     let cancel = CancellationToken::new();
      52            3 : 
      53            3 :     let test_client = Arc::clone(&ctx.enabled.client);
      54            3 :     let expected_remote_prefixes = ctx.remote_prefixes.clone();
      55              : 
      56            3 :     let base_prefix = RemotePath::new(Utf8Path::new(ctx.enabled.base_prefix))
      57            3 :         .context("common_prefix construction")?;
      58            3 :     let root_remote_prefixes = test_client
      59            3 :         .list(None, ListingMode::WithDelimiter, None, &cancel)
      60           10 :         .await?
      61              :         .prefixes
      62            3 :         .into_iter()
      63            3 :         .collect::<HashSet<_>>();
      64            3 :     assert_eq!(
      65            3 :         root_remote_prefixes, HashSet::from([base_prefix.clone()]),
      66            0 :         "remote storage root prefixes list mismatches with the uploads. Returned prefixes: {root_remote_prefixes:?}"
      67              :     );
      68              : 
      69            3 :     let nested_remote_prefixes = test_client
      70            3 :         .list(
      71            3 :             Some(&base_prefix.add_trailing_slash()),
      72            3 :             ListingMode::WithDelimiter,
      73            3 :             None,
      74            3 :             &cancel,
      75            3 :         )
      76           34 :         .await?
      77              :         .prefixes
      78            3 :         .into_iter()
      79            3 :         .collect::<HashSet<_>>();
      80            3 :     let remote_only_prefixes = nested_remote_prefixes
      81            3 :         .difference(&expected_remote_prefixes)
      82            3 :         .collect::<HashSet<_>>();
      83            3 :     let missing_uploaded_prefixes = expected_remote_prefixes
      84            3 :         .difference(&nested_remote_prefixes)
      85            3 :         .collect::<HashSet<_>>();
      86            3 :     assert_eq!(
      87            3 :         remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0,
      88            0 :         "remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
      89              :     );
      90              : 
      91              :     // list_streaming
      92              : 
      93            3 :     let prefix_with_slash = base_prefix.add_trailing_slash();
      94            3 :     let mut nested_remote_prefixes_st = test_client.list_streaming(
      95            3 :         Some(&prefix_with_slash),
      96            3 :         ListingMode::WithDelimiter,
      97            3 :         None,
      98            3 :         &cancel,
      99            3 :     );
     100            3 :     let mut nested_remote_prefixes_combined = HashSet::new();
     101            3 :     let mut segments = 0;
     102            3 :     let mut segment_max_size = 0;
     103           33 :     while let Some(st) = nested_remote_prefixes_st.next().await {
     104            9 :         let st = st?;
     105            9 :         segment_max_size = segment_max_size.max(st.prefixes.len());
     106            9 :         nested_remote_prefixes_combined.extend(st.prefixes.into_iter());
     107            9 :         segments += 1;
     108              :     }
     109            3 :     assert!(segments > 1, "less than 2 segments: {segments}");
     110            3 :     assert!(
     111            3 :         segment_max_size * 2 <= nested_remote_prefixes_combined.len(),
     112            0 :         "double of segment_max_size={segment_max_size} larger number of remote prefixes of {}",
     113            0 :         nested_remote_prefixes_combined.len()
     114              :     );
     115            3 :     let remote_only_prefixes = nested_remote_prefixes_combined
     116            3 :         .difference(&expected_remote_prefixes)
     117            3 :         .collect::<HashSet<_>>();
     118            3 :     let missing_uploaded_prefixes = expected_remote_prefixes
     119            3 :         .difference(&nested_remote_prefixes_combined)
     120            3 :         .collect::<HashSet<_>>();
     121            3 :     assert_eq!(
     122            3 :         remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0,
     123            0 :         "remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
     124              :     );
     125              : 
     126            3 :     Ok(())
     127            8 : }
     128              : 
     129              : /// Tests that S3 client can list all files in a folder, even if the response comes paginated and requirees multiple S3 queries.
     130              : /// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars specified. Test will skip real code and pass if env vars not set.
     131              : /// See `s3_pagination_should_work` for more information.
     132              : ///
     133              : /// First, create a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_remote_data`]
     134              : /// Then performs the following queries:
     135              : ///    1. `list(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
     136              : ///    2. `list("folder1")`.  This  should return all files `random_prefix/folder1/blob_{i}.txt`
     137            8 : #[test_context(MaybeEnabledStorageWithSimpleTestBlobs)]
     138              : #[tokio::test]
     139              : async fn list_no_delimiter_works(
     140              :     ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs,
     141            8 : ) -> anyhow::Result<()> {
     142            8 :     let ctx = match ctx {
     143            3 :         MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx,
     144            5 :         MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()),
     145            0 :         MaybeEnabledStorageWithSimpleTestBlobs::UploadsFailed(e, _) => {
     146            0 :             anyhow::bail!("S3 init failed: {e:?}")
     147              :         }
     148              :     };
     149            3 :     let cancel = CancellationToken::new();
     150            3 :     let test_client = Arc::clone(&ctx.enabled.client);
     151            3 :     let base_prefix =
     152            3 :         RemotePath::new(Utf8Path::new("folder1")).context("common_prefix construction")?;
     153            3 :     let root_files = test_client
     154            3 :         .list(None, ListingMode::NoDelimiter, None, &cancel)
     155           39 :         .await
     156            3 :         .context("client list root files failure")?
     157              :         .keys
     158            3 :         .into_iter()
     159           63 :         .map(|o| o.key)
     160            3 :         .collect::<HashSet<_>>();
     161            3 :     assert_eq!(
     162            3 :         root_files,
     163            3 :         ctx.remote_blobs.clone(),
     164            0 :         "remote storage list on root mismatches with the uploads."
     165              :     );
     166              : 
     167              :     // Test that max_keys limit works. In total there are about 21 files (see
     168              :     // upload_simple_remote_data call in test_real_s3.rs).
     169            3 :     let limited_root_files = test_client
     170            3 :         .list(
     171            3 :             None,
     172            3 :             ListingMode::NoDelimiter,
     173            3 :             Some(NonZeroU32::new(2).unwrap()),
     174            3 :             &cancel,
     175            3 :         )
     176           11 :         .await
     177            3 :         .context("client list root files failure")?;
     178            3 :     assert_eq!(limited_root_files.keys.len(), 2);
     179              : 
     180            3 :     let nested_remote_files = test_client
     181            3 :         .list(Some(&base_prefix), ListingMode::NoDelimiter, None, &cancel)
     182           13 :         .await
     183            3 :         .context("client list nested files failure")?
     184              :         .keys
     185            3 :         .into_iter()
     186           21 :         .map(|o| o.key)
     187            3 :         .collect::<HashSet<_>>();
     188            3 :     let trim_remote_blobs: HashSet<_> = ctx
     189            3 :         .remote_blobs
     190            3 :         .iter()
     191           63 :         .map(|x| x.get_path())
     192           63 :         .filter(|x| x.starts_with("folder1"))
     193           21 :         .map(|x| RemotePath::new(x).expect("must be valid path"))
     194            3 :         .collect();
     195            3 :     assert_eq!(
     196              :         nested_remote_files, trim_remote_blobs,
     197            0 :         "remote storage list on subdirrectory mismatches with the uploads."
     198              :     );
     199            3 :     Ok(())
     200            8 : }
     201              : 
     202            8 : #[test_context(MaybeEnabledStorage)]
     203              : #[tokio::test]
     204            8 : async fn delete_non_exising_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
     205            8 :     let ctx = match ctx {
     206            3 :         MaybeEnabledStorage::Enabled(ctx) => ctx,
     207            5 :         MaybeEnabledStorage::Disabled => return Ok(()),
     208              :     };
     209              : 
     210            3 :     let cancel = CancellationToken::new();
     211              : 
     212            3 :     let path = RemotePath::new(Utf8Path::new(
     213            3 :         format!("{}/for_sure_there_is_nothing_there_really", ctx.base_prefix).as_str(),
     214            3 :     ))
     215            3 :     .with_context(|| "RemotePath conversion")?;
     216              : 
     217            3 :     ctx.client
     218            3 :         .delete(&path, &cancel)
     219           27 :         .await
     220            3 :         .expect("should succeed");
     221            3 : 
     222            3 :     Ok(())
     223            8 : }
     224              : 
     225            8 : #[test_context(MaybeEnabledStorage)]
     226              : #[tokio::test]
     227            8 : async fn delete_objects_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
     228            8 :     let ctx = match ctx {
     229            3 :         MaybeEnabledStorage::Enabled(ctx) => ctx,
     230            5 :         MaybeEnabledStorage::Disabled => return Ok(()),
     231              :     };
     232              : 
     233            3 :     let cancel = CancellationToken::new();
     234              : 
     235            3 :     let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
     236            3 :         .with_context(|| "RemotePath conversion")?;
     237              : 
     238            3 :     let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
     239            3 :         .with_context(|| "RemotePath conversion")?;
     240              : 
     241            3 :     let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
     242            3 :         .with_context(|| "RemotePath conversion")?;
     243              : 
     244            3 :     let (data, len) = upload_stream("remote blob data1".as_bytes().into());
     245           23 :     ctx.client.upload(data, len, &path1, None, &cancel).await?;
     246              : 
     247            3 :     let (data, len) = upload_stream("remote blob data2".as_bytes().into());
     248            7 :     ctx.client.upload(data, len, &path2, None, &cancel).await?;
     249              : 
     250            3 :     let (data, len) = upload_stream("remote blob data3".as_bytes().into());
     251            7 :     ctx.client.upload(data, len, &path3, None, &cancel).await?;
     252              : 
     253           15 :     ctx.client.delete_objects(&[path1, path2], &cancel).await?;
     254              : 
     255            3 :     let prefixes = ctx
     256            3 :         .client
     257            3 :         .list(None, ListingMode::WithDelimiter, None, &cancel)
     258           22 :         .await?
     259              :         .prefixes;
     260              : 
     261            3 :     assert_eq!(prefixes.len(), 1);
     262              : 
     263           10 :     ctx.client.delete_objects(&[path3], &cancel).await?;
     264              : 
     265            3 :     Ok(())
     266            8 : }
     267              : 
     268            8 : #[test_context(MaybeEnabledStorage)]
     269              : #[tokio::test]
     270            8 : async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
     271            8 :     let MaybeEnabledStorage::Enabled(ctx) = ctx else {
     272            5 :         return Ok(());
     273              :     };
     274              : 
     275            3 :     let cancel = CancellationToken::new();
     276              : 
     277            3 :     let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))
     278            3 :         .with_context(|| "RemotePath conversion")?;
     279              : 
     280            3 :     let orig = bytes::Bytes::from_static("remote blob data here".as_bytes());
     281            3 : 
     282            3 :     let (data, len) = wrap_stream(orig.clone());
     283            3 : 
     284           22 :     ctx.client.upload(data, len, &path, None, &cancel).await?;
     285              : 
     286              :     // Normal download request
     287            7 :     let dl = ctx.client.download(&path, &cancel).await?;
     288            3 :     let buf = download_to_vec(dl).await?;
     289            3 :     assert_eq!(&buf, &orig);
     290              : 
     291              :     // Full range (end specified)
     292            3 :     let dl = ctx
     293            3 :         .client
     294            3 :         .download_byte_range(&path, 0, Some(len as u64), &cancel)
     295            7 :         .await?;
     296            3 :     let buf = download_to_vec(dl).await?;
     297            3 :     assert_eq!(&buf, &orig);
     298              : 
     299              :     // partial range (end specified)
     300            3 :     let dl = ctx
     301            3 :         .client
     302            3 :         .download_byte_range(&path, 4, Some(10), &cancel)
     303            7 :         .await?;
     304            3 :     let buf = download_to_vec(dl).await?;
     305            3 :     assert_eq!(&buf, &orig[4..10]);
     306              : 
     307              :     // partial range (end beyond real end)
     308            3 :     let dl = ctx
     309            3 :         .client
     310            3 :         .download_byte_range(&path, 8, Some(len as u64 * 100), &cancel)
     311            7 :         .await?;
     312            3 :     let buf = download_to_vec(dl).await?;
     313            3 :     assert_eq!(&buf, &orig[8..]);
     314              : 
     315              :     // Partial range (end unspecified)
     316            3 :     let dl = ctx
     317            3 :         .client
     318            3 :         .download_byte_range(&path, 4, None, &cancel)
     319            7 :         .await?;
     320            3 :     let buf = download_to_vec(dl).await?;
     321            3 :     assert_eq!(&buf, &orig[4..]);
     322              : 
     323              :     // Full range (end unspecified)
     324            3 :     let dl = ctx
     325            3 :         .client
     326            3 :         .download_byte_range(&path, 0, None, &cancel)
     327            7 :         .await?;
     328            3 :     let buf = download_to_vec(dl).await?;
     329            3 :     assert_eq!(&buf, &orig);
     330              : 
     331            3 :     debug!("Cleanup: deleting file at path {path:?}");
     332            3 :     ctx.client
     333            3 :         .delete(&path, &cancel)
     334           10 :         .await
     335            3 :         .with_context(|| format!("{path:?} removal"))?;
     336              : 
     337            3 :     Ok(())
     338            8 : }
     339              : 
     340            8 : #[test_context(MaybeEnabledStorage)]
     341              : #[tokio::test]
     342            8 : async fn copy_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
     343            8 :     let MaybeEnabledStorage::Enabled(ctx) = ctx else {
     344            5 :         return Ok(());
     345              :     };
     346              : 
     347            3 :     let cancel = CancellationToken::new();
     348              : 
     349            3 :     let path = RemotePath::new(Utf8Path::new(
     350            3 :         format!("{}/file_to_copy", ctx.base_prefix).as_str(),
     351            3 :     ))
     352            3 :     .with_context(|| "RemotePath conversion")?;
     353            3 :     let path_dest = RemotePath::new(Utf8Path::new(
     354            3 :         format!("{}/file_dest", ctx.base_prefix).as_str(),
     355            3 :     ))
     356            3 :     .with_context(|| "RemotePath conversion")?;
     357              : 
     358            3 :     let orig = bytes::Bytes::from_static("remote blob data content".as_bytes());
     359            3 : 
     360            3 :     let (data, len) = wrap_stream(orig.clone());
     361            3 : 
     362           21 :     ctx.client.upload(data, len, &path, None, &cancel).await?;
     363              : 
     364              :     // Normal download request
     365            8 :     ctx.client.copy_object(&path, &path_dest, &cancel).await?;
     366              : 
     367            7 :     let dl = ctx.client.download(&path_dest, &cancel).await?;
     368            3 :     let buf = download_to_vec(dl).await?;
     369            3 :     assert_eq!(&buf, &orig);
     370              : 
     371            3 :     debug!("Cleanup: deleting file at path {path:?}");
     372            3 :     ctx.client
     373            3 :         .delete_objects(&[path.clone(), path_dest.clone()], &cancel)
     374           16 :         .await
     375            3 :         .with_context(|| format!("{path:?} removal"))?;
     376              : 
     377            3 :     Ok(())
     378            8 : }
        

Generated by: LCOV version 2.1-beta