|             Line data    Source code 
       1              : use std::collections::HashSet;
       2              : use std::env;
       3              : use std::fmt::{Debug, Display};
       4              : use std::future::Future;
       5              : use std::num::NonZeroUsize;
       6              : use std::ops::ControlFlow;
       7              : use std::sync::Arc;
       8              : use std::time::{Duration, SystemTime, UNIX_EPOCH};
       9              : 
      10              : use anyhow::Context;
      11              : use camino::Utf8Path;
      12              : use futures_util::StreamExt;
      13              : use remote_storage::{
      14              :     DownloadError, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath,
      15              :     RemoteStorageConfig, RemoteStorageKind, S3Config,
      16              : };
      17              : use test_context::{AsyncTestContext, test_context};
      18              : use tokio::io::AsyncBufReadExt;
      19              : use tokio_util::sync::CancellationToken;
      20              : use tracing::info;
      21              : 
      22              : use crate::common::{download_to_vec, upload_stream};
      23              : 
      24              : mod common;
      25              : 
      26              : #[path = "common/tests.rs"]
      27              : mod tests_s3;
      28              : 
      29              : use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data};
      30              : use utils::backoff;
      31              : 
      32              : const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
      33              : const BASE_PREFIX: &str = "test";
      34              : 
      35              : #[test_context(MaybeEnabledStorage)]
      36              : #[tokio::test]
      37              : async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
      38              :     let ctx = match ctx {
      39              :         MaybeEnabledStorage::Enabled(ctx) => ctx,
      40              :         MaybeEnabledStorage::Disabled => return Ok(()),
      41              :     };
      42              :     // Our test depends on discrepancies in the clock between S3 and the environment the tests
      43              :     // run in. Therefore, wait a little bit before and after. The alternative would be
      44              :     // to take the time from S3 response headers.
      45              :     const WAIT_TIME: Duration = Duration::from_millis(3_000);
      46              : 
      47           26 :     async fn retry<T, O, F, E>(op: O) -> Result<T, E>
      48           26 :     where
      49           26 :         E: Display + Debug + 'static,
      50           26 :         O: FnMut() -> F,
      51           26 :         F: Future<Output = Result<T, E>>,
      52           26 :     {
      53           26 :         let warn_threshold = 3;
      54           26 :         let max_retries = 10;
      55           26 :         backoff::retry(
      56           26 :             op,
      57              :             |_e| false,
      58           26 :             warn_threshold,
      59           26 :             max_retries,
      60           26 :             "test retry",
      61           26 :             &CancellationToken::new(),
      62              :         )
      63           26 :         .await
      64           26 :         .expect("never cancelled")
      65           26 :     }
      66              : 
      67           12 :     async fn time_point() -> SystemTime {
      68           12 :         tokio::time::sleep(WAIT_TIME).await;
      69           12 :         let ret = SystemTime::now();
      70           12 :         tokio::time::sleep(WAIT_TIME).await;
      71           12 :         ret
      72           12 :     }
      73              : 
      74           12 :     async fn list_files(
      75           12 :         client: &Arc<GenericRemoteStorage>,
      76           12 :         cancel: &CancellationToken,
      77           12 :     ) -> anyhow::Result<HashSet<RemotePath>> {
      78              :         Ok(
      79           12 :             retry(|| client.list(None, ListingMode::NoDelimiter, None, cancel))
      80           12 :                 .await
      81           12 :                 .context("list root files failure")?
      82              :                 .keys
      83           12 :                 .into_iter()
      84           12 :                 .map(|o| o.key)
      85           12 :                 .collect::<HashSet<_>>(),
      86              :         )
      87           12 :     }
      88              : 
      89              :     let cancel = CancellationToken::new();
      90              : 
      91              :     let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
      92              :         .with_context(|| "RemotePath conversion")?;
      93              : 
      94              :     let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
      95              :         .with_context(|| "RemotePath conversion")?;
      96              : 
      97              :     let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
      98              :         .with_context(|| "RemotePath conversion")?;
      99              : 
     100            2 :     retry(|| {
     101            2 :         let (data, len) = upload_stream("remote blob data1".as_bytes().into());
     102            2 :         ctx.client.upload(data, len, &path1, None, &cancel)
     103            2 :     })
     104              :     .await?;
     105              : 
     106              :     let t0_files = list_files(&ctx.client, &cancel).await?;
     107              :     let t0 = time_point().await;
     108              :     println!("at t0: {t0_files:?}");
     109              : 
     110              :     let old_data = "remote blob data2";
     111              : 
     112            3 :     retry(|| {
     113            3 :         let (data, len) = upload_stream(old_data.as_bytes().into());
     114            3 :         ctx.client.upload(data, len, &path2, None, &cancel)
     115            3 :     })
     116              :     .await?;
     117              : 
     118              :     let t1_files = list_files(&ctx.client, &cancel).await?;
     119              :     let t1 = time_point().await;
     120              :     println!("at t1: {t1_files:?}");
     121              : 
     122              :     // A little check to ensure that our clock is not too far off from the S3 clock
     123              :     {
     124              :         let opts = DownloadOpts::default();
     125            4 :         let dl = retry(|| ctx.client.download(&path2, &opts, &cancel)).await?;
     126              :         let last_modified = dl.last_modified;
     127              :         let half_wt = WAIT_TIME.mul_f32(0.5);
     128              :         let t0_hwt = t0 + half_wt;
     129              :         let t1_hwt = t1 - half_wt;
     130              :         if !(t0_hwt..=t1_hwt).contains(&last_modified) {
     131              :             panic!(
     132              :                 "last_modified={last_modified:?} is not between t0_hwt={t0_hwt:?} and t1_hwt={t1_hwt:?}. \
     133              :                 This likely means a large lock discrepancy between S3 and the local clock."
     134              :             );
     135              :         }
     136              :     }
     137              : 
     138            2 :     retry(|| {
     139            2 :         let (data, len) = upload_stream("remote blob data3".as_bytes().into());
     140            2 :         ctx.client.upload(data, len, &path3, None, &cancel)
     141            2 :     })
     142              :     .await?;
     143              : 
     144              :     let new_data = "new remote blob data2";
     145              : 
     146            2 :     retry(|| {
     147            2 :         let (data, len) = upload_stream(new_data.as_bytes().into());
     148            2 :         ctx.client.upload(data, len, &path2, None, &cancel)
     149            2 :     })
     150              :     .await?;
     151              : 
     152            2 :     retry(|| ctx.client.delete(&path1, &cancel)).await?;
     153              :     let t2_files = list_files(&ctx.client, &cancel).await?;
     154              :     let t2 = time_point().await;
     155              :     println!("at t2: {t2_files:?}");
     156              : 
     157              :     // No changes after recovery to t2 (no-op)
     158              :     let t_final = time_point().await;
     159              :     ctx.client
     160              :         .time_travel_recover(None, t2, t_final, &cancel, None)
     161              :         .await?;
     162              :     let t2_files_recovered = list_files(&ctx.client, &cancel).await?;
     163              :     println!("after recovery to t2: {t2_files_recovered:?}");
     164              :     assert_eq!(t2_files, t2_files_recovered);
     165              :     let path2_recovered_t2 = download_to_vec(
     166              :         ctx.client
     167              :             .download(&path2, &DownloadOpts::default(), &cancel)
     168              :             .await?,
     169              :     )
     170              :     .await?;
     171              :     assert_eq!(path2_recovered_t2, new_data.as_bytes());
     172              : 
     173              :     // after recovery to t1: path1 is back, path2 has the old content
     174              :     let t_final = time_point().await;
     175              :     ctx.client
     176              :         .time_travel_recover(None, t1, t_final, &cancel, None)
     177              :         .await?;
     178              :     let t1_files_recovered = list_files(&ctx.client, &cancel).await?;
     179              :     println!("after recovery to t1: {t1_files_recovered:?}");
     180              :     assert_eq!(t1_files, t1_files_recovered);
     181              :     let path2_recovered_t1 = download_to_vec(
     182              :         ctx.client
     183              :             .download(&path2, &DownloadOpts::default(), &cancel)
     184              :             .await?,
     185              :     )
     186              :     .await?;
     187              :     assert_eq!(path2_recovered_t1, old_data.as_bytes());
     188              : 
     189              :     // after recovery to t0: everything is gone except for path1
     190              :     let t_final = time_point().await;
     191              :     ctx.client
     192              :         .time_travel_recover(None, t0, t_final, &cancel, None)
     193              :         .await?;
     194              :     let t0_files_recovered = list_files(&ctx.client, &cancel).await?;
     195              :     println!("after recovery to t0: {t0_files_recovered:?}");
     196              :     assert_eq!(t0_files, t0_files_recovered);
     197              : 
     198              :     // cleanup
     199              : 
     200              :     let paths = &[path1, path2, path3];
     201            2 :     retry(|| ctx.client.delete_objects(paths, &cancel)).await?;
     202              : 
     203              :     Ok(())
     204              : }
     205              : 
     206              : struct EnabledS3 {
     207              :     client: Arc<GenericRemoteStorage>,
     208              :     base_prefix: &'static str,
     209              : }
     210              : 
     211              : impl EnabledS3 {
     212           26 :     async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
     213           26 :         let client = create_s3_client(max_keys_in_list_response)
     214           26 :             .await
     215           26 :             .context("S3 client creation")
     216           26 :             .expect("S3 client creation failed");
     217              : 
     218           26 :         EnabledS3 {
     219           26 :             client,
     220           26 :             base_prefix: BASE_PREFIX,
     221           26 :         }
     222           26 :     }
     223              : 
     224            4 :     fn configure_request_timeout(&mut self, timeout: Duration) {
     225            4 :         match Arc::get_mut(&mut self.client).expect("outer Arc::get_mut") {
     226            4 :             GenericRemoteStorage::AwsS3(s3) => {
     227            4 :                 let s3 = Arc::get_mut(s3).expect("inner Arc::get_mut");
     228            4 :                 s3.timeout = timeout;
     229            4 :             }
     230            0 :             _ => unreachable!(),
     231              :         }
     232            4 :     }
     233              : }
     234              : 
     235              : enum MaybeEnabledStorage {
     236              :     Enabled(EnabledS3),
     237              :     Disabled,
     238              : }
     239              : 
     240              : impl AsyncTestContext for MaybeEnabledStorage {
     241           27 :     async fn setup() -> Self {
     242           27 :         ensure_logging_ready();
     243              : 
     244           27 :         if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
     245            9 :             info!(
     246            0 :                 "`{}` env variable is not set, skipping the test",
     247              :                 ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
     248              :             );
     249            9 :             return Self::Disabled;
     250           18 :         }
     251              : 
     252           18 :         Self::Enabled(EnabledS3::setup(None).await)
     253           27 :     }
     254              : }
     255              : 
     256              : enum MaybeEnabledStorageWithTestBlobs {
     257              :     Enabled(S3WithTestBlobs),
     258              :     Disabled,
     259              :     UploadsFailed(anyhow::Error, S3WithTestBlobs),
     260              : }
     261              : 
     262              : struct S3WithTestBlobs {
     263              :     enabled: EnabledS3,
     264              :     remote_prefixes: HashSet<RemotePath>,
     265              :     remote_blobs: HashSet<RemotePath>,
     266              : }
     267              : 
     268              : impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs {
     269            3 :     async fn setup() -> Self {
     270            3 :         ensure_logging_ready();
     271            3 :         if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
     272            1 :             info!(
     273            0 :                 "`{}` env variable is not set, skipping the test",
     274              :                 ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
     275              :             );
     276            1 :             return Self::Disabled;
     277            2 :         }
     278              : 
     279            2 :         let max_keys_in_list_response = 10;
     280            2 :         let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
     281              : 
     282            2 :         let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
     283              : 
     284            2 :         match upload_remote_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
     285            2 :             ControlFlow::Continue(uploads) => {
     286            2 :                 info!("Remote objects created successfully");
     287              : 
     288            2 :                 Self::Enabled(S3WithTestBlobs {
     289            2 :                     enabled,
     290            2 :                     remote_prefixes: uploads.prefixes,
     291            2 :                     remote_blobs: uploads.blobs,
     292            2 :                 })
     293              :             }
     294            0 :             ControlFlow::Break(uploads) => Self::UploadsFailed(
     295            0 :                 anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
     296            0 :                 S3WithTestBlobs {
     297            0 :                     enabled,
     298            0 :                     remote_prefixes: uploads.prefixes,
     299            0 :                     remote_blobs: uploads.blobs,
     300            0 :                 },
     301            0 :             ),
     302              :         }
     303            3 :     }
     304              : 
     305            3 :     async fn teardown(self) {
     306            3 :         match self {
     307            1 :             Self::Disabled => {}
     308            2 :             Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
     309            2 :                 cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
     310              :             }
     311              :         }
     312            3 :     }
     313              : }
     314              : 
     315              : enum MaybeEnabledStorageWithSimpleTestBlobs {
     316              :     Enabled(S3WithSimpleTestBlobs),
     317              :     Disabled,
     318              :     UploadsFailed(anyhow::Error, S3WithSimpleTestBlobs),
     319              : }
     320              : struct S3WithSimpleTestBlobs {
     321              :     enabled: EnabledS3,
     322              :     remote_blobs: HashSet<RemotePath>,
     323              : }
     324              : 
     325              : impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {
     326            9 :     async fn setup() -> Self {
     327            9 :         ensure_logging_ready();
     328            9 :         if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
     329            3 :             info!(
     330            0 :                 "`{}` env variable is not set, skipping the test",
     331              :                 ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
     332              :             );
     333            3 :             return Self::Disabled;
     334            6 :         }
     335              : 
     336            6 :         let max_keys_in_list_response = 10;
     337            6 :         let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
     338              : 
     339            6 :         let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
     340              : 
     341            6 :         match upload_simple_remote_data(&enabled.client, upload_tasks_count).await {
     342            6 :             ControlFlow::Continue(uploads) => {
     343            6 :                 info!("Remote objects created successfully");
     344              : 
     345            6 :                 Self::Enabled(S3WithSimpleTestBlobs {
     346            6 :                     enabled,
     347            6 :                     remote_blobs: uploads,
     348            6 :                 })
     349              :             }
     350            0 :             ControlFlow::Break(uploads) => Self::UploadsFailed(
     351            0 :                 anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
     352            0 :                 S3WithSimpleTestBlobs {
     353            0 :                     enabled,
     354            0 :                     remote_blobs: uploads,
     355            0 :                 },
     356            0 :             ),
     357              :         }
     358            9 :     }
     359              : 
     360            9 :     async fn teardown(self) {
     361            9 :         match self {
     362            3 :             Self::Disabled => {}
     363            6 :             Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
     364            6 :                 cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
     365              :             }
     366              :         }
     367            9 :     }
     368              : }
     369              : 
     370           26 : async fn create_s3_client(
     371           26 :     max_keys_per_list_response: Option<i32>,
     372           26 : ) -> anyhow::Result<Arc<GenericRemoteStorage>> {
     373              :     use rand::Rng;
     374              : 
     375           26 :     let remote_storage_s3_bucket = env::var("REMOTE_STORAGE_S3_BUCKET")
     376           26 :         .context("`REMOTE_STORAGE_S3_BUCKET` env var is not set, but real S3 tests are enabled")?;
     377           26 :     let remote_storage_s3_region = env::var("REMOTE_STORAGE_S3_REGION")
     378           26 :         .context("`REMOTE_STORAGE_S3_REGION` env var is not set, but real S3 tests are enabled")?;
     379              : 
     380              :     // due to how time works, we've had test runners use the same nanos as bucket prefixes.
     381              :     // millis is just a debugging aid for easier finding the prefix later.
     382           26 :     let millis = std::time::SystemTime::now()
     383           26 :         .duration_since(UNIX_EPOCH)
     384           26 :         .context("random s3 test prefix part calculation")?
     385           26 :         .as_millis();
     386              : 
     387              :     // because nanos can be the same for two threads so can millis, add randomness
     388           26 :     let random = rand::rng().random::<u32>();
     389              : 
     390           26 :     let remote_storage_config = RemoteStorageConfig {
     391           26 :         storage: RemoteStorageKind::AwsS3(S3Config {
     392           26 :             bucket_name: remote_storage_s3_bucket,
     393           26 :             bucket_region: remote_storage_s3_region,
     394           26 :             prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
     395           26 :             endpoint: None,
     396           26 :             concurrency_limit: NonZeroUsize::new(100).unwrap(),
     397           26 :             max_keys_per_list_response,
     398           26 :             upload_storage_class: None,
     399           26 :         }),
     400           26 :         timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
     401           26 :         small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
     402           26 :     };
     403           26 :     Ok(Arc::new(
     404           26 :         GenericRemoteStorage::from_config(&remote_storage_config)
     405           26 :             .await
     406           26 :             .context("remote storage init")?,
     407              :     ))
     408           26 : }
     409              : 
     410              : #[test_context(MaybeEnabledStorage)]
     411              : #[tokio::test]
     412              : async fn download_is_timeouted(ctx: &mut MaybeEnabledStorage) {
     413              :     let MaybeEnabledStorage::Enabled(ctx) = ctx else {
     414              :         return;
     415              :     };
     416              : 
     417              :     let cancel = CancellationToken::new();
     418              : 
     419              :     let path = RemotePath::new(Utf8Path::new(
     420              :         format!("{}/file_to_copy", ctx.base_prefix).as_str(),
     421              :     ))
     422              :     .unwrap();
     423              : 
     424              :     let len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
     425              : 
     426              :     let timeout = std::time::Duration::from_secs(5);
     427              : 
     428              :     ctx.configure_request_timeout(timeout);
     429              : 
     430              :     let started_at = std::time::Instant::now();
     431              :     let mut stream = ctx
     432              :         .client
     433              :         .download(&path, &DownloadOpts::default(), &cancel)
     434              :         .await
     435              :         .expect("download succeeds")
     436              :         .download_stream;
     437              : 
     438              :     if started_at.elapsed().mul_f32(0.9) >= timeout {
     439              :         tracing::warn!(
     440              :             elapsed_ms = started_at.elapsed().as_millis(),
     441              :             "timeout might be too low, consumed most of it during headers"
     442              :         );
     443              :     }
     444              : 
     445              :     let first = stream
     446              :         .next()
     447              :         .await
     448              :         .expect("should have the first blob")
     449              :         .expect("should have succeeded");
     450              : 
     451              :     tracing::info!(len = first.len(), "downloaded first chunk");
     452              : 
     453              :     assert!(
     454              :         first.len() < len,
     455              :         "uploaded file is too small, we downloaded all on first chunk"
     456              :     );
     457              : 
     458              :     tokio::time::sleep(timeout).await;
     459              : 
     460              :     {
     461              :         let started_at = std::time::Instant::now();
     462              :         let next = stream
     463              :             .next()
     464              :             .await
     465              :             .expect("stream should not have ended yet");
     466              : 
     467              :         tracing::info!(
     468              :             next.is_err = next.is_err(),
     469              :             elapsed_ms = started_at.elapsed().as_millis(),
     470              :             "received item after timeout"
     471              :         );
     472              : 
     473              :         let e = next.expect_err("expected an error, but got a chunk?");
     474              : 
     475              :         let inner = e.get_ref().expect("std::io::Error::inner should be set");
     476              :         assert!(
     477              :             inner
     478              :                 .downcast_ref::<DownloadError>()
     479            2 :                 .is_some_and(|e| matches!(e, DownloadError::Timeout)),
     480              :             "{inner:?}"
     481              :         );
     482              :     }
     483              : 
     484              :     ctx.configure_request_timeout(RemoteStorageConfig::DEFAULT_TIMEOUT);
     485              : 
     486              :     ctx.client.delete_objects(&[path], &cancel).await.unwrap()
     487              : }
     488              : 
     489              : #[test_context(MaybeEnabledStorage)]
     490              : #[tokio::test]
     491              : async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
     492              :     let MaybeEnabledStorage::Enabled(ctx) = ctx else {
     493              :         return;
     494              :     };
     495              : 
     496              :     let cancel = CancellationToken::new();
     497              : 
     498              :     let path = RemotePath::new(Utf8Path::new(
     499              :         format!("{}/file_to_copy", ctx.base_prefix).as_str(),
     500              :     ))
     501              :     .unwrap();
     502              : 
     503              :     let file_len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
     504              : 
     505              :     {
     506              :         let stream = ctx
     507              :             .client
     508              :             .download(&path, &DownloadOpts::default(), &cancel)
     509              :             .await
     510              :             .expect("download succeeds")
     511              :             .download_stream;
     512              : 
     513              :         let mut reader = std::pin::pin!(tokio_util::io::StreamReader::new(stream));
     514              : 
     515              :         let first = reader.fill_buf().await.expect("should have the first blob");
     516              : 
     517              :         let len = first.len();
     518              :         tracing::info!(len, "downloaded first chunk");
     519              : 
     520              :         assert!(
     521              :             first.len() < file_len,
     522              :             "uploaded file is too small, we downloaded all on first chunk"
     523              :         );
     524              : 
     525              :         reader.consume(len);
     526              : 
     527              :         cancel.cancel();
     528              : 
     529              :         let next = reader.fill_buf().await;
     530              : 
     531              :         let e = next.expect_err("expected an error, but got a chunk?");
     532              : 
     533              :         let inner = e.get_ref().expect("std::io::Error::inner should be set");
     534              :         assert!(
     535              :             inner
     536              :                 .downcast_ref::<DownloadError>()
     537            2 :                 .is_some_and(|e| matches!(e, DownloadError::Cancelled)),
     538              :             "{inner:?}"
     539              :         );
     540              : 
     541              :         let e = DownloadError::from(e);
     542              : 
     543              :         assert!(matches!(e, DownloadError::Cancelled), "{e:?}");
     544              :     }
     545              : 
     546              :     let cancel = CancellationToken::new();
     547              : 
     548              :     ctx.client.delete_objects(&[path], &cancel).await.unwrap();
     549              : }
     550              : 
     551              : /// Upload a long enough file so that we cannot download it in single chunk
     552              : ///
     553              : /// For s3 the first chunk seems to be less than 10kB, so this has a bit of a safety margin
     554            4 : async fn upload_large_enough_file(
     555            4 :     client: &GenericRemoteStorage,
     556            4 :     path: &RemotePath,
     557            4 :     cancel: &CancellationToken,
     558            4 : ) -> usize {
     559            4 :     let header = bytes::Bytes::from_static("remote blob data content".as_bytes());
     560            4 :     let body = bytes::Bytes::from(vec![0u8; 1024]);
     561            4 :     let contents = std::iter::once(header).chain(std::iter::repeat_n(body, 128));
     562              : 
     563          516 :     let len = contents.clone().fold(0, |acc, next| acc + next.len());
     564              : 
     565            4 :     let contents = futures::stream::iter(contents.map(std::io::Result::Ok));
     566              : 
     567            4 :     client
     568            4 :         .upload(contents, len, path, None, cancel)
     569            4 :         .await
     570            4 :         .expect("upload succeeds");
     571              : 
     572            4 :     len
     573            4 : }
         |