Line data Source code
1 : use std::env;
2 : use std::fmt::{Debug, Display};
3 : use std::num::NonZeroUsize;
4 : use std::ops::ControlFlow;
5 : use std::sync::Arc;
6 : use std::time::{Duration, UNIX_EPOCH};
7 : use std::{collections::HashSet, time::SystemTime};
8 :
9 : use crate::common::{download_to_vec, upload_stream};
10 : use anyhow::Context;
11 : use camino::Utf8Path;
12 : use futures_util::Future;
13 : use remote_storage::{
14 : GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config,
15 : };
16 : use test_context::test_context;
17 : use test_context::AsyncTestContext;
18 : use tokio_util::sync::CancellationToken;
19 : use tracing::info;
20 :
21 : mod common;
22 :
23 : #[path = "common/tests.rs"]
24 : mod tests_s3;
25 :
26 : use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data};
27 : use utils::backoff;
28 :
29 : const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
30 :
31 : const BASE_PREFIX: &str = "test";
32 :
33 2 : #[test_context(MaybeEnabledStorage)]
34 2 : #[tokio::test]
35 2 : async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
36 2 : let ctx = match ctx {
37 0 : MaybeEnabledStorage::Enabled(ctx) => ctx,
38 2 : MaybeEnabledStorage::Disabled => return Ok(()),
39 : };
40 : // Our test is sensitive to clock discrepancies between S3 and the environment the tests
41 : // run in. Therefore, wait a little bit before and after taking each time point. The
42 : // alternative would be to take the time from the S3 response headers.
43 : const WAIT_TIME: Duration = Duration::from_millis(3_000);
44 :
45 0 : async fn retry<T, O, F, E>(op: O) -> Result<T, E>
46 0 : where
47 0 : E: Display + Debug + 'static,
48 0 : O: FnMut() -> F,
49 0 : F: Future<Output = Result<T, E>>,
50 0 : {
51 0 : let warn_threshold = 3;
52 0 : let max_retries = 10;
53 0 : backoff::retry(
54 0 : op,
55 0 : |_e| false,
56 0 : warn_threshold,
57 0 : max_retries,
58 0 : "test retry",
59 0 : &CancellationToken::new(),
60 0 : )
61 0 : .await
62 0 : .expect("never cancelled")
63 0 : }
64 :
65 0 : async fn time_point() -> SystemTime {
66 0 : tokio::time::sleep(WAIT_TIME).await;
67 0 : let ret = SystemTime::now();
68 0 : tokio::time::sleep(WAIT_TIME).await;
69 0 : ret
70 0 : }
71 :
72 0 : async fn list_files(client: &Arc<GenericRemoteStorage>) -> anyhow::Result<HashSet<RemotePath>> {
73 0 : Ok(retry(|| client.list_files(None))
74 0 : .await
75 0 : .context("list root files failure")?
76 0 : .into_iter()
77 0 : .collect::<HashSet<_>>())
78 0 : }
79 :
80 0 : let cancel = CancellationToken::new();
81 :
82 0 : let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
83 0 : .with_context(|| "RemotePath conversion")?;
84 :
85 0 : let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
86 0 : .with_context(|| "RemotePath conversion")?;
87 :
88 0 : let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
89 0 : .with_context(|| "RemotePath conversion")?;
90 :
91 0 : retry(|| {
92 0 : let (data, len) = upload_stream("remote blob data1".as_bytes().into());
93 0 : ctx.client.upload(data, len, &path1, None)
94 0 : })
95 0 : .await?;
96 :
97 0 : let t0_files = list_files(&ctx.client).await?;
98 0 : let t0 = time_point().await;
99 0 : println!("at t0: {t0_files:?}");
100 0 :
101 0 : let old_data = "remote blob data2";
102 0 :
103 0 : retry(|| {
104 0 : let (data, len) = upload_stream(old_data.as_bytes().into());
105 0 : ctx.client.upload(data, len, &path2, None)
106 0 : })
107 0 : .await?;
108 :
109 0 : let t1_files = list_files(&ctx.client).await?;
110 0 : let t1 = time_point().await;
111 0 : println!("at t1: {t1_files:?}");
112 :
113 : // A little check to ensure that our clock is not too far off from the S3 clock
114 : {
115 0 : let dl = retry(|| ctx.client.download(&path2)).await?;
116 0 : let last_modified = dl.last_modified.unwrap();
117 0 : let half_wt = WAIT_TIME.mul_f32(0.5);
118 0 : let t0_hwt = t0 + half_wt;
119 0 : let t1_hwt = t1 - half_wt;
120 0 : if !(t0_hwt..=t1_hwt).contains(&last_modified) {
121 0 : panic!("last_modified={last_modified:?} is not between t0_hwt={t0_hwt:?} and t1_hwt={t1_hwt:?}. \
122 0 : This likely means a large clock discrepancy between S3 and the local clock.");
123 0 : }
124 0 : }
125 0 :
126 0 : retry(|| {
127 0 : let (data, len) = upload_stream("remote blob data3".as_bytes().into());
128 0 : ctx.client.upload(data, len, &path3, None)
129 0 : })
130 0 : .await?;
131 :
132 0 : let new_data = "new remote blob data2";
133 0 :
134 0 : retry(|| {
135 0 : let (data, len) = upload_stream(new_data.as_bytes().into());
136 0 : ctx.client.upload(data, len, &path2, None)
137 0 : })
138 0 : .await?;
139 :
140 0 : retry(|| ctx.client.delete(&path1)).await?;
141 0 : let t2_files = list_files(&ctx.client).await?;
142 0 : let t2 = time_point().await;
143 0 : println!("at t2: {t2_files:?}");
144 :
145 : // No changes after recovery to t2 (no-op)
146 0 : let t_final = time_point().await;
147 0 : ctx.client
148 0 : .time_travel_recover(None, t2, t_final, &cancel)
149 0 : .await?;
150 0 : let t2_files_recovered = list_files(&ctx.client).await?;
151 0 : println!("after recovery to t2: {t2_files_recovered:?}");
152 0 : assert_eq!(t2_files, t2_files_recovered);
153 0 : let path2_recovered_t2 = download_to_vec(ctx.client.download(&path2).await?).await?;
154 0 : assert_eq!(path2_recovered_t2, new_data.as_bytes());
155 :
156 : // after recovery to t1: path1 is back, path2 has the old content
157 0 : let t_final = time_point().await;
158 0 : ctx.client
159 0 : .time_travel_recover(None, t1, t_final, &cancel)
160 0 : .await?;
161 0 : let t1_files_recovered = list_files(&ctx.client).await?;
162 0 : println!("after recovery to t1: {t1_files_recovered:?}");
163 0 : assert_eq!(t1_files, t1_files_recovered);
164 0 : let path2_recovered_t1 = download_to_vec(ctx.client.download(&path2).await?).await?;
165 0 : assert_eq!(path2_recovered_t1, old_data.as_bytes());
166 :
167 : // after recovery to t0: everything is gone except for path1
168 0 : let t_final = time_point().await;
169 0 : ctx.client
170 0 : .time_travel_recover(None, t0, t_final, &cancel)
171 0 : .await?;
172 0 : let t0_files_recovered = list_files(&ctx.client).await?;
173 0 : println!("after recovery to t0: {t0_files_recovered:?}");
174 0 : assert_eq!(t0_files, t0_files_recovered);
175 :
176 : // cleanup
177 :
178 0 : let paths = &[path1, path2, path3];
179 0 : retry(|| ctx.client.delete_objects(paths)).await?;
180 :
181 0 : Ok(())
182 2 : }
183 :
184 : struct EnabledS3 {
185 : client: Arc<GenericRemoteStorage>,
186 : base_prefix: &'static str,
187 : }
188 :
189 : impl EnabledS3 {
190 0 : async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
191 0 : let client = create_s3_client(max_keys_in_list_response)
192 0 : .context("S3 client creation")
193 0 : .expect("S3 client creation failed");
194 0 :
195 0 : EnabledS3 {
196 0 : client,
197 0 : base_prefix: BASE_PREFIX,
198 0 : }
199 0 : }
200 : }
201 :
202 : enum MaybeEnabledStorage {
203 : Enabled(EnabledS3),
204 : Disabled,
205 : }
206 :
207 : #[async_trait::async_trait]
208 : impl AsyncTestContext for MaybeEnabledStorage {
209 10 : async fn setup() -> Self {
210 10 : ensure_logging_ready();
211 10 :
212 10 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
213 10 : info!(
214 10 : "`{}` env variable is not set, skipping the test",
215 10 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
216 10 : );
217 10 : return Self::Disabled;
218 0 : }
219 0 :
220 0 : Self::Enabled(EnabledS3::setup(None).await)
221 20 : }
222 : }
223 :
224 : enum MaybeEnabledStorageWithTestBlobs {
225 : Enabled(S3WithTestBlobs),
226 : Disabled,
227 : UploadsFailed(anyhow::Error, S3WithTestBlobs),
228 : }
229 :
230 : struct S3WithTestBlobs {
231 : enabled: EnabledS3,
232 : remote_prefixes: HashSet<RemotePath>,
233 : remote_blobs: HashSet<RemotePath>,
234 : }
235 :
236 : #[async_trait::async_trait]
237 : impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs {
238 2 : async fn setup() -> Self {
239 2 : ensure_logging_ready();
240 2 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
241 2 : info!(
242 2 : "`{}` env variable is not set, skipping the test",
243 2 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
244 2 : );
245 2 : return Self::Disabled;
246 0 : }
247 0 :
248 0 : let max_keys_in_list_response = 10;
249 0 : let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
250 :
251 0 : let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
252 :
253 0 : match upload_remote_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
254 0 : ControlFlow::Continue(uploads) => {
255 0 : info!("Remote objects created successfully");
256 :
257 0 : Self::Enabled(S3WithTestBlobs {
258 0 : enabled,
259 0 : remote_prefixes: uploads.prefixes,
260 0 : remote_blobs: uploads.blobs,
261 0 : })
262 : }
263 0 : ControlFlow::Break(uploads) => Self::UploadsFailed(
264 0 : anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
265 0 : S3WithTestBlobs {
266 0 : enabled,
267 0 : remote_prefixes: uploads.prefixes,
268 0 : remote_blobs: uploads.blobs,
269 0 : },
270 0 : ),
271 : }
272 4 : }
273 :
274 2 : async fn teardown(self) {
275 2 : match self {
276 2 : Self::Disabled => {}
277 0 : Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
278 0 : cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
279 : }
280 : }
281 2 : }
282 : }
283 :
284 : // NOTE: the setups for the list_prefixes test and the list_files test are very similar.
285 : // However, they are not identical: the list_prefixes function is concerned with listing prefixes,
286 : // whereas the list_files function is concerned with listing files.
287 : // See the `RemoteStorage::list_files` documentation for more details.
288 : enum MaybeEnabledStorageWithSimpleTestBlobs {
289 : Enabled(S3WithSimpleTestBlobs),
290 : Disabled,
291 : UploadsFailed(anyhow::Error, S3WithSimpleTestBlobs),
292 : }
293 : struct S3WithSimpleTestBlobs {
294 : enabled: EnabledS3,
295 : remote_blobs: HashSet<RemotePath>,
296 : }
297 :
298 : #[async_trait::async_trait]
299 : impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {
300 2 : async fn setup() -> Self {
301 2 : ensure_logging_ready();
302 2 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
303 2 : info!(
304 2 : "`{}` env variable is not set, skipping the test",
305 2 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
306 2 : );
307 2 : return Self::Disabled;
308 0 : }
309 0 :
310 0 : let max_keys_in_list_response = 10;
311 0 : let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
312 :
313 0 : let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
314 :
315 0 : match upload_simple_remote_data(&enabled.client, upload_tasks_count).await {
316 0 : ControlFlow::Continue(uploads) => {
317 0 : info!("Remote objects created successfully");
318 :
319 0 : Self::Enabled(S3WithSimpleTestBlobs {
320 0 : enabled,
321 0 : remote_blobs: uploads,
322 0 : })
323 : }
324 0 : ControlFlow::Break(uploads) => Self::UploadsFailed(
325 0 : anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
326 0 : S3WithSimpleTestBlobs {
327 0 : enabled,
328 0 : remote_blobs: uploads,
329 0 : },
330 0 : ),
331 : }
332 4 : }
333 :
334 2 : async fn teardown(self) {
335 2 : match self {
336 2 : Self::Disabled => {}
337 0 : Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
338 0 : cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
339 : }
340 : }
341 2 : }
342 : }
343 :
344 0 : fn create_s3_client(
345 0 : max_keys_per_list_response: Option<i32>,
346 0 : ) -> anyhow::Result<Arc<GenericRemoteStorage>> {
347 : use rand::Rng;
348 :
349 0 : let remote_storage_s3_bucket = env::var("REMOTE_STORAGE_S3_BUCKET")
350 0 : .context("`REMOTE_STORAGE_S3_BUCKET` env var is not set, but real S3 tests are enabled")?;
351 0 : let remote_storage_s3_region = env::var("REMOTE_STORAGE_S3_REGION")
352 0 : .context("`REMOTE_STORAGE_S3_REGION` env var is not set, but real S3 tests are enabled")?;
353 :
354 : // Due to how system time works, test runners have ended up using the same nanosecond timestamp as bucket prefixes.
355 : // The millisecond value is kept only as a debugging aid, to make the prefix easier to find later.
356 0 : let millis = std::time::SystemTime::now()
357 0 : .duration_since(UNIX_EPOCH)
358 0 : .context("random s3 test prefix part calculation")?
359 0 : .as_millis();
360 0 :
361 0 : // Because nanos can be the same for two threads (and so can millis), add randomness.
362 0 : let random = rand::thread_rng().gen::<u32>();
363 0 :
364 0 : let remote_storage_config = RemoteStorageConfig {
365 0 : storage: RemoteStorageKind::AwsS3(S3Config {
366 0 : bucket_name: remote_storage_s3_bucket,
367 0 : bucket_region: remote_storage_s3_region,
368 0 : prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
369 0 : endpoint: None,
370 0 : concurrency_limit: NonZeroUsize::new(100).unwrap(),
371 0 : max_keys_per_list_response,
372 0 : }),
373 0 : };
374 0 : Ok(Arc::new(
375 0 : GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
376 : ))
377 0 : }