Line data Source code
1 : use std::collections::HashSet;
2 : use std::env;
3 : use std::fmt::{Debug, Display};
4 : use std::future::Future;
5 : use std::num::NonZeroUsize;
6 : use std::ops::ControlFlow;
7 : use std::sync::Arc;
8 : use std::time::{Duration, SystemTime, UNIX_EPOCH};
9 :
10 : use anyhow::Context;
11 : use camino::Utf8Path;
12 : use futures_util::StreamExt;
13 : use remote_storage::{
14 : DownloadError, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath,
15 : RemoteStorageConfig, RemoteStorageKind, S3Config,
16 : };
17 : use test_context::{AsyncTestContext, test_context};
18 : use tokio::io::AsyncBufReadExt;
19 : use tokio_util::sync::CancellationToken;
20 : use tracing::info;
21 :
22 : use crate::common::{download_to_vec, upload_stream};
23 :
24 : mod common;
25 :
26 : #[path = "common/tests.rs"]
27 : mod tests_s3;
28 :
29 : use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data};
30 : use utils::backoff;
31 :
32 : const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
33 : const BASE_PREFIX: &str = "test";
34 :
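// Exercise S3 time-travel recovery end to end: upload, overwrite, and delete objects across
// three recorded time points (t0, t1, t2), then roll the prefix back to each point with
// `time_travel_recover` and assert that the listings and object contents match what was
// observed live at that point.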
35 3 : #[test_context(MaybeEnabledStorage)]
36 : #[tokio::test]
37 : async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
38 : let ctx = match ctx {
39 : MaybeEnabledStorage::Enabled(ctx) => ctx,
40 : MaybeEnabledStorage::Disabled => return Ok(()),
41 : };
42 : // Our test is sensitive to discrepancies between the S3 clock and the clock of the
43 : // environment the tests run in. Therefore, wait a little bit before and after taking
44 : // each time point. The alternative would be to take the time from S3 response headers.
45 : const WAIT_TIME: Duration = Duration::from_millis(3_000);
46 :
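/// Retry `op` with the shared `backoff::retry` helper. The `|_e| false` closure presumably
/// marks no error as permanent, so every failure is retried (up to `max_retries` attempts),
/// and the fresh `CancellationToken` means the retry loop itself is never cancelled.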
47 26 : async fn retry<T, O, F, E>(op: O) -> Result<T, E>
48 26 : where
49 26 : E: Display + Debug + 'static,
50 26 : O: FnMut() -> F,
51 26 : F: Future<Output = Result<T, E>>,
52 26 : {
53 26 : let warn_threshold = 3;
54 26 : let max_retries = 10;
55 26 : backoff::retry(
56 26 : op,
57 26 : |_e| false,
58 26 : warn_threshold,
59 26 : max_retries,
60 26 : "test retry",
61 26 : &CancellationToken::new(),
62 26 : )
63 26 : .await
64 26 : .expect("never cancelled")
65 26 : }
66 :
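/// Record a wall-clock time point, sleeping `WAIT_TIME` both before and after so that the
/// recorded instant stays separated from neighbouring S3 mutations despite moderate clock skew.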
67 12 : async fn time_point() -> SystemTime {
68 12 : tokio::time::sleep(WAIT_TIME).await;
69 12 : let ret = SystemTime::now();
70 12 : tokio::time::sleep(WAIT_TIME).await;
71 12 : ret
72 12 : }
73 :
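/// List every key under the bucket prefix (no delimiter), retrying transient list failures.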
74 12 : async fn list_files(
75 12 : client: &Arc<GenericRemoteStorage>,
76 12 : cancel: &CancellationToken,
77 12 : ) -> anyhow::Result<HashSet<RemotePath>> {
78 12 : Ok(
79 12 : retry(|| client.list(None, ListingMode::NoDelimiter, None, cancel))
80 12 : .await
81 12 : .context("list root files failure")?
82 : .keys
83 12 : .into_iter()
84 20 : .map(|o| o.key)
85 12 : .collect::<HashSet<_>>(),
86 12 : )
87 12 : }
88 :
89 : let cancel = CancellationToken::new();
90 :
91 : let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
92 0 : .with_context(|| "RemotePath conversion")?;
93 :
94 : let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
95 0 : .with_context(|| "RemotePath conversion")?;
96 :
97 : let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
98 0 : .with_context(|| "RemotePath conversion")?;
99 :
100 2 : retry(|| {
101 2 : let (data, len) = upload_stream("remote blob data1".as_bytes().into());
102 2 : ctx.client.upload(data, len, &path1, None, &cancel)
103 2 : })
104 : .await?;
105 :
106 : let t0_files = list_files(&ctx.client, &cancel).await?;
107 : let t0 = time_point().await;
108 : println!("at t0: {t0_files:?}");
109 :
110 : let old_data = "remote blob data2";
111 :
112 3 : retry(|| {
113 3 : let (data, len) = upload_stream(old_data.as_bytes().into());
114 3 : ctx.client.upload(data, len, &path2, None, &cancel)
115 3 : })
116 : .await?;
117 :
118 : let t1_files = list_files(&ctx.client, &cancel).await?;
119 : let t1 = time_point().await;
120 : println!("at t1: {t1_files:?}");
121 :
122 : // A little check to ensure that our clock is not too far off from the S3 clock
123 : {
124 : let opts = DownloadOpts::default();
125 3 : let dl = retry(|| ctx.client.download(&path2, &opts, &cancel)).await?;
126 : let last_modified = dl.last_modified;
127 : let half_wt = WAIT_TIME.mul_f32(0.5);
128 : let t0_hwt = t0 + half_wt;
129 : let t1_hwt = t1 - half_wt;
130 : if !(t0_hwt..=t1_hwt).contains(&last_modified) {
131 : panic!(
132 : "last_modified={last_modified:?} is not between t0_hwt={t0_hwt:?} and t1_hwt={t1_hwt:?}. \
133 : This likely means a large clock discrepancy between S3 and the local clock."
134 : );
135 : }
136 : }
137 :
138 2 : retry(|| {
139 2 : let (data, len) = upload_stream("remote blob data3".as_bytes().into());
140 2 : ctx.client.upload(data, len, &path3, None, &cancel)
141 2 : })
142 : .await?;
143 :
144 : let new_data = "new remote blob data2";
145 :
146 2 : retry(|| {
147 2 : let (data, len) = upload_stream(new_data.as_bytes().into());
148 2 : ctx.client.upload(data, len, &path2, None, &cancel)
149 2 : })
150 : .await?;
151 :
152 2 : retry(|| ctx.client.delete(&path1, &cancel)).await?;
153 : let t2_files = list_files(&ctx.client, &cancel).await?;
154 : let t2 = time_point().await;
155 : println!("at t2: {t2_files:?}");
156 :
157 : // No changes after recovery to t2 (no-op)
158 : let t_final = time_point().await;
159 : ctx.client
160 : .time_travel_recover(None, t2, t_final, &cancel)
161 : .await?;
162 : let t2_files_recovered = list_files(&ctx.client, &cancel).await?;
163 : println!("after recovery to t2: {t2_files_recovered:?}");
164 : assert_eq!(t2_files, t2_files_recovered);
165 : let path2_recovered_t2 = download_to_vec(
166 : ctx.client
167 : .download(&path2, &DownloadOpts::default(), &cancel)
168 : .await?,
169 : )
170 : .await?;
171 : assert_eq!(path2_recovered_t2, new_data.as_bytes());
172 :
173 : // after recovery to t1: path1 is back, path2 has the old content
174 : let t_final = time_point().await;
175 : ctx.client
176 : .time_travel_recover(None, t1, t_final, &cancel)
177 : .await?;
178 : let t1_files_recovered = list_files(&ctx.client, &cancel).await?;
179 : println!("after recovery to t1: {t1_files_recovered:?}");
180 : assert_eq!(t1_files, t1_files_recovered);
181 : let path2_recovered_t1 = download_to_vec(
182 : ctx.client
183 : .download(&path2, &DownloadOpts::default(), &cancel)
184 : .await?,
185 : )
186 : .await?;
187 : assert_eq!(path2_recovered_t1, old_data.as_bytes());
188 :
189 : // after recovery to t0: everything is gone except for path1
190 : let t_final = time_point().await;
191 : ctx.client
192 : .time_travel_recover(None, t0, t_final, &cancel)
193 : .await?;
194 : let t0_files_recovered = list_files(&ctx.client, &cancel).await?;
195 : println!("after recovery to t0: {t0_files_recovered:?}");
196 : assert_eq!(t0_files, t0_files_recovered);
197 :
198 : // cleanup
199 :
200 : let paths = &[path1, path2, path3];
201 2 : retry(|| ctx.client.delete_objects(paths, &cancel)).await?;
202 :
203 : Ok(())
204 : }
205 :
206 : struct EnabledS3 {
207 : client: Arc<GenericRemoteStorage>,
208 : base_prefix: &'static str,
209 : }
210 :
211 : impl EnabledS3 {
212 26 : async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
213 26 : let client = create_s3_client(max_keys_in_list_response)
214 26 : .await
215 26 : .context("S3 client creation")
216 26 : .expect("S3 client creation failed");
217 26 :
218 26 : EnabledS3 {
219 26 : client,
220 26 : base_prefix: BASE_PREFIX,
221 26 : }
222 26 : }
223 :
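/// Override the request timeout of the underlying S3 client in place. Both `Arc::get_mut`
/// calls must succeed, so this only works while no other clone of the client is alive.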
224 4 : fn configure_request_timeout(&mut self, timeout: Duration) {
225 4 : match Arc::get_mut(&mut self.client).expect("outer Arc::get_mut") {
226 4 : GenericRemoteStorage::AwsS3(s3) => {
227 4 : let s3 = Arc::get_mut(s3).expect("inner Arc::get_mut");
228 4 : s3.timeout = timeout;
229 4 : }
230 0 : _ => unreachable!(),
231 : }
232 4 : }
233 : }
234 :
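/// Test context that talks to real S3 only when `ENABLE_REAL_S3_REMOTE_STORAGE` is set;
/// otherwise the tests using it return early as no-ops.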
235 : enum MaybeEnabledStorage {
236 : Enabled(EnabledS3),
237 : Disabled,
238 : }
239 :
240 : impl AsyncTestContext for MaybeEnabledStorage {
241 27 : async fn setup() -> Self {
242 27 : ensure_logging_ready();
243 27 :
244 27 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
245 9 : info!(
246 0 : "`{}` env variable is not set, skipping the test",
247 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
248 : );
249 9 : return Self::Disabled;
250 18 : }
251 18 :
252 18 : Self::Enabled(EnabledS3::setup(None).await)
253 27 : }
254 : }
255 :
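/// Like `MaybeEnabledStorage`, but the enabled variant also seeds the bucket with test blobs
/// and prefixes. A partially failed seeding is kept in `UploadsFailed` so teardown can still
/// clean up whatever did reach S3.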
256 : enum MaybeEnabledStorageWithTestBlobs {
257 : Enabled(S3WithTestBlobs),
258 : Disabled,
259 : UploadsFailed(anyhow::Error, S3WithTestBlobs),
260 : }
261 :
262 : struct S3WithTestBlobs {
263 : enabled: EnabledS3,
264 : remote_prefixes: HashSet<RemotePath>,
265 : remote_blobs: HashSet<RemotePath>,
266 : }
267 :
268 : impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs {
269 3 : async fn setup() -> Self {
270 3 : ensure_logging_ready();
271 3 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
272 1 : info!(
273 0 : "`{}` env variable is not set, skipping the test",
274 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
275 : );
276 1 : return Self::Disabled;
277 2 : }
278 2 :
279 2 : let max_keys_in_list_response = 10;
280 2 : let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
281 :
282 2 : let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
283 :
284 2 : match upload_remote_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
285 2 : ControlFlow::Continue(uploads) => {
286 2 : info!("Remote objects created successfully");
287 :
288 2 : Self::Enabled(S3WithTestBlobs {
289 2 : enabled,
290 2 : remote_prefixes: uploads.prefixes,
291 2 : remote_blobs: uploads.blobs,
292 2 : })
293 : }
294 0 : ControlFlow::Break(uploads) => Self::UploadsFailed(
295 0 : anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
296 0 : S3WithTestBlobs {
297 0 : enabled,
298 0 : remote_prefixes: uploads.prefixes,
299 0 : remote_blobs: uploads.blobs,
300 0 : },
301 0 : ),
302 : }
303 3 : }
304 :
305 3 : async fn teardown(self) {
306 3 : match self {
307 1 : Self::Disabled => {}
308 2 : Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
309 2 : cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
310 : }
311 : }
312 3 : }
313 : }
314 :
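/// Same idea as `MaybeEnabledStorageWithTestBlobs`, but the seeded data is a flat set of
/// blobs with no tracked prefixes.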
315 : enum MaybeEnabledStorageWithSimpleTestBlobs {
316 : Enabled(S3WithSimpleTestBlobs),
317 : Disabled,
318 : UploadsFailed(anyhow::Error, S3WithSimpleTestBlobs),
319 : }
320 : struct S3WithSimpleTestBlobs {
321 : enabled: EnabledS3,
322 : remote_blobs: HashSet<RemotePath>,
323 : }
324 :
325 : impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {
326 9 : async fn setup() -> Self {
327 9 : ensure_logging_ready();
328 9 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
329 3 : info!(
330 0 : "`{}` env variable is not set, skipping the test",
331 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
332 : );
333 3 : return Self::Disabled;
334 6 : }
335 6 :
336 6 : let max_keys_in_list_response = 10;
337 6 : let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
338 :
339 6 : let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
340 :
341 6 : match upload_simple_remote_data(&enabled.client, upload_tasks_count).await {
342 6 : ControlFlow::Continue(uploads) => {
343 6 : info!("Remote objects created successfully");
344 :
345 6 : Self::Enabled(S3WithSimpleTestBlobs {
346 6 : enabled,
347 6 : remote_blobs: uploads,
348 6 : })
349 : }
350 0 : ControlFlow::Break(uploads) => Self::UploadsFailed(
351 0 : anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
352 0 : S3WithSimpleTestBlobs {
353 0 : enabled,
354 0 : remote_blobs: uploads,
355 0 : },
356 0 : ),
357 : }
358 9 : }
359 :
360 9 : async fn teardown(self) {
361 9 : match self {
362 3 : Self::Disabled => {}
363 6 : Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
364 6 : cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
365 : }
366 : }
367 9 : }
368 : }
369 :
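/// Build a real-S3 `GenericRemoteStorage` from the `REMOTE_STORAGE_S3_BUCKET` and
/// `REMOTE_STORAGE_S3_REGION` env vars, isolating this test run under a unique
/// millisecond-plus-random prefix to avoid collisions between concurrent runs.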
370 26 : async fn create_s3_client(
371 26 : max_keys_per_list_response: Option<i32>,
372 26 : ) -> anyhow::Result<Arc<GenericRemoteStorage>> {
373 : use rand::Rng;
374 :
375 26 : let remote_storage_s3_bucket = env::var("REMOTE_STORAGE_S3_BUCKET")
376 26 : .context("`REMOTE_STORAGE_S3_BUCKET` env var is not set, but real S3 tests are enabled")?;
377 26 : let remote_storage_s3_region = env::var("REMOTE_STORAGE_S3_REGION")
378 26 : .context("`REMOTE_STORAGE_S3_REGION` env var is not set, but real S3 tests are enabled")?;
379 :
380 : // Due to how time works, test runners have ended up using the same nanos as bucket prefixes.
381 : // The millis value is just a debugging aid that makes the prefix easier to find later.
382 26 : let millis = std::time::SystemTime::now()
383 26 : .duration_since(UNIX_EPOCH)
384 26 : .context("random s3 test prefix part calculation")?
385 26 : .as_millis();
386 26 :
387 26 : // Because nanos can be the same for two threads (and so can millis), add randomness.
388 26 : let random = rand::thread_rng().r#gen::<u32>();
389 26 :
390 26 : let remote_storage_config = RemoteStorageConfig {
391 26 : storage: RemoteStorageKind::AwsS3(S3Config {
392 26 : bucket_name: remote_storage_s3_bucket,
393 26 : bucket_region: remote_storage_s3_region,
394 26 : prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
395 26 : endpoint: None,
396 26 : concurrency_limit: NonZeroUsize::new(100).unwrap(),
397 26 : max_keys_per_list_response,
398 26 : upload_storage_class: None,
399 26 : }),
400 26 : timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
401 26 : small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
402 26 : };
403 26 : Ok(Arc::new(
404 26 : GenericRemoteStorage::from_config(&remote_storage_config)
405 26 : .await
406 26 : .context("remote storage init")?,
407 : ))
408 26 : }
409 :
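// Verify request timeouts on downloads: shrink the client timeout to 5 seconds, read the
// first chunk of a large object, sit idle past the timeout, and expect the next stream item
// to surface `DownloadError::Timeout`.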
410 3 : #[test_context(MaybeEnabledStorage)]
411 : #[tokio::test]
412 : async fn download_is_timeouted(ctx: &mut MaybeEnabledStorage) {
413 : let MaybeEnabledStorage::Enabled(ctx) = ctx else {
414 : return;
415 : };
416 :
417 : let cancel = CancellationToken::new();
418 :
419 : let path = RemotePath::new(Utf8Path::new(
420 : format!("{}/file_to_copy", ctx.base_prefix).as_str(),
421 : ))
422 : .unwrap();
423 :
424 : let len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
425 :
426 : let timeout = std::time::Duration::from_secs(5);
427 :
428 : ctx.configure_request_timeout(timeout);
429 :
430 : let started_at = std::time::Instant::now();
431 : let mut stream = ctx
432 : .client
433 : .download(&path, &DownloadOpts::default(), &cancel)
434 : .await
435 : .expect("download succeeds")
436 : .download_stream;
437 :
438 : if started_at.elapsed().mul_f32(0.9) >= timeout {
439 : tracing::warn!(
440 : elapsed_ms = started_at.elapsed().as_millis(),
441 : "timeout might be too low, consumed most of it during headers"
442 : );
443 : }
444 :
445 : let first = stream
446 : .next()
447 : .await
448 : .expect("should have the first blob")
449 : .expect("should have succeeded");
450 :
451 : tracing::info!(len = first.len(), "downloaded first chunk");
452 :
453 : assert!(
454 : first.len() < len,
455 : "uploaded file is too small, we downloaded all on first chunk"
456 : );
457 :
458 : tokio::time::sleep(timeout).await;
459 :
460 : {
461 : let started_at = std::time::Instant::now();
462 : let next = stream
463 : .next()
464 : .await
465 : .expect("stream should not have ended yet");
466 :
467 : tracing::info!(
468 : next.is_err = next.is_err(),
469 : elapsed_ms = started_at.elapsed().as_millis(),
470 : "received item after timeout"
471 : );
472 :
473 : let e = next.expect_err("expected an error, but got a chunk?");
474 :
475 : let inner = e.get_ref().expect("std::io::Error::inner should be set");
476 : assert!(
477 : inner
478 : .downcast_ref::<DownloadError>()
479 2 : .is_some_and(|e| matches!(e, DownloadError::Timeout)),
480 : "{inner:?}"
481 : );
482 : }
483 :
484 : ctx.configure_request_timeout(RemoteStorageConfig::DEFAULT_TIMEOUT);
485 :
486 : ctx.client.delete_objects(&[path], &cancel).await.unwrap()
487 : }
488 :
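// Verify cancellation on downloads: read the first chunk, cancel the token mid-stream, and
// expect the next read to fail with `DownloadError::Cancelled`; cleanup uses a fresh token.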
489 3 : #[test_context(MaybeEnabledStorage)]
490 : #[tokio::test]
491 : async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
492 : let MaybeEnabledStorage::Enabled(ctx) = ctx else {
493 : return;
494 : };
495 :
496 : let cancel = CancellationToken::new();
497 :
498 : let path = RemotePath::new(Utf8Path::new(
499 : format!("{}/file_to_copy", ctx.base_prefix).as_str(),
500 : ))
501 : .unwrap();
502 :
503 : let file_len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
504 :
505 : {
506 : let stream = ctx
507 : .client
508 : .download(&path, &DownloadOpts::default(), &cancel)
509 : .await
510 : .expect("download succeeds")
511 : .download_stream;
512 :
513 : let mut reader = std::pin::pin!(tokio_util::io::StreamReader::new(stream));
514 :
515 : let first = reader.fill_buf().await.expect("should have the first blob");
516 :
517 : let len = first.len();
518 : tracing::info!(len, "downloaded first chunk");
519 :
520 : assert!(
521 : first.len() < file_len,
522 : "uploaded file is too small, we downloaded all on first chunk"
523 : );
524 :
525 : reader.consume(len);
526 :
527 : cancel.cancel();
528 :
529 : let next = reader.fill_buf().await;
530 :
531 : let e = next.expect_err("expected an error, but got a chunk?");
532 :
533 : let inner = e.get_ref().expect("std::io::Error::inner should be set");
534 : assert!(
535 : inner
536 : .downcast_ref::<DownloadError>()
537 2 : .is_some_and(|e| matches!(e, DownloadError::Cancelled)),
538 : "{inner:?}"
539 : );
540 :
541 : let e = DownloadError::from(e);
542 :
543 : assert!(matches!(e, DownloadError::Cancelled), "{e:?}");
544 : }
545 :
546 : let cancel = CancellationToken::new();
547 :
548 : ctx.client.delete_objects(&[path], &cancel).await.unwrap();
549 : }
550 :
551 : /// Upload a file large enough that it cannot be downloaded in a single chunk.
552 : ///
553 : /// For S3 the first chunk seems to be less than 10 kB, so this has a bit of a safety margin.
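/// The stream built below is a 24-byte header plus 128 copies of a 1 KiB zero body, so the
/// reported `len` comes out to 24 + 128 * 1024 = 131,096 bytes, i.e. roughly 128 KiB.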
554 4 : async fn upload_large_enough_file(
555 4 : client: &GenericRemoteStorage,
556 4 : path: &RemotePath,
557 4 : cancel: &CancellationToken,
558 4 : ) -> usize {
559 4 : let header = bytes::Bytes::from_static("remote blob data content".as_bytes());
560 4 : let body = bytes::Bytes::from(vec![0u8; 1024]);
561 4 : let contents = std::iter::once(header).chain(std::iter::repeat(body).take(128));
562 4 :
563 516 : let len = contents.clone().fold(0, |acc, next| acc + next.len());
564 4 :
565 4 : let contents = futures::stream::iter(contents.map(std::io::Result::Ok));
566 4 :
567 4 : client
568 4 : .upload(contents, len, path, None, cancel)
569 4 : .await
570 4 : .expect("upload succeeds");
571 4 :
572 4 : len
573 4 : }