use std::env;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::num::NonZeroUsize;
use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::{Duration, UNIX_EPOCH};
use std::{collections::HashSet, time::SystemTime};

use crate::common::{download_to_vec, upload_stream};
use anyhow::Context;
use camino::Utf8Path;
use futures_util::StreamExt;
use remote_storage::{
    DownloadError, GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind,
    S3Config,
};
use test_context::test_context;
use test_context::AsyncTestContext;
use tokio_util::sync::CancellationToken;
use tracing::info;

mod common;

#[path = "common/tests.rs"]
mod tests_s3;

use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data};
use utils::backoff;

const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
const BASE_PREFIX: &str = "test";
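// These tests only run against a real S3 bucket: they skip themselves unless
// `ENABLE_REAL_S3_REMOTE_STORAGE` is set. When it is, `create_s3_client` below
// additionally expects `REMOTE_STORAGE_S3_BUCKET` and `REMOTE_STORAGE_S3_REGION`
// to identify the bucket to test against.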

#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledStorage::Enabled(ctx) => ctx,
        MaybeEnabledStorage::Disabled => return Ok(()),
    };
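    // Timeline exercised below:
    //   t0: only `path1` exists
    //   t1: `path1` and `path2` (old contents)
    //   t2: `path2` (new contents) and `path3`; `path1` deleted
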
    // Our test must tolerate discrepancies between the S3 clock and the clock of
    // the environment the tests run in. Therefore, wait a little bit before and
    // after each time point. The alternative would be to take the time from the
    // S3 response headers.
    const WAIT_TIME: Duration = Duration::from_millis(3_000);

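    // Real S3 calls can fail transiently (throttling, connection resets), so every
    // operation in this test goes through `backoff::retry`. The `|_e| false`
    // predicate treats every error as retryable; the retry only bails out early
    // when the cancellation token fires, which never happens here (hence the
    // `expect("never cancelled")`).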
    async fn retry<T, O, F, E>(op: O) -> Result<T, E>
    where
        E: Display + Debug + 'static,
        O: FnMut() -> F,
        F: Future<Output = Result<T, E>>,
    {
        let warn_threshold = 3;
        let max_retries = 10;
        backoff::retry(
            op,
            |_e| false,
            warn_threshold,
            max_retries,
            "test retry",
            &CancellationToken::new(),
        )
        .await
        .expect("never cancelled")
    }

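    // Take a timestamp, padding it with `WAIT_TIME` of sleep on both sides so that
    // operations before and after it stay clearly separated from it in time, even
    // with a moderately skewed S3 clock.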
    async fn time_point() -> SystemTime {
        tokio::time::sleep(WAIT_TIME).await;
        let ret = SystemTime::now();
        tokio::time::sleep(WAIT_TIME).await;
        ret
    }

    async fn list_files(
        client: &Arc<GenericRemoteStorage>,
        cancel: &CancellationToken,
    ) -> anyhow::Result<HashSet<RemotePath>> {
        Ok(retry(|| client.list_files(None, None, cancel))
            .await
            .context("list root files failure")?
            .into_iter()
            .collect::<HashSet<_>>())
    }

    let cancel = CancellationToken::new();

    let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    retry(|| {
        let (data, len) = upload_stream("remote blob data1".as_bytes().into());
        ctx.client.upload(data, len, &path1, None, &cancel)
    })
    .await?;

    let t0_files = list_files(&ctx.client, &cancel).await?;
    let t0 = time_point().await;
    println!("at t0: {t0_files:?}");

    let old_data = "remote blob data2";

    retry(|| {
        let (data, len) = upload_stream(old_data.as_bytes().into());
        ctx.client.upload(data, len, &path2, None, &cancel)
    })
    .await?;

    let t1_files = list_files(&ctx.client, &cancel).await?;
    let t1 = time_point().await;
    println!("at t1: {t1_files:?}");

    // A little check to ensure that our clock is not too far off from the S3 clock
    {
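        // `path2` was uploaded at least WAIT_TIME after the raw reading behind `t0`
        // and at least WAIT_TIME before the one behind `t1` (by the local clock),
        // so as long as the two clocks disagree by less than WAIT_TIME / 2, its
        // `last_modified` must fall within [t0 + WAIT_TIME/2, t1 - WAIT_TIME/2].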
        let dl = retry(|| ctx.client.download(&path2, &cancel)).await?;
        let last_modified = dl.last_modified.unwrap();
        let half_wt = WAIT_TIME.mul_f32(0.5);
        let t0_hwt = t0 + half_wt;
        let t1_hwt = t1 - half_wt;
        if !(t0_hwt..=t1_hwt).contains(&last_modified) {
            panic!("last_modified={last_modified:?} is not between t0_hwt={t0_hwt:?} and t1_hwt={t1_hwt:?}. \
                This likely means a large clock discrepancy between S3 and the local clock.");
        }
    }

    retry(|| {
        let (data, len) = upload_stream("remote blob data3".as_bytes().into());
        ctx.client.upload(data, len, &path3, None, &cancel)
    })
    .await?;

    let new_data = "new remote blob data2";

    retry(|| {
        let (data, len) = upload_stream(new_data.as_bytes().into());
        ctx.client.upload(data, len, &path2, None, &cancel)
    })
    .await?;

    retry(|| ctx.client.delete(&path1, &cancel)).await?;
    let t2_files = list_files(&ctx.client, &cancel).await?;
    let t2 = time_point().await;
    println!("at t2: {t2_files:?}");

    // No changes after recovery to t2 (no-op)
    let t_final = time_point().await;
    ctx.client
        .time_travel_recover(None, t2, t_final, &cancel)
        .await?;
    let t2_files_recovered = list_files(&ctx.client, &cancel).await?;
    println!("after recovery to t2: {t2_files_recovered:?}");
    assert_eq!(t2_files, t2_files_recovered);
    let path2_recovered_t2 = download_to_vec(ctx.client.download(&path2, &cancel).await?).await?;
    assert_eq!(path2_recovered_t2, new_data.as_bytes());

    // After recovery to t1: path1 is back, path2 has the old content
    let t_final = time_point().await;
    ctx.client
        .time_travel_recover(None, t1, t_final, &cancel)
        .await?;
    let t1_files_recovered = list_files(&ctx.client, &cancel).await?;
    println!("after recovery to t1: {t1_files_recovered:?}");
    assert_eq!(t1_files, t1_files_recovered);
    let path2_recovered_t1 = download_to_vec(ctx.client.download(&path2, &cancel).await?).await?;
    assert_eq!(path2_recovered_t1, old_data.as_bytes());

    // After recovery to t0: everything is gone except for path1
    let t_final = time_point().await;
    ctx.client
        .time_travel_recover(None, t0, t_final, &cancel)
        .await?;
    let t0_files_recovered = list_files(&ctx.client, &cancel).await?;
    println!("after recovery to t0: {t0_files_recovered:?}");
    assert_eq!(t0_files, t0_files_recovered);

    // Cleanup

    let paths = &[path1, path2, path3];
    retry(|| ctx.client.delete_objects(paths, &cancel)).await?;

    Ok(())
}

struct EnabledS3 {
    client: Arc<GenericRemoteStorage>,
    base_prefix: &'static str,
}

impl EnabledS3 {
    async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
        let client = create_s3_client(max_keys_in_list_response)
            .context("S3 client creation")
            .expect("S3 client creation failed");

        EnabledS3 {
            client,
            base_prefix: BASE_PREFIX,
        }
    }

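    /// Overrides the client's request timeout. The nested `Arc::get_mut` calls
    /// require that no other clones of the `Arc` are alive, and the `expect`s
    /// panic otherwise.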
    fn configure_request_timeout(&mut self, timeout: Duration) {
        match Arc::get_mut(&mut self.client).expect("outer Arc::get_mut") {
            GenericRemoteStorage::AwsS3(s3) => {
                let s3 = Arc::get_mut(s3).expect("inner Arc::get_mut");
                s3.timeout = timeout;
            }
            _ => unreachable!(),
        }
    }
}

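/// Test context that resolves to `Disabled`, making each test return early,
/// unless the `ENABLE_REAL_S3_REMOTE_STORAGE` environment variable is set.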
enum MaybeEnabledStorage {
    Enabled(EnabledS3),
    Disabled,
}

#[async_trait::async_trait]
impl AsyncTestContext for MaybeEnabledStorage {
    async fn setup() -> Self {
        ensure_logging_ready();

        if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
            info!(
                "`{}` env variable is not set, skipping the test",
                ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
            );
            return Self::Disabled;
        }

        Self::Enabled(EnabledS3::setup(None).await)
    }
}

enum MaybeEnabledStorageWithTestBlobs {
    Enabled(S3WithTestBlobs),
    Disabled,
    UploadsFailed(anyhow::Error, S3WithTestBlobs),
}

struct S3WithTestBlobs {
    enabled: EnabledS3,
    remote_prefixes: HashSet<RemotePath>,
    remote_blobs: HashSet<RemotePath>,
}

#[async_trait::async_trait]
impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs {
    async fn setup() -> Self {
        ensure_logging_ready();
        if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
            info!(
                "`{}` env variable is not set, skipping the test",
                ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
            );
            return Self::Disabled;
        }

        let max_keys_in_list_response = 10;
        let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());

        let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;

        match upload_remote_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
            ControlFlow::Continue(uploads) => {
                info!("Remote objects created successfully");

                Self::Enabled(S3WithTestBlobs {
                    enabled,
                    remote_prefixes: uploads.prefixes,
                    remote_blobs: uploads.blobs,
                })
            }
            ControlFlow::Break(uploads) => Self::UploadsFailed(
                anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
                S3WithTestBlobs {
                    enabled,
                    remote_prefixes: uploads.prefixes,
                    remote_blobs: uploads.blobs,
                },
            ),
        }
    }

    async fn teardown(self) {
        match self {
            Self::Disabled => {}
            Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
                cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
            }
        }
    }
}

// NOTE: the setups for the list_prefixes test and the list_files test are very similar.
// However, they are not identical: the list_prefixes function is concerned with listing
// prefixes, whereas the list_files function is concerned with listing files.
// See the `RemoteStorage::list_files` documentation for more details.
enum MaybeEnabledStorageWithSimpleTestBlobs {
    Enabled(S3WithSimpleTestBlobs),
    Disabled,
    UploadsFailed(anyhow::Error, S3WithSimpleTestBlobs),
}

struct S3WithSimpleTestBlobs {
    enabled: EnabledS3,
    remote_blobs: HashSet<RemotePath>,
}

#[async_trait::async_trait]
impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {
    async fn setup() -> Self {
        ensure_logging_ready();
        if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
            info!(
                "`{}` env variable is not set, skipping the test",
                ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
            );
            return Self::Disabled;
        }

        let max_keys_in_list_response = 10;
        let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());

        let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;

        match upload_simple_remote_data(&enabled.client, upload_tasks_count).await {
            ControlFlow::Continue(uploads) => {
                info!("Remote objects created successfully");

                Self::Enabled(S3WithSimpleTestBlobs {
                    enabled,
                    remote_blobs: uploads,
                })
            }
            ControlFlow::Break(uploads) => Self::UploadsFailed(
                anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
                S3WithSimpleTestBlobs {
                    enabled,
                    remote_blobs: uploads,
                },
            ),
        }
    }

    async fn teardown(self) {
        match self {
            Self::Disabled => {}
            Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
                cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
            }
        }
    }
}

fn create_s3_client(
    max_keys_per_list_response: Option<i32>,
) -> anyhow::Result<Arc<GenericRemoteStorage>> {
    use rand::Rng;

    let remote_storage_s3_bucket = env::var("REMOTE_STORAGE_S3_BUCKET")
        .context("`REMOTE_STORAGE_S3_BUCKET` env var is not set, but real S3 tests are enabled")?;
    let remote_storage_s3_region = env::var("REMOTE_STORAGE_S3_REGION")
        .context("`REMOTE_STORAGE_S3_REGION` env var is not set, but real S3 tests are enabled")?;

    // Because of coarse clock granularity, test runners have ended up using the same
    // nanosecond timestamp as a bucket prefix. The millis are kept only as a debugging
    // aid, to make the prefix easier to find later.
    let millis = std::time::SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .context("random s3 test prefix part calculation")?
        .as_millis();

    // Since two threads can observe the same nanos (and hence the same millis),
    // add randomness to make the prefix unique.
    let random = rand::thread_rng().gen::<u32>();

    let remote_storage_config = RemoteStorageConfig {
        storage: RemoteStorageKind::AwsS3(S3Config {
            bucket_name: remote_storage_s3_bucket,
            bucket_region: remote_storage_s3_region,
            prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
            endpoint: None,
            concurrency_limit: NonZeroUsize::new(100).unwrap(),
            max_keys_per_list_response,
        }),
        timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
    };
    Ok(Arc::new(
        GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
    ))
}

#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn download_is_timeouted(ctx: &mut MaybeEnabledStorage) {
    let MaybeEnabledStorage::Enabled(ctx) = ctx else {
        return;
    };

    let cancel = CancellationToken::new();

    let path = RemotePath::new(Utf8Path::new(
        format!("{}/file_to_copy", ctx.base_prefix).as_str(),
    ))
    .unwrap();

    let len = upload_large_enough_file(&ctx.client, &path, &cancel).await;

    let timeout = std::time::Duration::from_secs(5);

    ctx.configure_request_timeout(timeout);

    let started_at = std::time::Instant::now();
    let mut stream = ctx
        .client
        .download(&path, &cancel)
        .await
        .expect("download succeeds")
        .download_stream;

    if started_at.elapsed().mul_f32(0.9) >= timeout {
        tracing::warn!(
            elapsed_ms = started_at.elapsed().as_millis(),
            "timeout might be too low, consumed most of it during headers"
        );
    }

    let first = stream
        .next()
        .await
        .expect("should have the first blob")
        .expect("should have succeeded");

    tracing::info!(len = first.len(), "downloaded first chunk");

    assert!(
        first.len() < len,
        "uploaded file is too small; we downloaded it all in the first chunk"
    );

    tokio::time::sleep(timeout).await;

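    // The request-level timeout has now fully elapsed while the stream sat idle,
    // so the next poll must yield an error rather than another chunk.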
    {
        let started_at = std::time::Instant::now();
        let next = stream
            .next()
            .await
            .expect("stream should not have ended yet");

        tracing::info!(
            next.is_err = next.is_err(),
            elapsed_ms = started_at.elapsed().as_millis(),
            "received item after timeout"
        );

        let e = next.expect_err("expected an error, but got a chunk?");

        let inner = e.get_ref().expect("std::io::Error::inner should be set");
        assert!(
            inner
                .downcast_ref::<DownloadError>()
                .is_some_and(|e| matches!(e, DownloadError::Timeout)),
            "{inner:?}"
        );
    }

    ctx.configure_request_timeout(RemoteStorageConfig::DEFAULT_TIMEOUT);

    ctx.client.delete_objects(&[path], &cancel).await.unwrap()
}

#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
    let MaybeEnabledStorage::Enabled(ctx) = ctx else {
        return;
    };

    let cancel = CancellationToken::new();

    let path = RemotePath::new(Utf8Path::new(
        format!("{}/file_to_copy", ctx.base_prefix).as_str(),
    ))
    .unwrap();

    let len = upload_large_enough_file(&ctx.client, &path, &cancel).await;

    {
        let mut stream = ctx
            .client
            .download(&path, &cancel)
            .await
            .expect("download succeeds")
            .download_stream;

        let first = stream
            .next()
            .await
            .expect("should have the first blob")
            .expect("should have succeeded");

        tracing::info!(len = first.len(), "downloaded first chunk");

        assert!(
            first.len() < len,
            "uploaded file is too small; we downloaded it all in the first chunk"
        );

        cancel.cancel();

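        // Cancelling the token mid-download must surface on the stream as an
        // error wrapping `DownloadError::Cancelled`, which the next poll asserts.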
        let next = stream.next().await.expect("stream should have more");

        let e = next.expect_err("expected an error, but got a chunk?");

        let inner = e.get_ref().expect("std::io::Error::inner should be set");
        assert!(
            inner
                .downcast_ref::<DownloadError>()
                .is_some_and(|e| matches!(e, DownloadError::Cancelled)),
            "{inner:?}"
        );
    }

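    // The original token is now cancelled, so create a fresh one for the cleanup.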
    let cancel = CancellationToken::new();

    ctx.client.delete_objects(&[path], &cancel).await.unwrap();
}

/// Upload a file long enough that we cannot download it in a single chunk.
///
/// For S3 the first chunk seems to be less than 10 kB, so this has a bit of a safety margin.
async fn upload_large_enough_file(
    client: &GenericRemoteStorage,
    path: &RemotePath,
    cancel: &CancellationToken,
) -> usize {
    let header = bytes::Bytes::from_static("remote blob data content".as_bytes());
    let body = bytes::Bytes::from(vec![0u8; 1024]);
    let contents = std::iter::once(header).chain(std::iter::repeat(body).take(128));
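    // `contents` is 24 header bytes plus 128 * 1024 zero bytes: roughly 128 KiB,
    // comfortably larger than a single first chunk.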

    let len = contents.clone().fold(0, |acc, next| acc + next.len());

    let contents = futures::stream::iter(contents.map(std::io::Result::Ok));

    client
        .upload(contents, len, path, None, cancel)
        .await
        .expect("upload succeeds");

    len
}