LCOV - code coverage report
Current view: top level - libs/remote_storage/tests - test_real_s3.rs (source / functions)
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info
Test Date: 2024-02-07 07:37:29
Coverage:   Lines: 17.4 % (42 of 241 hit)   Functions: 26.7 % (16 of 60 hit)

            Line data    Source code
       1              : use std::env;
       2              : use std::fmt::{Debug, Display};
       3              : use std::num::NonZeroUsize;
       4              : use std::ops::ControlFlow;
       5              : use std::sync::Arc;
       6              : use std::time::{Duration, UNIX_EPOCH};
       7              : use std::{collections::HashSet, time::SystemTime};
       8              : 
       9              : use crate::common::{download_to_vec, upload_stream};
      10              : use anyhow::Context;
      11              : use camino::Utf8Path;
      12              : use futures_util::Future;
      13              : use remote_storage::{
      14              :     GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config,
      15              : };
      16              : use test_context::test_context;
      17              : use test_context::AsyncTestContext;
      18              : use tokio_util::sync::CancellationToken;
      19              : use tracing::info;
      20              : 
      21              : mod common;
      22              : 
      23              : #[path = "common/tests.rs"]
      24              : mod tests_s3;
      25              : 
      26              : use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data};
      27              : use utils::backoff;
      28              : 
      29              : const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
      30              : 
      31              : const BASE_PREFIX: &str = "test";
      32              : 
      33            2 : #[test_context(MaybeEnabledStorage)]
      34            2 : #[tokio::test]
      35            2 : async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
      36            2 :     let ctx = match ctx {
      37            0 :         MaybeEnabledStorage::Enabled(ctx) => ctx,
      38            2 :         MaybeEnabledStorage::Disabled => return Ok(()),
      39              :     };
       40              :     // Our test is sensitive to discrepancies between the S3 clock and the clock of the
       41              :     // environment the tests run in. Therefore, wait a little bit before and after each
       42              :     // time point. The alternative would be to take the time from S3 response headers.
      43              :     const WAIT_TIME: Duration = Duration::from_millis(3_000);
      44              : 
      45            0 :     async fn retry<T, O, F, E>(op: O) -> Result<T, E>
      46            0 :     where
      47            0 :         E: Display + Debug + 'static,
      48            0 :         O: FnMut() -> F,
      49            0 :         F: Future<Output = Result<T, E>>,
      50            0 :     {
      51            0 :         let warn_threshold = 3;
      52            0 :         let max_retries = 10;
      53            0 :         backoff::retry(
      54            0 :             op,
      55            0 :             |_e| false,
      56            0 :             warn_threshold,
      57            0 :             max_retries,
      58            0 :             "test retry",
      59            0 :             &CancellationToken::new(),
      60            0 :         )
      61            0 :         .await
      62            0 :         .expect("never cancelled")
      63            0 :     }
      64              : 
      65            0 :     async fn time_point() -> SystemTime {
      66            0 :         tokio::time::sleep(WAIT_TIME).await;
      67            0 :         let ret = SystemTime::now();
      68            0 :         tokio::time::sleep(WAIT_TIME).await;
      69            0 :         ret
      70            0 :     }
      71              : 
      72            0 :     async fn list_files(client: &Arc<GenericRemoteStorage>) -> anyhow::Result<HashSet<RemotePath>> {
      73            0 :         Ok(retry(|| client.list_files(None))
      74            0 :             .await
      75            0 :             .context("list root files failure")?
      76            0 :             .into_iter()
      77            0 :             .collect::<HashSet<_>>())
      78            0 :     }
      79              : 
      80            0 :     let cancel = CancellationToken::new();
      81              : 
      82            0 :     let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
      83            0 :         .with_context(|| "RemotePath conversion")?;
      84              : 
      85            0 :     let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
      86            0 :         .with_context(|| "RemotePath conversion")?;
      87              : 
      88            0 :     let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
      89            0 :         .with_context(|| "RemotePath conversion")?;
      90              : 
      91            0 :     retry(|| {
      92            0 :         let (data, len) = upload_stream("remote blob data1".as_bytes().into());
      93            0 :         ctx.client.upload(data, len, &path1, None)
      94            0 :     })
      95            0 :     .await?;
      96              : 
      97            0 :     let t0_files = list_files(&ctx.client).await?;
      98            0 :     let t0 = time_point().await;
      99            0 :     println!("at t0: {t0_files:?}");
     100            0 : 
     101            0 :     let old_data = "remote blob data2";
     102            0 : 
     103            0 :     retry(|| {
     104            0 :         let (data, len) = upload_stream(old_data.as_bytes().into());
     105            0 :         ctx.client.upload(data, len, &path2, None)
     106            0 :     })
     107            0 :     .await?;
     108              : 
     109            0 :     let t1_files = list_files(&ctx.client).await?;
     110            0 :     let t1 = time_point().await;
     111            0 :     println!("at t1: {t1_files:?}");
     112              : 
     113              :     // A little check to ensure that our clock is not too far off from the S3 clock
     114              :     {
     115            0 :         let dl = retry(|| ctx.client.download(&path2)).await?;
     116            0 :         let last_modified = dl.last_modified.unwrap();
     117            0 :         let half_wt = WAIT_TIME.mul_f32(0.5);
     118            0 :         let t0_hwt = t0 + half_wt;
     119            0 :         let t1_hwt = t1 - half_wt;
     120            0 :         if !(t0_hwt..=t1_hwt).contains(&last_modified) {
     121            0 :             panic!("last_modified={last_modified:?} is not between t0_hwt={t0_hwt:?} and t1_hwt={t1_hwt:?}. \
      122            0 :                 This likely means a large clock discrepancy between S3 and the local clock.");
     123            0 :         }
     124            0 :     }
     125            0 : 
     126            0 :     retry(|| {
     127            0 :         let (data, len) = upload_stream("remote blob data3".as_bytes().into());
     128            0 :         ctx.client.upload(data, len, &path3, None)
     129            0 :     })
     130            0 :     .await?;
     131              : 
     132            0 :     let new_data = "new remote blob data2";
     133            0 : 
     134            0 :     retry(|| {
     135            0 :         let (data, len) = upload_stream(new_data.as_bytes().into());
     136            0 :         ctx.client.upload(data, len, &path2, None)
     137            0 :     })
     138            0 :     .await?;
     139              : 
     140            0 :     retry(|| ctx.client.delete(&path1)).await?;
     141            0 :     let t2_files = list_files(&ctx.client).await?;
     142            0 :     let t2 = time_point().await;
     143            0 :     println!("at t2: {t2_files:?}");
     144              : 
     145              :     // No changes after recovery to t2 (no-op)
     146            0 :     let t_final = time_point().await;
     147            0 :     ctx.client
     148            0 :         .time_travel_recover(None, t2, t_final, &cancel)
     149            0 :         .await?;
     150            0 :     let t2_files_recovered = list_files(&ctx.client).await?;
     151            0 :     println!("after recovery to t2: {t2_files_recovered:?}");
     152            0 :     assert_eq!(t2_files, t2_files_recovered);
     153            0 :     let path2_recovered_t2 = download_to_vec(ctx.client.download(&path2).await?).await?;
     154            0 :     assert_eq!(path2_recovered_t2, new_data.as_bytes());
     155              : 
     156              :     // after recovery to t1: path1 is back, path2 has the old content
     157            0 :     let t_final = time_point().await;
     158            0 :     ctx.client
     159            0 :         .time_travel_recover(None, t1, t_final, &cancel)
     160            0 :         .await?;
     161            0 :     let t1_files_recovered = list_files(&ctx.client).await?;
     162            0 :     println!("after recovery to t1: {t1_files_recovered:?}");
     163            0 :     assert_eq!(t1_files, t1_files_recovered);
     164            0 :     let path2_recovered_t1 = download_to_vec(ctx.client.download(&path2).await?).await?;
     165            0 :     assert_eq!(path2_recovered_t1, old_data.as_bytes());
     166              : 
     167              :     // after recovery to t0: everything is gone except for path1
     168            0 :     let t_final = time_point().await;
     169            0 :     ctx.client
     170            0 :         .time_travel_recover(None, t0, t_final, &cancel)
     171            0 :         .await?;
     172            0 :     let t0_files_recovered = list_files(&ctx.client).await?;
     173            0 :     println!("after recovery to t0: {t0_files_recovered:?}");
     174            0 :     assert_eq!(t0_files, t0_files_recovered);
     175              : 
     176              :     // cleanup
     177              : 
     178            0 :     let paths = &[path1, path2, path3];
     179            0 :     retry(|| ctx.client.delete_objects(paths)).await?;
     180              : 
     181            0 :     Ok(())
     182            2 : }
     183              : 
     184              : struct EnabledS3 {
     185              :     client: Arc<GenericRemoteStorage>,
     186              :     base_prefix: &'static str,
     187              : }
     188              : 
     189              : impl EnabledS3 {
     190            0 :     async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
     191            0 :         let client = create_s3_client(max_keys_in_list_response)
     192            0 :             .context("S3 client creation")
     193            0 :             .expect("S3 client creation failed");
     194            0 : 
     195            0 :         EnabledS3 {
     196            0 :             client,
     197            0 :             base_prefix: BASE_PREFIX,
     198            0 :         }
     199            0 :     }
     200              : }
     201              : 
     202              : enum MaybeEnabledStorage {
     203              :     Enabled(EnabledS3),
     204              :     Disabled,
     205              : }
     206              : 
     207              : #[async_trait::async_trait]
     208              : impl AsyncTestContext for MaybeEnabledStorage {
     209           10 :     async fn setup() -> Self {
     210           10 :         ensure_logging_ready();
     211           10 : 
     212           10 :         if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
     213           10 :             info!(
     214           10 :                 "`{}` env variable is not set, skipping the test",
     215           10 :                 ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
     216           10 :             );
     217           10 :             return Self::Disabled;
     218            0 :         }
     219            0 : 
     220            0 :         Self::Enabled(EnabledS3::setup(None).await)
     221           20 :     }
     222              : }
     223              : 
     224              : enum MaybeEnabledStorageWithTestBlobs {
     225              :     Enabled(S3WithTestBlobs),
     226              :     Disabled,
     227              :     UploadsFailed(anyhow::Error, S3WithTestBlobs),
     228              : }
     229              : 
     230              : struct S3WithTestBlobs {
     231              :     enabled: EnabledS3,
     232              :     remote_prefixes: HashSet<RemotePath>,
     233              :     remote_blobs: HashSet<RemotePath>,
     234              : }
     235              : 
     236              : #[async_trait::async_trait]
     237              : impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs {
     238            2 :     async fn setup() -> Self {
     239            2 :         ensure_logging_ready();
     240            2 :         if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
     241            2 :             info!(
     242            2 :                 "`{}` env variable is not set, skipping the test",
     243            2 :                 ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
     244            2 :             );
     245            2 :             return Self::Disabled;
     246            0 :         }
     247            0 : 
     248            0 :         let max_keys_in_list_response = 10;
     249            0 :         let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
     250              : 
     251            0 :         let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
     252              : 
     253            0 :         match upload_remote_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
     254            0 :             ControlFlow::Continue(uploads) => {
     255            0 :                 info!("Remote objects created successfully");
     256              : 
     257            0 :                 Self::Enabled(S3WithTestBlobs {
     258            0 :                     enabled,
     259            0 :                     remote_prefixes: uploads.prefixes,
     260            0 :                     remote_blobs: uploads.blobs,
     261            0 :                 })
     262              :             }
     263            0 :             ControlFlow::Break(uploads) => Self::UploadsFailed(
     264            0 :                 anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
     265            0 :                 S3WithTestBlobs {
     266            0 :                     enabled,
     267            0 :                     remote_prefixes: uploads.prefixes,
     268            0 :                     remote_blobs: uploads.blobs,
     269            0 :                 },
     270            0 :             ),
     271              :         }
     272            4 :     }
     273              : 
     274            2 :     async fn teardown(self) {
     275            2 :         match self {
     276            2 :             Self::Disabled => {}
     277            0 :             Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
     278            0 :                 cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
     279              :             }
     280              :         }
     281            2 :     }
     282              : }
     283              : 
      284              : // NOTE: the setups for the list_prefixes test and the list_files test are very similar.
      285              : // However, they are not identical. The list_prefixes function is concerned with listing prefixes,
     286              : // whereas the list_files function is concerned with listing files.
     287              : // See `RemoteStorage::list_files` documentation for more details
     288              : enum MaybeEnabledStorageWithSimpleTestBlobs {
     289              :     Enabled(S3WithSimpleTestBlobs),
     290              :     Disabled,
     291              :     UploadsFailed(anyhow::Error, S3WithSimpleTestBlobs),
     292              : }
     293              : struct S3WithSimpleTestBlobs {
     294              :     enabled: EnabledS3,
     295              :     remote_blobs: HashSet<RemotePath>,
     296              : }
     297              : 
     298              : #[async_trait::async_trait]
     299              : impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {
     300            2 :     async fn setup() -> Self {
     301            2 :         ensure_logging_ready();
     302            2 :         if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
     303            2 :             info!(
     304            2 :                 "`{}` env variable is not set, skipping the test",
     305            2 :                 ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
     306            2 :             );
     307            2 :             return Self::Disabled;
     308            0 :         }
     309            0 : 
     310            0 :         let max_keys_in_list_response = 10;
     311            0 :         let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
     312              : 
     313            0 :         let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
     314              : 
     315            0 :         match upload_simple_remote_data(&enabled.client, upload_tasks_count).await {
     316            0 :             ControlFlow::Continue(uploads) => {
     317            0 :                 info!("Remote objects created successfully");
     318              : 
     319            0 :                 Self::Enabled(S3WithSimpleTestBlobs {
     320            0 :                     enabled,
     321            0 :                     remote_blobs: uploads,
     322            0 :                 })
     323              :             }
     324            0 :             ControlFlow::Break(uploads) => Self::UploadsFailed(
     325            0 :                 anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
     326            0 :                 S3WithSimpleTestBlobs {
     327            0 :                     enabled,
     328            0 :                     remote_blobs: uploads,
     329            0 :                 },
     330            0 :             ),
     331              :         }
     332            4 :     }
     333              : 
     334            2 :     async fn teardown(self) {
     335            2 :         match self {
     336            2 :             Self::Disabled => {}
     337            0 :             Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
     338            0 :                 cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
     339              :             }
     340              :         }
     341            2 :     }
     342              : }
     343              : 
     344            0 : fn create_s3_client(
     345            0 :     max_keys_per_list_response: Option<i32>,
     346            0 : ) -> anyhow::Result<Arc<GenericRemoteStorage>> {
     347              :     use rand::Rng;
     348              : 
     349            0 :     let remote_storage_s3_bucket = env::var("REMOTE_STORAGE_S3_BUCKET")
     350            0 :         .context("`REMOTE_STORAGE_S3_BUCKET` env var is not set, but real S3 tests are enabled")?;
     351            0 :     let remote_storage_s3_region = env::var("REMOTE_STORAGE_S3_REGION")
     352            0 :         .context("`REMOTE_STORAGE_S3_REGION` env var is not set, but real S3 tests are enabled")?;
     353              : 
      354              :     // Due to how time works, we've had test runners end up using the same nanos as bucket prefixes.
      355              :     // The millis part is just a debugging aid to make it easier to find the prefix later.
     356            0 :     let millis = std::time::SystemTime::now()
     357            0 :         .duration_since(UNIX_EPOCH)
     358            0 :         .context("random s3 test prefix part calculation")?
     359            0 :         .as_millis();
     360            0 : 
      361            0 :     // Because nanos can be the same for two threads (and so can millis), add randomness.
     362            0 :     let random = rand::thread_rng().gen::<u32>();
     363            0 : 
     364            0 :     let remote_storage_config = RemoteStorageConfig {
     365            0 :         storage: RemoteStorageKind::AwsS3(S3Config {
     366            0 :             bucket_name: remote_storage_s3_bucket,
     367            0 :             bucket_region: remote_storage_s3_region,
     368            0 :             prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
     369            0 :             endpoint: None,
     370            0 :             concurrency_limit: NonZeroUsize::new(100).unwrap(),
     371            0 :             max_keys_per_list_response,
     372            0 :         }),
     373            0 :     };
     374            0 :     Ok(Arc::new(
     375            0 :         GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
     376              :     ))
     377            0 : }
        

Generated by: LCOV version 2.1-beta