Line data Source code
1 : use std::env;
2 : use std::fmt::{Debug, Display};
3 : use std::future::Future;
4 : use std::num::NonZeroUsize;
5 : use std::ops::ControlFlow;
6 : use std::sync::Arc;
7 : use std::time::{Duration, UNIX_EPOCH};
8 : use std::{collections::HashSet, time::SystemTime};
9 :
10 : use crate::common::{download_to_vec, upload_stream};
11 : use anyhow::Context;
12 : use camino::Utf8Path;
13 : use futures_util::StreamExt;
14 : use remote_storage::{
15 : DownloadError, GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind,
16 : S3Config,
17 : };
18 : use test_context::test_context;
19 : use test_context::AsyncTestContext;
20 : use tokio_util::sync::CancellationToken;
21 : use tracing::info;
22 :
23 : mod common;
24 :
25 : #[path = "common/tests.rs"]
26 : mod tests_s3;
27 :
28 : use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data};
29 : use utils::backoff;
30 :
31 : const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
32 : const BASE_PREFIX: &str = "test";
33 :
34 2 : #[test_context(MaybeEnabledStorage)]
35 2 : #[tokio::test]
36 4 : async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
37 2 : let ctx = match ctx {
38 0 : MaybeEnabledStorage::Enabled(ctx) => ctx,
39 2 : MaybeEnabledStorage::Disabled => return Ok(()),
40 : };
41 : // Our test is sensitive to clock discrepancies between S3 and the environment the tests
42 : // run in. Therefore, wait a little bit before and after taking each time point. The
43 : // alternative would be to take the time from S3 response headers.
44 : const WAIT_TIME: Duration = Duration::from_millis(3_000);
45 :
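/// Retry `op` with backoff for up to `max_retries` attempts; the `|_e| false` predicate
/// treats no error as permanent, and the freshly created token means the loop is never
/// cancelled, so the final `expect` cannot fire.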
46 0 : async fn retry<T, O, F, E>(op: O) -> Result<T, E>
47 0 : where
48 0 : E: Display + Debug + 'static,
49 0 : O: FnMut() -> F,
50 0 : F: Future<Output = Result<T, E>>,
51 0 : {
52 0 : let warn_threshold = 3;
53 0 : let max_retries = 10;
54 0 : backoff::retry(
55 0 : op,
56 0 : |_e| false,
57 0 : warn_threshold,
58 0 : max_retries,
59 0 : "test retry",
60 0 : &CancellationToken::new(),
61 0 : )
62 0 : .await
63 0 : .expect("never cancelled")
64 0 : }
65 :
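/// Take a timestamp for the time-travel test, sleeping `WAIT_TIME` before and after so that
/// modest clock skew between S3 and the local clock cannot blur which uploads happened
/// before or after the returned instant.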
66 0 : async fn time_point() -> SystemTime {
67 0 : tokio::time::sleep(WAIT_TIME).await;
68 0 : let ret = SystemTime::now();
69 0 : tokio::time::sleep(WAIT_TIME).await;
70 0 : ret
71 0 : }
72 :
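/// List every object visible to the client and collect the paths into a set, so listings
/// taken at different time points can be compared regardless of order.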
73 0 : async fn list_files(
74 0 : client: &Arc<GenericRemoteStorage>,
75 0 : cancel: &CancellationToken,
76 0 : ) -> anyhow::Result<HashSet<RemotePath>> {
77 0 : Ok(retry(|| client.list_files(None, None, cancel))
78 0 : .await
79 0 : .context("list root files failure")?
80 0 : .into_iter()
81 0 : .collect::<HashSet<_>>())
82 0 : }
83 :
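// Scenario: upload path1 (t0), add path2 with old data (t1), then add path3, overwrite path2
// and delete path1 (t2). Afterwards, time-travel-recover the bucket back to t2, t1 and t0 and
// verify the listing and path2's content at each point.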
84 0 : let cancel = CancellationToken::new();
85 :
86 0 : let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
87 0 : .with_context(|| "RemotePath conversion")?;
88 :
89 0 : let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
90 0 : .with_context(|| "RemotePath conversion")?;
91 :
92 0 : let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
93 0 : .with_context(|| "RemotePath conversion")?;
94 :
95 0 : retry(|| {
96 0 : let (data, len) = upload_stream("remote blob data1".as_bytes().into());
97 0 : ctx.client.upload(data, len, &path1, None, &cancel)
98 0 : })
99 0 : .await?;
100 :
101 0 : let t0_files = list_files(&ctx.client, &cancel).await?;
102 0 : let t0 = time_point().await;
103 0 : println!("at t0: {t0_files:?}");
104 0 :
105 0 : let old_data = "remote blob data2";
106 0 :
107 0 : retry(|| {
108 0 : let (data, len) = upload_stream(old_data.as_bytes().into());
109 0 : ctx.client.upload(data, len, &path2, None, &cancel)
110 0 : })
111 0 : .await?;
112 :
113 0 : let t1_files = list_files(&ctx.client, &cancel).await?;
114 0 : let t1 = time_point().await;
115 0 : println!("at t1: {t1_files:?}");
116 :
117 : // A little check to ensure that our clock is not too far off from the S3 clock
118 : {
119 0 : let dl = retry(|| ctx.client.download(&path2, &cancel)).await?;
120 0 : let last_modified = dl.last_modified.unwrap();
121 0 : let half_wt = WAIT_TIME.mul_f32(0.5);
122 0 : let t0_hwt = t0 + half_wt;
123 0 : let t1_hwt = t1 - half_wt;
124 0 : if !(t0_hwt..=t1_hwt).contains(&last_modified) {
125 0 : panic!("last_modified={last_modified:?} is not between t0_hwt={t0_hwt:?} and t1_hwt={t1_hwt:?}. \
126 0 : This likely means a large clock discrepancy between S3 and the local clock.");
127 0 : }
128 0 : }
129 0 :
130 0 : retry(|| {
131 0 : let (data, len) = upload_stream("remote blob data3".as_bytes().into());
132 0 : ctx.client.upload(data, len, &path3, None, &cancel)
133 0 : })
134 0 : .await?;
135 :
136 0 : let new_data = "new remote blob data2";
137 0 :
138 0 : retry(|| {
139 0 : let (data, len) = upload_stream(new_data.as_bytes().into());
140 0 : ctx.client.upload(data, len, &path2, None, &cancel)
141 0 : })
142 0 : .await?;
143 :
144 0 : retry(|| ctx.client.delete(&path1, &cancel)).await?;
145 0 : let t2_files = list_files(&ctx.client, &cancel).await?;
146 0 : let t2 = time_point().await;
147 0 : println!("at t2: {t2_files:?}");
148 :
149 : // No changes after recovery to t2 (no-op)
150 0 : let t_final = time_point().await;
151 0 : ctx.client
152 0 : .time_travel_recover(None, t2, t_final, &cancel)
153 0 : .await?;
154 0 : let t2_files_recovered = list_files(&ctx.client, &cancel).await?;
155 0 : println!("after recovery to t2: {t2_files_recovered:?}");
156 0 : assert_eq!(t2_files, t2_files_recovered);
157 0 : let path2_recovered_t2 = download_to_vec(ctx.client.download(&path2, &cancel).await?).await?;
158 0 : assert_eq!(path2_recovered_t2, new_data.as_bytes());
159 :
160 : // after recovery to t1: path1 is back, path2 has the old content
161 0 : let t_final = time_point().await;
162 0 : ctx.client
163 0 : .time_travel_recover(None, t1, t_final, &cancel)
164 0 : .await?;
165 0 : let t1_files_recovered = list_files(&ctx.client, &cancel).await?;
166 0 : println!("after recovery to t1: {t1_files_recovered:?}");
167 0 : assert_eq!(t1_files, t1_files_recovered);
168 0 : let path2_recovered_t1 = download_to_vec(ctx.client.download(&path2, &cancel).await?).await?;
169 0 : assert_eq!(path2_recovered_t1, old_data.as_bytes());
170 :
171 : // after recovery to t0: everything is gone except for path1
172 0 : let t_final = time_point().await;
173 0 : ctx.client
174 0 : .time_travel_recover(None, t0, t_final, &cancel)
175 0 : .await?;
176 0 : let t0_files_recovered = list_files(&ctx.client, &cancel).await?;
177 0 : println!("after recovery to t0: {t0_files_recovered:?}");
178 0 : assert_eq!(t0_files, t0_files_recovered);
179 :
180 : // cleanup
181 :
182 0 : let paths = &[path1, path2, path3];
183 0 : retry(|| ctx.client.delete_objects(paths, &cancel)).await?;
184 :
185 0 : Ok(())
186 2 : }
187 :
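/// A client talking to real S3, together with the prefix under which this test run creates
/// its objects.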
188 : struct EnabledS3 {
189 : client: Arc<GenericRemoteStorage>,
190 : base_prefix: &'static str,
191 : }
192 :
193 : impl EnabledS3 {
194 0 : async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
195 0 : let client = create_s3_client(max_keys_in_list_response)
196 0 : .context("S3 client creation")
197 0 : .expect("S3 client creation failed");
198 0 :
199 0 : EnabledS3 {
200 0 : client,
201 0 : base_prefix: BASE_PREFIX,
202 0 : }
203 0 : }
204 :
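    /// Override the request timeout of the underlying S3 client. Both `Arc::get_mut` calls
    /// require exclusive ownership, so this must run before the client is shared.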
205 0 : fn configure_request_timeout(&mut self, timeout: Duration) {
206 0 : match Arc::get_mut(&mut self.client).expect("outer Arc::get_mut") {
207 0 : GenericRemoteStorage::AwsS3(s3) => {
208 0 : let s3 = Arc::get_mut(s3).expect("inner Arc::get_mut");
209 0 : s3.timeout = timeout;
210 0 : }
211 0 : _ => unreachable!(),
212 : }
213 0 : }
214 : }
215 :
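/// Test context that only talks to real S3 when `ENABLE_REAL_S3_REMOTE_STORAGE` is set;
/// otherwise each test returns early as a no-op.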
216 : enum MaybeEnabledStorage {
217 : Enabled(EnabledS3),
218 : Disabled,
219 : }
220 :
221 : #[async_trait::async_trait]
222 : impl AsyncTestContext for MaybeEnabledStorage {
223 14 : async fn setup() -> Self {
224 14 : ensure_logging_ready();
225 14 :
226 14 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
227 14 : info!(
228 14 : "`{}` env variable is not set, skipping the test",
229 14 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
230 14 : );
231 14 : return Self::Disabled;
232 0 : }
233 0 :
234 0 : Self::Enabled(EnabledS3::setup(None).await)
235 42 : }
236 : }
237 :
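/// Like `MaybeEnabledStorage`, but `setup` also uploads a batch of prefixed test blobs.
/// `UploadsFailed` keeps whatever was uploaded so that `teardown` can still clean it up.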
238 : enum MaybeEnabledStorageWithTestBlobs {
239 : Enabled(S3WithTestBlobs),
240 : Disabled,
241 : UploadsFailed(anyhow::Error, S3WithTestBlobs),
242 : }
243 :
244 : struct S3WithTestBlobs {
245 : enabled: EnabledS3,
246 : remote_prefixes: HashSet<RemotePath>,
247 : remote_blobs: HashSet<RemotePath>,
248 : }
249 :
250 : #[async_trait::async_trait]
251 : impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs {
252 2 : async fn setup() -> Self {
253 2 : ensure_logging_ready();
254 2 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
255 2 : info!(
256 2 : "`{}` env variable is not set, skipping the test",
257 2 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
258 2 : );
259 2 : return Self::Disabled;
260 0 : }
261 0 :
262 0 : let max_keys_in_list_response = 10;
263 0 : let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
264 :
265 0 : let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
266 :
267 0 : match upload_remote_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
268 0 : ControlFlow::Continue(uploads) => {
269 0 : info!("Remote objects created successfully");
270 :
271 0 : Self::Enabled(S3WithTestBlobs {
272 0 : enabled,
273 0 : remote_prefixes: uploads.prefixes,
274 0 : remote_blobs: uploads.blobs,
275 0 : })
276 : }
277 0 : ControlFlow::Break(uploads) => Self::UploadsFailed(
278 0 : anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
279 0 : S3WithTestBlobs {
280 0 : enabled,
281 0 : remote_prefixes: uploads.prefixes,
282 0 : remote_blobs: uploads.blobs,
283 0 : },
284 0 : ),
285 : }
286 6 : }
287 :
288 2 : async fn teardown(self) {
289 2 : match self {
290 2 : Self::Disabled => {}
291 0 : Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
292 0 : cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
293 : }
294 : }
295 4 : }
296 : }
297 :
298 : // NOTE: the setups for the list_prefixes test and the list_files test are very similar.
299 : // However, they are not identical: list_prefixes is concerned with listing prefixes,
300 : // whereas list_files is concerned with listing files.
301 : // See `RemoteStorage::list_files` documentation for more details.
302 : enum MaybeEnabledStorageWithSimpleTestBlobs {
303 : Enabled(S3WithSimpleTestBlobs),
304 : Disabled,
305 : UploadsFailed(anyhow::Error, S3WithSimpleTestBlobs),
306 : }
307 : struct S3WithSimpleTestBlobs {
308 : enabled: EnabledS3,
309 : remote_blobs: HashSet<RemotePath>,
310 : }
311 :
312 : #[async_trait::async_trait]
313 : impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {
314 2 : async fn setup() -> Self {
315 2 : ensure_logging_ready();
316 2 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
317 2 : info!(
318 2 : "`{}` env variable is not set, skipping the test",
319 2 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
320 2 : );
321 2 : return Self::Disabled;
322 0 : }
323 0 :
324 0 : let max_keys_in_list_response = 10;
325 0 : let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
326 :
327 0 : let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
328 :
329 0 : match upload_simple_remote_data(&enabled.client, upload_tasks_count).await {
330 0 : ControlFlow::Continue(uploads) => {
331 0 : info!("Remote objects created successfully");
332 :
333 0 : Self::Enabled(S3WithSimpleTestBlobs {
334 0 : enabled,
335 0 : remote_blobs: uploads,
336 0 : })
337 : }
338 0 : ControlFlow::Break(uploads) => Self::UploadsFailed(
339 0 : anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
340 0 : S3WithSimpleTestBlobs {
341 0 : enabled,
342 0 : remote_blobs: uploads,
343 0 : },
344 0 : ),
345 : }
346 6 : }
347 :
348 2 : async fn teardown(self) {
349 2 : match self {
350 2 : Self::Disabled => {}
351 0 : Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
352 0 : cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
353 : }
354 : }
355 4 : }
356 : }
357 :
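/// Build a `GenericRemoteStorage` against real S3, reading the bucket and region from
/// environment variables and deriving a unique `prefix_in_bucket` from the current time plus
/// randomness so that concurrent test runs do not collide.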
358 0 : fn create_s3_client(
359 0 : max_keys_per_list_response: Option<i32>,
360 0 : ) -> anyhow::Result<Arc<GenericRemoteStorage>> {
361 : use rand::Rng;
362 :
363 0 : let remote_storage_s3_bucket = env::var("REMOTE_STORAGE_S3_BUCKET")
364 0 : .context("`REMOTE_STORAGE_S3_BUCKET` env var is not set, but real S3 tests are enabled")?;
365 0 : let remote_storage_s3_region = env::var("REMOTE_STORAGE_S3_REGION")
366 0 : .context("`REMOTE_STORAGE_S3_REGION` env var is not set, but real S3 tests are enabled")?;
367 :
368 : // Due to how time works, we've had test runners end up with the same nanos-based bucket prefixes.
369 : // Millis is just a debugging aid that makes it easier to find the prefix later.
370 0 : let millis = std::time::SystemTime::now()
371 0 : .duration_since(UNIX_EPOCH)
372 0 : .context("random s3 test prefix part calculation")?
373 0 : .as_millis();
374 0 :
375 0 : // Because two threads can observe the same nanos (and hence the same millis), add randomness.
376 0 : let random = rand::thread_rng().gen::<u32>();
377 0 :
378 0 : let remote_storage_config = RemoteStorageConfig {
379 0 : storage: RemoteStorageKind::AwsS3(S3Config {
380 0 : bucket_name: remote_storage_s3_bucket,
381 0 : bucket_region: remote_storage_s3_region,
382 0 : prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
383 0 : endpoint: None,
384 0 : concurrency_limit: NonZeroUsize::new(100).unwrap(),
385 0 : max_keys_per_list_response,
386 0 : }),
387 0 : timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
388 0 : };
389 0 : Ok(Arc::new(
390 0 : GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
391 : ))
392 0 : }
393 :
394 2 : #[test_context(MaybeEnabledStorage)]
395 2 : #[tokio::test]
396 4 : async fn download_is_timeouted(ctx: &mut MaybeEnabledStorage) {
397 2 : let MaybeEnabledStorage::Enabled(ctx) = ctx else {
398 2 : return;
399 : };
400 :
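// Test plan: upload a file larger than one response chunk, shrink the request timeout, read
// the first chunk, then stall for longer than the timeout and expect the next stream item to
// fail with `DownloadError::Timeout`.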
401 0 : let cancel = CancellationToken::new();
402 0 :
403 0 : let path = RemotePath::new(Utf8Path::new(
404 0 : format!("{}/file_to_copy", ctx.base_prefix).as_str(),
405 0 : ))
406 0 : .unwrap();
407 :
408 0 : let len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
409 :
410 0 : let timeout = std::time::Duration::from_secs(5);
411 0 :
412 0 : ctx.configure_request_timeout(timeout);
413 0 :
414 0 : let started_at = std::time::Instant::now();
415 0 : let mut stream = ctx
416 0 : .client
417 0 : .download(&path, &cancel)
418 0 : .await
419 0 : .expect("download succeeds")
420 0 : .download_stream;
421 0 :
422 0 : if started_at.elapsed().mul_f32(0.9) >= timeout {
423 0 : tracing::warn!(
424 0 : elapsed_ms = started_at.elapsed().as_millis(),
425 0 : "timeout might be too low, consumed most of it during headers"
426 0 : );
427 0 : }
428 :
429 0 : let first = stream
430 0 : .next()
431 0 : .await
432 0 : .expect("should have the first blob")
433 0 : .expect("should have succeeded");
434 :
435 0 : tracing::info!(len = first.len(), "downloaded first chunk");
436 :
437 0 : assert!(
438 0 : first.len() < len,
439 0 : "uploaded file is too small, we downloaded all on first chunk"
440 : );
441 :
442 0 : tokio::time::sleep(timeout).await;
443 :
444 : {
445 0 : let started_at = std::time::Instant::now();
446 0 : let next = stream
447 0 : .next()
448 0 : .await
449 0 : .expect("stream should not have ended yet");
450 :
451 0 : tracing::info!(
452 0 : next.is_err = next.is_err(),
453 0 : elapsed_ms = started_at.elapsed().as_millis(),
454 0 : "received item after timeout"
455 0 : );
456 :
457 0 : let e = next.expect_err("expected an error, but got a chunk?");
458 0 :
459 0 : let inner = e.get_ref().expect("std::io::Error::inner should be set");
460 0 : assert!(
461 0 : inner
462 0 : .downcast_ref::<DownloadError>()
463 0 : .is_some_and(|e| matches!(e, DownloadError::Timeout)),
464 0 : "{inner:?}"
465 : );
466 : }
467 :
468 0 : ctx.configure_request_timeout(RemoteStorageConfig::DEFAULT_TIMEOUT);
469 0 :
470 0 : ctx.client.delete_objects(&[path], &cancel).await.unwrap()
471 2 : }
472 :
473 2 : #[test_context(MaybeEnabledStorage)]
474 2 : #[tokio::test]
475 4 : async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
476 2 : let MaybeEnabledStorage::Enabled(ctx) = ctx else {
477 2 : return;
478 : };
479 :
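// Test plan: start a download, read the first chunk, then cancel the token and expect the
// next stream item to fail with `DownloadError::Cancelled`.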
480 0 : let cancel = CancellationToken::new();
481 0 :
482 0 : let path = RemotePath::new(Utf8Path::new(
483 0 : format!("{}/file_to_copy", ctx.base_prefix).as_str(),
484 0 : ))
485 0 : .unwrap();
486 :
487 0 : let len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
488 :
489 : {
490 0 : let mut stream = ctx
491 0 : .client
492 0 : .download(&path, &cancel)
493 0 : .await
494 0 : .expect("download succeeds")
495 : .download_stream;
496 :
497 0 : let first = stream
498 0 : .next()
499 0 : .await
500 0 : .expect("should have the first blob")
501 0 : .expect("should have succeeded");
502 :
503 0 : tracing::info!(len = first.len(), "downloaded first chunk");
504 :
505 0 : assert!(
506 0 : first.len() < len,
507 0 : "uploaded file is too small, we downloaded all on first chunk"
508 : );
509 :
510 0 : cancel.cancel();
511 :
512 0 : let next = stream.next().await.expect("stream should have more");
513 0 :
514 0 : let e = next.expect_err("expected an error, but got a chunk?");
515 0 :
516 0 : let inner = e.get_ref().expect("std::io::Error::inner should be set");
517 0 : assert!(
518 0 : inner
519 0 : .downcast_ref::<DownloadError>()
520 0 : .is_some_and(|e| matches!(e, DownloadError::Cancelled)),
521 0 : "{inner:?}"
522 : );
523 : }
524 :
525 0 : let cancel = CancellationToken::new();
526 0 :
527 0 : ctx.client.delete_objects(&[path], &cancel).await.unwrap();
528 2 : }
529 :
530 : /// Upload a long enough file so that we cannot download it in a single chunk
531 : ///
532 : /// For S3 the first chunk seems to be less than 10 kB, so this has a bit of a safety margin
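/// (here: a 24-byte header followed by 128 chunks of 1 KiB zeroes, roughly 128 KiB in total).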
533 0 : async fn upload_large_enough_file(
534 0 : client: &GenericRemoteStorage,
535 0 : path: &RemotePath,
536 0 : cancel: &CancellationToken,
537 0 : ) -> usize {
538 0 : let header = bytes::Bytes::from_static("remote blob data content".as_bytes());
539 0 : let body = bytes::Bytes::from(vec![0u8; 1024]);
540 0 : let contents = std::iter::once(header).chain(std::iter::repeat(body).take(128));
541 0 :
542 0 : let len = contents.clone().fold(0, |acc, next| acc + next.len());
543 0 :
544 0 : let contents = futures::stream::iter(contents.map(std::io::Result::Ok));
545 0 :
546 0 : client
547 0 : .upload(contents, len, path, None, cancel)
548 0 : .await
549 0 : .expect("upload succeeds");
550 0 :
551 0 : len
552 0 : }