LCOV - code coverage report
Current view: top level - libs/remote_storage/tests - test_real_s3.rs (source / functions) Coverage Total Hit
Test: bb522999b2ee0ee028df22bb188d3a84170ba700.info Lines: 91.0 % 377 343
Test Date: 2024-07-21 16:16:09 Functions: 84.6 % 65 55

            Line data    Source code
       1              : use std::env;
       2              : use std::fmt::{Debug, Display};
       3              : use std::future::Future;
       4              : use std::num::NonZeroUsize;
       5              : use std::ops::ControlFlow;
       6              : use std::sync::Arc;
       7              : use std::time::{Duration, UNIX_EPOCH};
       8              : use std::{collections::HashSet, time::SystemTime};
       9              : 
      10              : use crate::common::{download_to_vec, upload_stream};
      11              : use anyhow::Context;
      12              : use camino::Utf8Path;
      13              : use futures_util::StreamExt;
      14              : use remote_storage::{
      15              :     DownloadError, GenericRemoteStorage, ListingMode, RemotePath, RemoteStorageConfig,
      16              :     RemoteStorageKind, S3Config,
      17              : };
      18              : use test_context::test_context;
      19              : use test_context::AsyncTestContext;
      20              : use tokio::io::AsyncBufReadExt;
      21              : use tokio_util::sync::CancellationToken;
      22              : use tracing::info;
      23              : 
      24              : mod common;
      25              : 
      26              : #[path = "common/tests.rs"]
      27              : mod tests_s3;
      28              : 
      29              : use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data};
      30              : use utils::backoff;
      31              : 
      32              : const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
      33              : const BASE_PREFIX: &str = "test";
      34              : 
      35            4 : #[test_context(MaybeEnabledStorage)]
      36              : #[tokio::test]
      37            4 : async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
      38            4 :     let ctx = match ctx {
      39            2 :         MaybeEnabledStorage::Enabled(ctx) => ctx,
      40            2 :         MaybeEnabledStorage::Disabled => return Ok(()),
      41              :     };
      42              :     // Our test depends on discrepancies in the clock between S3 and the environment the tests
      43              :     // run in. Therefore, wait a little bit before and after. The alternative would be
      44              :     // to take the time from S3 response headers.
      45              :     const WAIT_TIME: Duration = Duration::from_millis(3_000);
      46              : 
      47           26 :     async fn retry<T, O, F, E>(op: O) -> Result<T, E>
      48           26 :     where
      49           26 :         E: Display + Debug + 'static,
      50           26 :         O: FnMut() -> F,
      51           26 :         F: Future<Output = Result<T, E>>,
      52           26 :     {
      53           26 :         let warn_threshold = 3;
      54           26 :         let max_retries = 10;
      55           26 :         backoff::retry(
      56           26 :             op,
      57           26 :             |_e| false,
      58           26 :             warn_threshold,
      59           26 :             max_retries,
      60           26 :             "test retry",
      61           26 :             &CancellationToken::new(),
      62           26 :         )
      63          106 :         .await
      64           26 :         .expect("never cancelled")
      65           26 :     }
      66              : 
      67           12 :     async fn time_point() -> SystemTime {
      68           12 :         tokio::time::sleep(WAIT_TIME).await;
      69           12 :         let ret = SystemTime::now();
      70           12 :         tokio::time::sleep(WAIT_TIME).await;
      71           12 :         ret
      72           12 :     }
      73              : 
      74           12 :     async fn list_files(
      75           12 :         client: &Arc<GenericRemoteStorage>,
      76           12 :         cancel: &CancellationToken,
      77           12 :     ) -> anyhow::Result<HashSet<RemotePath>> {
      78           12 :         Ok(
      79           12 :             retry(|| client.list(None, ListingMode::NoDelimiter, None, cancel))
      80           58 :                 .await
      81           12 :                 .context("list root files failure")?
      82              :                 .keys
      83           12 :                 .into_iter()
      84           12 :                 .collect::<HashSet<_>>(),
      85              :         )
      86           12 :     }
      87              : 
      88            2 :     let cancel = CancellationToken::new();
      89              : 
      90            2 :     let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
      91            2 :         .with_context(|| "RemotePath conversion")?;
      92              : 
      93            2 :     let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
      94            2 :         .with_context(|| "RemotePath conversion")?;
      95              : 
      96            2 :     let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
      97            2 :         .with_context(|| "RemotePath conversion")?;
      98              : 
      99            2 :     retry(|| {
     100            2 :         let (data, len) = upload_stream("remote blob data1".as_bytes().into());
     101            2 :         ctx.client.upload(data, len, &path1, None, &cancel)
     102            2 :     })
     103           18 :     .await?;
     104              : 
     105            6 :     let t0_files = list_files(&ctx.client, &cancel).await?;
     106            4 :     let t0 = time_point().await;
     107            2 :     println!("at t0: {t0_files:?}");
     108            2 : 
     109            2 :     let old_data = "remote blob data2";
     110            2 : 
     111            3 :     retry(|| {
     112            3 :         let (data, len) = upload_stream(old_data.as_bytes().into());
     113            3 :         ctx.client.upload(data, len, &path2, None, &cancel)
     114            3 :     })
     115           16 :     .await?;
     116              : 
     117            4 :     let t1_files = list_files(&ctx.client, &cancel).await?;
     118            4 :     let t1 = time_point().await;
     119            2 :     println!("at t1: {t1_files:?}");
     120              : 
     121              :     // A little check to ensure that our clock is not too far off from the S3 clock
     122              :     {
     123            2 :         let dl = retry(|| ctx.client.download(&path2, &cancel)).await?;
     124            2 :         let last_modified = dl.last_modified;
     125            2 :         let half_wt = WAIT_TIME.mul_f32(0.5);
     126            2 :         let t0_hwt = t0 + half_wt;
     127            2 :         let t1_hwt = t1 - half_wt;
     128            2 :         if !(t0_hwt..=t1_hwt).contains(&last_modified) {
     129            0 :             panic!("last_modified={last_modified:?} is not between t0_hwt={t0_hwt:?} and t1_hwt={t1_hwt:?}. \
     130            0 :                 This likely means a large lock discrepancy between S3 and the local clock.");
     131            2 :         }
     132            2 :     }
     133            2 : 
     134            2 :     retry(|| {
     135            2 :         let (data, len) = upload_stream("remote blob data3".as_bytes().into());
     136            2 :         ctx.client.upload(data, len, &path3, None, &cancel)
     137            2 :     })
     138            2 :     .await?;
     139              : 
     140            2 :     let new_data = "new remote blob data2";
     141            2 : 
     142            2 :     retry(|| {
     143            2 :         let (data, len) = upload_stream(new_data.as_bytes().into());
     144            2 :         ctx.client.upload(data, len, &path2, None, &cancel)
     145            2 :     })
     146            2 :     .await?;
     147              : 
     148            4 :     retry(|| ctx.client.delete(&path1, &cancel)).await?;
     149           17 :     let t2_files = list_files(&ctx.client, &cancel).await?;
     150            4 :     let t2 = time_point().await;
     151            2 :     println!("at t2: {t2_files:?}");
     152              : 
     153              :     // No changes after recovery to t2 (no-op)
     154            4 :     let t_final = time_point().await;
     155            2 :     ctx.client
     156            2 :         .time_travel_recover(None, t2, t_final, &cancel)
     157           14 :         .await?;
     158            4 :     let t2_files_recovered = list_files(&ctx.client, &cancel).await?;
     159            2 :     println!("after recovery to t2: {t2_files_recovered:?}");
     160            2 :     assert_eq!(t2_files, t2_files_recovered);
     161            2 :     let path2_recovered_t2 = download_to_vec(ctx.client.download(&path2, &cancel).await?).await?;
     162            2 :     assert_eq!(path2_recovered_t2, new_data.as_bytes());
     163              : 
     164              :     // after recovery to t1: path1 is back, path2 has the old content
     165            4 :     let t_final = time_point().await;
     166            2 :     ctx.client
     167            2 :         .time_travel_recover(None, t1, t_final, &cancel)
     168           33 :         .await?;
     169           11 :     let t1_files_recovered = list_files(&ctx.client, &cancel).await?;
     170            2 :     println!("after recovery to t1: {t1_files_recovered:?}");
     171            2 :     assert_eq!(t1_files, t1_files_recovered);
     172            2 :     let path2_recovered_t1 = download_to_vec(ctx.client.download(&path2, &cancel).await?).await?;
     173            2 :     assert_eq!(path2_recovered_t1, old_data.as_bytes());
     174              : 
     175              :     // after recovery to t0: everything is gone except for path1
     176            4 :     let t_final = time_point().await;
     177            2 :     ctx.client
     178            2 :         .time_travel_recover(None, t0, t_final, &cancel)
     179           22 :         .await?;
     180           16 :     let t0_files_recovered = list_files(&ctx.client, &cancel).await?;
     181            2 :     println!("after recovery to t0: {t0_files_recovered:?}");
     182            2 :     assert_eq!(t0_files, t0_files_recovered);
     183              : 
     184              :     // cleanup
     185              : 
     186            2 :     let paths = &[path1, path2, path3];
     187            4 :     retry(|| ctx.client.delete_objects(paths, &cancel)).await?;
     188              : 
     189            2 :     Ok(())
     190            4 : }
     191              : 
     192              : struct EnabledS3 {
     193              :     client: Arc<GenericRemoteStorage>,
     194              :     base_prefix: &'static str,
     195              : }
     196              : 
     197              : impl EnabledS3 {
     198           18 :     async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
     199           18 :         let client = create_s3_client(max_keys_in_list_response)
     200            0 :             .await
     201           18 :             .context("S3 client creation")
     202           18 :             .expect("S3 client creation failed");
     203           18 : 
     204           18 :         EnabledS3 {
     205           18 :             client,
     206           18 :             base_prefix: BASE_PREFIX,
     207           18 :         }
     208           18 :     }
     209              : 
     210            4 :     fn configure_request_timeout(&mut self, timeout: Duration) {
     211            4 :         match Arc::get_mut(&mut self.client).expect("outer Arc::get_mut") {
     212            4 :             GenericRemoteStorage::AwsS3(s3) => {
     213            4 :                 let s3 = Arc::get_mut(s3).expect("inner Arc::get_mut");
     214            4 :                 s3.timeout = timeout;
     215            4 :             }
     216            0 :             _ => unreachable!(),
     217              :         }
     218            4 :     }
     219              : }
     220              : 
     221              : enum MaybeEnabledStorage {
     222              :     Enabled(EnabledS3),
     223              :     Disabled,
     224              : }
     225              : 
     226              : impl AsyncTestContext for MaybeEnabledStorage {
     227           28 :     async fn setup() -> Self {
     228           28 :         ensure_logging_ready();
     229           28 : 
     230           28 :         if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
     231           14 :             info!(
     232            0 :                 "`{}` env variable is not set, skipping the test",
     233              :                 ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
     234              :             );
     235           14 :             return Self::Disabled;
     236           14 :         }
     237           14 : 
     238           14 :         Self::Enabled(EnabledS3::setup(None).await)
     239           28 :     }
     240              : }
     241              : 
     242              : enum MaybeEnabledStorageWithTestBlobs {
     243              :     Enabled(S3WithTestBlobs),
     244              :     Disabled,
     245              :     UploadsFailed(anyhow::Error, S3WithTestBlobs),
     246              : }
     247              : 
     248              : struct S3WithTestBlobs {
     249              :     enabled: EnabledS3,
     250              :     remote_prefixes: HashSet<RemotePath>,
     251              :     remote_blobs: HashSet<RemotePath>,
     252              : }
     253              : 
     254              : impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs {
     255            4 :     async fn setup() -> Self {
     256            4 :         ensure_logging_ready();
     257            4 :         if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
     258            2 :             info!(
     259            0 :                 "`{}` env variable is not set, skipping the test",
     260              :                 ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
     261              :             );
     262            2 :             return Self::Disabled;
     263            2 :         }
     264            2 : 
     265            2 :         let max_keys_in_list_response = 10;
     266            2 :         let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
     267              : 
     268            2 :         let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
     269              : 
     270           40 :         match upload_remote_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
     271            2 :             ControlFlow::Continue(uploads) => {
     272            2 :                 info!("Remote objects created successfully");
     273              : 
     274            2 :                 Self::Enabled(S3WithTestBlobs {
     275            2 :                     enabled,
     276            2 :                     remote_prefixes: uploads.prefixes,
     277            2 :                     remote_blobs: uploads.blobs,
     278            2 :                 })
     279              :             }
     280            0 :             ControlFlow::Break(uploads) => Self::UploadsFailed(
     281            0 :                 anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
     282            0 :                 S3WithTestBlobs {
     283            0 :                     enabled,
     284            0 :                     remote_prefixes: uploads.prefixes,
     285            0 :                     remote_blobs: uploads.blobs,
     286            0 :                 },
     287            0 :             ),
     288              :         }
     289            4 :     }
     290              : 
     291            4 :     async fn teardown(self) {
     292            4 :         match self {
     293            2 :             Self::Disabled => {}
     294            2 :             Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
     295           15 :                 cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
     296              :             }
     297              :         }
     298            4 :     }
     299              : }
     300              : 
     301              : enum MaybeEnabledStorageWithSimpleTestBlobs {
     302              :     Enabled(S3WithSimpleTestBlobs),
     303              :     Disabled,
     304              :     UploadsFailed(anyhow::Error, S3WithSimpleTestBlobs),
     305              : }
     306              : struct S3WithSimpleTestBlobs {
     307              :     enabled: EnabledS3,
     308              :     remote_blobs: HashSet<RemotePath>,
     309              : }
     310              : 
     311              : impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {
     312            4 :     async fn setup() -> Self {
     313            4 :         ensure_logging_ready();
     314            4 :         if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
     315            2 :             info!(
     316            0 :                 "`{}` env variable is not set, skipping the test",
     317              :                 ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
     318              :             );
     319            2 :             return Self::Disabled;
     320            2 :         }
     321            2 : 
     322            2 :         let max_keys_in_list_response = 10;
     323            2 :         let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
     324              : 
     325            2 :         let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
     326              : 
     327           39 :         match upload_simple_remote_data(&enabled.client, upload_tasks_count).await {
     328            2 :             ControlFlow::Continue(uploads) => {
     329            2 :                 info!("Remote objects created successfully");
     330              : 
     331            2 :                 Self::Enabled(S3WithSimpleTestBlobs {
     332            2 :                     enabled,
     333            2 :                     remote_blobs: uploads,
     334            2 :                 })
     335              :             }
     336            0 :             ControlFlow::Break(uploads) => Self::UploadsFailed(
     337            0 :                 anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
     338            0 :                 S3WithSimpleTestBlobs {
     339            0 :                     enabled,
     340            0 :                     remote_blobs: uploads,
     341            0 :                 },
     342            0 :             ),
     343              :         }
     344            4 :     }
     345              : 
     346            4 :     async fn teardown(self) {
     347            4 :         match self {
     348            2 :             Self::Disabled => {}
     349            2 :             Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
     350           21 :                 cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
     351              :             }
     352              :         }
     353            4 :     }
     354              : }
     355              : 
     356           18 : async fn create_s3_client(
     357           18 :     max_keys_per_list_response: Option<i32>,
     358           18 : ) -> anyhow::Result<Arc<GenericRemoteStorage>> {
     359              :     use rand::Rng;
     360              : 
     361           18 :     let remote_storage_s3_bucket = env::var("REMOTE_STORAGE_S3_BUCKET")
     362           18 :         .context("`REMOTE_STORAGE_S3_BUCKET` env var is not set, but real S3 tests are enabled")?;
     363           18 :     let remote_storage_s3_region = env::var("REMOTE_STORAGE_S3_REGION")
     364           18 :         .context("`REMOTE_STORAGE_S3_REGION` env var is not set, but real S3 tests are enabled")?;
     365              : 
     366              :     // due to how time works, we've had test runners use the same nanos as bucket prefixes.
     367              :     // millis is just a debugging aid for easier finding the prefix later.
     368           18 :     let millis = std::time::SystemTime::now()
     369           18 :         .duration_since(UNIX_EPOCH)
     370           18 :         .context("random s3 test prefix part calculation")?
     371           18 :         .as_millis();
     372           18 : 
     373           18 :     // because nanos can be the same for two threads so can millis, add randomness
     374           18 :     let random = rand::thread_rng().gen::<u32>();
     375           18 : 
     376           18 :     let remote_storage_config = RemoteStorageConfig {
     377           18 :         storage: RemoteStorageKind::AwsS3(S3Config {
     378           18 :             bucket_name: remote_storage_s3_bucket,
     379           18 :             bucket_region: remote_storage_s3_region,
     380           18 :             prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
     381           18 :             endpoint: None,
     382           18 :             concurrency_limit: NonZeroUsize::new(100).unwrap(),
     383           18 :             max_keys_per_list_response,
     384           18 :             upload_storage_class: None,
     385           18 :         }),
     386           18 :         timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
     387           18 :     };
     388           18 :     Ok(Arc::new(
     389           18 :         GenericRemoteStorage::from_config(&remote_storage_config)
     390            0 :             .await
     391           18 :             .context("remote storage init")?,
     392              :     ))
     393           18 : }
     394              : 
     395            4 : #[test_context(MaybeEnabledStorage)]
     396              : #[tokio::test]
     397            4 : async fn download_is_timeouted(ctx: &mut MaybeEnabledStorage) {
     398            4 :     let MaybeEnabledStorage::Enabled(ctx) = ctx else {
     399            2 :         return;
     400              :     };
     401              : 
     402            2 :     let cancel = CancellationToken::new();
     403            2 : 
     404            2 :     let path = RemotePath::new(Utf8Path::new(
     405            2 :         format!("{}/file_to_copy", ctx.base_prefix).as_str(),
     406            2 :     ))
     407            2 :     .unwrap();
     408              : 
     409           17 :     let len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
     410              : 
     411            2 :     let timeout = std::time::Duration::from_secs(5);
     412            2 : 
     413            2 :     ctx.configure_request_timeout(timeout);
     414            2 : 
     415            2 :     let started_at = std::time::Instant::now();
     416            2 :     let mut stream = ctx
     417            2 :         .client
     418            2 :         .download(&path, &cancel)
     419            2 :         .await
     420            2 :         .expect("download succeeds")
     421            2 :         .download_stream;
     422            2 : 
     423            2 :     if started_at.elapsed().mul_f32(0.9) >= timeout {
     424            0 :         tracing::warn!(
     425            0 :             elapsed_ms = started_at.elapsed().as_millis(),
     426            0 :             "timeout might be too low, consumed most of it during headers"
     427              :         );
     428            2 :     }
     429              : 
     430            2 :     let first = stream
     431            2 :         .next()
     432            2 :         .await
     433            2 :         .expect("should have the first blob")
     434            2 :         .expect("should have succeeded");
     435            2 : 
     436            2 :     tracing::info!(len = first.len(), "downloaded first chunk");
     437              : 
     438            2 :     assert!(
     439            2 :         first.len() < len,
     440            0 :         "uploaded file is too small, we downloaded all on first chunk"
     441              :     );
     442              : 
     443            6 :     tokio::time::sleep(timeout).await;
     444              : 
     445              :     {
     446            2 :         let started_at = std::time::Instant::now();
     447            2 :         let next = stream
     448            2 :             .next()
     449            0 :             .await
     450            2 :             .expect("stream should not have ended yet");
     451            2 : 
     452            2 :         tracing::info!(
     453            0 :             next.is_err = next.is_err(),
     454            0 :             elapsed_ms = started_at.elapsed().as_millis(),
     455            0 :             "received item after timeout"
     456              :         );
     457              : 
     458            2 :         let e = next.expect_err("expected an error, but got a chunk?");
     459            2 : 
     460            2 :         let inner = e.get_ref().expect("std::io::Error::inner should be set");
     461            2 :         assert!(
     462            2 :             inner
     463            2 :                 .downcast_ref::<DownloadError>()
     464            2 :                 .is_some_and(|e| matches!(e, DownloadError::Timeout)),
     465            0 :             "{inner:?}"
     466              :         );
     467              :     }
     468              : 
     469            2 :     ctx.configure_request_timeout(RemoteStorageConfig::DEFAULT_TIMEOUT);
     470            2 : 
     471           15 :     ctx.client.delete_objects(&[path], &cancel).await.unwrap()
     472            4 : }
     473              : 
     474            4 : #[test_context(MaybeEnabledStorage)]
     475              : #[tokio::test]
     476            4 : async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
     477            4 :     let MaybeEnabledStorage::Enabled(ctx) = ctx else {
     478            2 :         return;
     479              :     };
     480              : 
     481            2 :     let cancel = CancellationToken::new();
     482            2 : 
     483            2 :     let path = RemotePath::new(Utf8Path::new(
     484            2 :         format!("{}/file_to_copy", ctx.base_prefix).as_str(),
     485            2 :     ))
     486            2 :     .unwrap();
     487              : 
     488           15 :     let file_len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
     489              : 
     490              :     {
     491            2 :         let stream = ctx
     492            2 :             .client
     493            2 :             .download(&path, &cancel)
     494            2 :             .await
     495            2 :             .expect("download succeeds")
     496            2 :             .download_stream;
     497            2 : 
     498            2 :         let mut reader = std::pin::pin!(tokio_util::io::StreamReader::new(stream));
     499              : 
     500            2 :         let first = reader.fill_buf().await.expect("should have the first blob");
     501            2 : 
     502            2 :         let len = first.len();
     503            2 :         tracing::info!(len, "downloaded first chunk");
     504              : 
     505            2 :         assert!(
     506            2 :             first.len() < file_len,
     507            0 :             "uploaded file is too small, we downloaded all on first chunk"
     508              :         );
     509              : 
     510            2 :         reader.consume(len);
     511            2 : 
     512            2 :         cancel.cancel();
     513              : 
     514            2 :         let next = reader.fill_buf().await;
     515              : 
     516            2 :         let e = next.expect_err("expected an error, but got a chunk?");
     517            2 : 
     518            2 :         let inner = e.get_ref().expect("std::io::Error::inner should be set");
     519            2 :         assert!(
     520            2 :             inner
     521            2 :                 .downcast_ref::<DownloadError>()
     522            2 :                 .is_some_and(|e| matches!(e, DownloadError::Cancelled)),
     523            0 :             "{inner:?}"
     524              :         );
     525              : 
     526            2 :         let e = DownloadError::from(e);
     527            2 : 
     528            2 :         assert!(matches!(e, DownloadError::Cancelled), "{e:?}");
     529              :     }
     530              : 
     531            2 :     let cancel = CancellationToken::new();
     532            2 : 
     533           21 :     ctx.client.delete_objects(&[path], &cancel).await.unwrap();
     534            4 : }
     535              : 
     536              : /// Upload a long enough file so that we cannot download it in single chunk
     537              : ///
     538              : /// For s3 the first chunk seems to be less than 10kB, so this has a bit of a safety margin
     539            4 : async fn upload_large_enough_file(
     540            4 :     client: &GenericRemoteStorage,
     541            4 :     path: &RemotePath,
     542            4 :     cancel: &CancellationToken,
     543            4 : ) -> usize {
     544            4 :     let header = bytes::Bytes::from_static("remote blob data content".as_bytes());
     545            4 :     let body = bytes::Bytes::from(vec![0u8; 1024]);
     546            4 :     let contents = std::iter::once(header).chain(std::iter::repeat(body).take(128));
     547            4 : 
     548          516 :     let len = contents.clone().fold(0, |acc, next| acc + next.len());
     549            4 : 
     550            4 :     let contents = futures::stream::iter(contents.map(std::io::Result::Ok));
     551            4 : 
     552            4 :     client
     553            4 :         .upload(contents, len, path, None, cancel)
     554           32 :         .await
     555            4 :         .expect("upload succeeds");
     556            4 : 
     557            4 :     len
     558            4 : }
        

Generated by: LCOV version 2.1-beta