Line data Source code
1 : use std::env;
2 : use std::fmt::{Debug, Display};
3 : use std::future::Future;
4 : use std::num::NonZeroUsize;
5 : use std::ops::ControlFlow;
6 : use std::sync::Arc;
7 : use std::time::{Duration, UNIX_EPOCH};
8 : use std::{collections::HashSet, time::SystemTime};
9 :
10 : use crate::common::{download_to_vec, upload_stream};
11 : use anyhow::Context;
12 : use camino::Utf8Path;
13 : use futures_util::StreamExt;
14 : use remote_storage::{
15 : DownloadError, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath,
16 : RemoteStorageConfig, RemoteStorageKind, S3Config,
17 : };
18 : use test_context::test_context;
19 : use test_context::AsyncTestContext;
20 : use tokio::io::AsyncBufReadExt;
21 : use tokio_util::sync::CancellationToken;
22 : use tracing::info;
23 :
24 : mod common;
25 :
26 : #[path = "common/tests.rs"]
27 : mod tests_s3;
28 :
29 : use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data};
30 : use utils::backoff;
31 :
/// Name of the env variable that must be readable for the real-S3 test contexts in this file
/// to be set up; when reading it fails the contexts resolve to their `Disabled` variant.
const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
/// Value stored in `EnabledS3::base_prefix` by `EnabledS3::setup`.
const BASE_PREFIX: &str = "test";
34 :
35 3 : #[test_context(MaybeEnabledStorage)]
36 : #[tokio::test]
37 : async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
38 : let ctx = match ctx {
39 : MaybeEnabledStorage::Enabled(ctx) => ctx,
40 : MaybeEnabledStorage::Disabled => return Ok(()),
41 : };
42 : // Our test depends on discrepancies in the clock between S3 and the environment the tests
43 : // run in. Therefore, wait a little bit before and after. The alternative would be
44 : // to take the time from S3 response headers.
45 : const WAIT_TIME: Duration = Duration::from_millis(3_000);
46 :
47 26 : async fn retry<T, O, F, E>(op: O) -> Result<T, E>
48 26 : where
49 26 : E: Display + Debug + 'static,
50 26 : O: FnMut() -> F,
51 26 : F: Future<Output = Result<T, E>>,
52 26 : {
53 26 : let warn_threshold = 3;
54 26 : let max_retries = 10;
55 26 : backoff::retry(
56 26 : op,
57 26 : |_e| false,
58 26 : warn_threshold,
59 26 : max_retries,
60 26 : "test retry",
61 26 : &CancellationToken::new(),
62 26 : )
63 85 : .await
64 26 : .expect("never cancelled")
65 26 : }
66 :
67 12 : async fn time_point() -> SystemTime {
68 12 : tokio::time::sleep(WAIT_TIME).await;
69 12 : let ret = SystemTime::now();
70 12 : tokio::time::sleep(WAIT_TIME).await;
71 12 : ret
72 12 : }
73 :
74 12 : async fn list_files(
75 12 : client: &Arc<GenericRemoteStorage>,
76 12 : cancel: &CancellationToken,
77 12 : ) -> anyhow::Result<HashSet<RemotePath>> {
78 12 : Ok(
79 12 : retry(|| client.list(None, ListingMode::NoDelimiter, None, cancel))
80 34 : .await
81 12 : .context("list root files failure")?
82 : .keys
83 12 : .into_iter()
84 20 : .map(|o| o.key)
85 12 : .collect::<HashSet<_>>(),
86 12 : )
87 12 : }
88 :
89 : let cancel = CancellationToken::new();
90 :
91 : let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
92 0 : .with_context(|| "RemotePath conversion")?;
93 :
94 : let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
95 0 : .with_context(|| "RemotePath conversion")?;
96 :
97 : let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
98 0 : .with_context(|| "RemotePath conversion")?;
99 :
100 2 : retry(|| {
101 2 : let (data, len) = upload_stream("remote blob data1".as_bytes().into());
102 2 : ctx.client.upload(data, len, &path1, None, &cancel)
103 2 : })
104 : .await?;
105 :
106 : let t0_files = list_files(&ctx.client, &cancel).await?;
107 : let t0 = time_point().await;
108 : println!("at t0: {t0_files:?}");
109 :
110 : let old_data = "remote blob data2";
111 :
112 2 : retry(|| {
113 2 : let (data, len) = upload_stream(old_data.as_bytes().into());
114 2 : ctx.client.upload(data, len, &path2, None, &cancel)
115 2 : })
116 : .await?;
117 :
118 : let t1_files = list_files(&ctx.client, &cancel).await?;
119 : let t1 = time_point().await;
120 : println!("at t1: {t1_files:?}");
121 :
122 : // A little check to ensure that our clock is not too far off from the S3 clock
123 : {
124 : let opts = DownloadOpts::default();
125 4 : let dl = retry(|| ctx.client.download(&path2, &opts, &cancel)).await?;
126 : let last_modified = dl.last_modified;
127 : let half_wt = WAIT_TIME.mul_f32(0.5);
128 : let t0_hwt = t0 + half_wt;
129 : let t1_hwt = t1 - half_wt;
130 : if !(t0_hwt..=t1_hwt).contains(&last_modified) {
131 : panic!("last_modified={last_modified:?} is not between t0_hwt={t0_hwt:?} and t1_hwt={t1_hwt:?}. \
132 : This likely means a large lock discrepancy between S3 and the local clock.");
133 : }
134 : }
135 :
136 2 : retry(|| {
137 2 : let (data, len) = upload_stream("remote blob data3".as_bytes().into());
138 2 : ctx.client.upload(data, len, &path3, None, &cancel)
139 2 : })
140 : .await?;
141 :
142 : let new_data = "new remote blob data2";
143 :
144 2 : retry(|| {
145 2 : let (data, len) = upload_stream(new_data.as_bytes().into());
146 2 : ctx.client.upload(data, len, &path2, None, &cancel)
147 2 : })
148 : .await?;
149 :
150 2 : retry(|| ctx.client.delete(&path1, &cancel)).await?;
151 : let t2_files = list_files(&ctx.client, &cancel).await?;
152 : let t2 = time_point().await;
153 : println!("at t2: {t2_files:?}");
154 :
155 : // No changes after recovery to t2 (no-op)
156 : let t_final = time_point().await;
157 : ctx.client
158 : .time_travel_recover(None, t2, t_final, &cancel)
159 : .await?;
160 : let t2_files_recovered = list_files(&ctx.client, &cancel).await?;
161 : println!("after recovery to t2: {t2_files_recovered:?}");
162 : assert_eq!(t2_files, t2_files_recovered);
163 : let path2_recovered_t2 = download_to_vec(
164 : ctx.client
165 : .download(&path2, &DownloadOpts::default(), &cancel)
166 : .await?,
167 : )
168 : .await?;
169 : assert_eq!(path2_recovered_t2, new_data.as_bytes());
170 :
171 : // after recovery to t1: path1 is back, path2 has the old content
172 : let t_final = time_point().await;
173 : ctx.client
174 : .time_travel_recover(None, t1, t_final, &cancel)
175 : .await?;
176 : let t1_files_recovered = list_files(&ctx.client, &cancel).await?;
177 : println!("after recovery to t1: {t1_files_recovered:?}");
178 : assert_eq!(t1_files, t1_files_recovered);
179 : let path2_recovered_t1 = download_to_vec(
180 : ctx.client
181 : .download(&path2, &DownloadOpts::default(), &cancel)
182 : .await?,
183 : )
184 : .await?;
185 : assert_eq!(path2_recovered_t1, old_data.as_bytes());
186 :
187 : // after recovery to t0: everything is gone except for path1
188 : let t_final = time_point().await;
189 : ctx.client
190 : .time_travel_recover(None, t0, t_final, &cancel)
191 : .await?;
192 : let t0_files_recovered = list_files(&ctx.client, &cancel).await?;
193 : println!("after recovery to t0: {t0_files_recovered:?}");
194 : assert_eq!(t0_files, t0_files_recovered);
195 :
196 : // cleanup
197 :
198 : let paths = &[path1, path2, path3];
199 2 : retry(|| ctx.client.delete_objects(paths, &cancel)).await?;
200 :
201 : Ok(())
202 : }
203 :
204 : struct EnabledS3 {
205 : client: Arc<GenericRemoteStorage>,
206 : base_prefix: &'static str,
207 : }
208 :
209 : impl EnabledS3 {
210 26 : async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
211 26 : let client = create_s3_client(max_keys_in_list_response)
212 0 : .await
213 26 : .context("S3 client creation")
214 26 : .expect("S3 client creation failed");
215 26 :
216 26 : EnabledS3 {
217 26 : client,
218 26 : base_prefix: BASE_PREFIX,
219 26 : }
220 26 : }
221 :
222 4 : fn configure_request_timeout(&mut self, timeout: Duration) {
223 4 : match Arc::get_mut(&mut self.client).expect("outer Arc::get_mut") {
224 4 : GenericRemoteStorage::AwsS3(s3) => {
225 4 : let s3 = Arc::get_mut(s3).expect("inner Arc::get_mut");
226 4 : s3.timeout = timeout;
227 4 : }
228 0 : _ => unreachable!(),
229 : }
230 4 : }
231 : }
232 :
233 : enum MaybeEnabledStorage {
234 : Enabled(EnabledS3),
235 : Disabled,
236 : }
237 :
238 : impl AsyncTestContext for MaybeEnabledStorage {
239 27 : async fn setup() -> Self {
240 27 : ensure_logging_ready();
241 27 :
242 27 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
243 9 : info!(
244 0 : "`{}` env variable is not set, skipping the test",
245 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
246 : );
247 9 : return Self::Disabled;
248 18 : }
249 18 :
250 18 : Self::Enabled(EnabledS3::setup(None).await)
251 27 : }
252 : }
253 :
254 : enum MaybeEnabledStorageWithTestBlobs {
255 : Enabled(S3WithTestBlobs),
256 : Disabled,
257 : UploadsFailed(anyhow::Error, S3WithTestBlobs),
258 : }
259 :
260 : struct S3WithTestBlobs {
261 : enabled: EnabledS3,
262 : remote_prefixes: HashSet<RemotePath>,
263 : remote_blobs: HashSet<RemotePath>,
264 : }
265 :
266 : impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs {
267 3 : async fn setup() -> Self {
268 3 : ensure_logging_ready();
269 3 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
270 1 : info!(
271 0 : "`{}` env variable is not set, skipping the test",
272 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
273 : );
274 1 : return Self::Disabled;
275 2 : }
276 2 :
277 2 : let max_keys_in_list_response = 10;
278 2 : let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
279 :
280 2 : let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
281 :
282 42 : match upload_remote_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
283 2 : ControlFlow::Continue(uploads) => {
284 2 : info!("Remote objects created successfully");
285 :
286 2 : Self::Enabled(S3WithTestBlobs {
287 2 : enabled,
288 2 : remote_prefixes: uploads.prefixes,
289 2 : remote_blobs: uploads.blobs,
290 2 : })
291 : }
292 0 : ControlFlow::Break(uploads) => Self::UploadsFailed(
293 0 : anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
294 0 : S3WithTestBlobs {
295 0 : enabled,
296 0 : remote_prefixes: uploads.prefixes,
297 0 : remote_blobs: uploads.blobs,
298 0 : },
299 0 : ),
300 : }
301 3 : }
302 :
303 3 : async fn teardown(self) {
304 3 : match self {
305 1 : Self::Disabled => {}
306 2 : Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
307 12 : cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
308 : }
309 : }
310 3 : }
311 : }
312 :
313 : enum MaybeEnabledStorageWithSimpleTestBlobs {
314 : Enabled(S3WithSimpleTestBlobs),
315 : Disabled,
316 : UploadsFailed(anyhow::Error, S3WithSimpleTestBlobs),
317 : }
318 : struct S3WithSimpleTestBlobs {
319 : enabled: EnabledS3,
320 : remote_blobs: HashSet<RemotePath>,
321 : }
322 :
323 : impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {
324 9 : async fn setup() -> Self {
325 9 : ensure_logging_ready();
326 9 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
327 3 : info!(
328 0 : "`{}` env variable is not set, skipping the test",
329 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
330 : );
331 3 : return Self::Disabled;
332 6 : }
333 6 :
334 6 : let max_keys_in_list_response = 10;
335 6 : let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
336 :
337 6 : let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
338 :
339 117 : match upload_simple_remote_data(&enabled.client, upload_tasks_count).await {
340 6 : ControlFlow::Continue(uploads) => {
341 6 : info!("Remote objects created successfully");
342 :
343 6 : Self::Enabled(S3WithSimpleTestBlobs {
344 6 : enabled,
345 6 : remote_blobs: uploads,
346 6 : })
347 : }
348 0 : ControlFlow::Break(uploads) => Self::UploadsFailed(
349 0 : anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
350 0 : S3WithSimpleTestBlobs {
351 0 : enabled,
352 0 : remote_blobs: uploads,
353 0 : },
354 0 : ),
355 : }
356 9 : }
357 :
358 9 : async fn teardown(self) {
359 9 : match self {
360 3 : Self::Disabled => {}
361 6 : Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
362 39 : cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
363 : }
364 : }
365 9 : }
366 : }
367 :
368 26 : async fn create_s3_client(
369 26 : max_keys_per_list_response: Option<i32>,
370 26 : ) -> anyhow::Result<Arc<GenericRemoteStorage>> {
371 : use rand::Rng;
372 :
373 26 : let remote_storage_s3_bucket = env::var("REMOTE_STORAGE_S3_BUCKET")
374 26 : .context("`REMOTE_STORAGE_S3_BUCKET` env var is not set, but real S3 tests are enabled")?;
375 26 : let remote_storage_s3_region = env::var("REMOTE_STORAGE_S3_REGION")
376 26 : .context("`REMOTE_STORAGE_S3_REGION` env var is not set, but real S3 tests are enabled")?;
377 :
378 : // due to how time works, we've had test runners use the same nanos as bucket prefixes.
379 : // millis is just a debugging aid for easier finding the prefix later.
380 26 : let millis = std::time::SystemTime::now()
381 26 : .duration_since(UNIX_EPOCH)
382 26 : .context("random s3 test prefix part calculation")?
383 26 : .as_millis();
384 26 :
385 26 : // because nanos can be the same for two threads so can millis, add randomness
386 26 : let random = rand::thread_rng().gen::<u32>();
387 26 :
388 26 : let remote_storage_config = RemoteStorageConfig {
389 26 : storage: RemoteStorageKind::AwsS3(S3Config {
390 26 : bucket_name: remote_storage_s3_bucket,
391 26 : bucket_region: remote_storage_s3_region,
392 26 : prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
393 26 : endpoint: None,
394 26 : concurrency_limit: NonZeroUsize::new(100).unwrap(),
395 26 : max_keys_per_list_response,
396 26 : upload_storage_class: None,
397 26 : }),
398 26 : timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
399 26 : };
400 26 : Ok(Arc::new(
401 26 : GenericRemoteStorage::from_config(&remote_storage_config)
402 0 : .await
403 26 : .context("remote storage init")?,
404 : ))
405 26 : }
406 :
407 3 : #[test_context(MaybeEnabledStorage)]
408 : #[tokio::test]
409 : async fn download_is_timeouted(ctx: &mut MaybeEnabledStorage) {
410 : let MaybeEnabledStorage::Enabled(ctx) = ctx else {
411 : return;
412 : };
413 :
414 : let cancel = CancellationToken::new();
415 :
416 : let path = RemotePath::new(Utf8Path::new(
417 : format!("{}/file_to_copy", ctx.base_prefix).as_str(),
418 : ))
419 : .unwrap();
420 :
421 : let len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
422 :
423 : let timeout = std::time::Duration::from_secs(5);
424 :
425 : ctx.configure_request_timeout(timeout);
426 :
427 : let started_at = std::time::Instant::now();
428 : let mut stream = ctx
429 : .client
430 : .download(&path, &DownloadOpts::default(), &cancel)
431 : .await
432 : .expect("download succeeds")
433 : .download_stream;
434 :
435 : if started_at.elapsed().mul_f32(0.9) >= timeout {
436 : tracing::warn!(
437 : elapsed_ms = started_at.elapsed().as_millis(),
438 : "timeout might be too low, consumed most of it during headers"
439 : );
440 : }
441 :
442 : let first = stream
443 : .next()
444 : .await
445 : .expect("should have the first blob")
446 : .expect("should have succeeded");
447 :
448 : tracing::info!(len = first.len(), "downloaded first chunk");
449 :
450 : assert!(
451 : first.len() < len,
452 : "uploaded file is too small, we downloaded all on first chunk"
453 : );
454 :
455 : tokio::time::sleep(timeout).await;
456 :
457 : {
458 : let started_at = std::time::Instant::now();
459 : let next = stream
460 : .next()
461 : .await
462 : .expect("stream should not have ended yet");
463 :
464 : tracing::info!(
465 : next.is_err = next.is_err(),
466 : elapsed_ms = started_at.elapsed().as_millis(),
467 : "received item after timeout"
468 : );
469 :
470 : let e = next.expect_err("expected an error, but got a chunk?");
471 :
472 : let inner = e.get_ref().expect("std::io::Error::inner should be set");
473 : assert!(
474 : inner
475 : .downcast_ref::<DownloadError>()
476 2 : .is_some_and(|e| matches!(e, DownloadError::Timeout)),
477 : "{inner:?}"
478 : );
479 : }
480 :
481 : ctx.configure_request_timeout(RemoteStorageConfig::DEFAULT_TIMEOUT);
482 :
483 : ctx.client.delete_objects(&[path], &cancel).await.unwrap()
484 : }
485 :
486 3 : #[test_context(MaybeEnabledStorage)]
487 : #[tokio::test]
488 : async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
489 : let MaybeEnabledStorage::Enabled(ctx) = ctx else {
490 : return;
491 : };
492 :
493 : let cancel = CancellationToken::new();
494 :
495 : let path = RemotePath::new(Utf8Path::new(
496 : format!("{}/file_to_copy", ctx.base_prefix).as_str(),
497 : ))
498 : .unwrap();
499 :
500 : let file_len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
501 :
502 : {
503 : let stream = ctx
504 : .client
505 : .download(&path, &DownloadOpts::default(), &cancel)
506 : .await
507 : .expect("download succeeds")
508 : .download_stream;
509 :
510 : let mut reader = std::pin::pin!(tokio_util::io::StreamReader::new(stream));
511 :
512 : let first = reader.fill_buf().await.expect("should have the first blob");
513 :
514 : let len = first.len();
515 : tracing::info!(len, "downloaded first chunk");
516 :
517 : assert!(
518 : first.len() < file_len,
519 : "uploaded file is too small, we downloaded all on first chunk"
520 : );
521 :
522 : reader.consume(len);
523 :
524 : cancel.cancel();
525 :
526 : let next = reader.fill_buf().await;
527 :
528 : let e = next.expect_err("expected an error, but got a chunk?");
529 :
530 : let inner = e.get_ref().expect("std::io::Error::inner should be set");
531 : assert!(
532 : inner
533 : .downcast_ref::<DownloadError>()
534 2 : .is_some_and(|e| matches!(e, DownloadError::Cancelled)),
535 : "{inner:?}"
536 : );
537 :
538 : let e = DownloadError::from(e);
539 :
540 : assert!(matches!(e, DownloadError::Cancelled), "{e:?}");
541 : }
542 :
543 : let cancel = CancellationToken::new();
544 :
545 : ctx.client.delete_objects(&[path], &cancel).await.unwrap();
546 : }
547 :
548 : /// Upload a long enough file so that we cannot download it in single chunk
549 : ///
550 : /// For s3 the first chunk seems to be less than 10kB, so this has a bit of a safety margin
551 4 : async fn upload_large_enough_file(
552 4 : client: &GenericRemoteStorage,
553 4 : path: &RemotePath,
554 4 : cancel: &CancellationToken,
555 4 : ) -> usize {
556 4 : let header = bytes::Bytes::from_static("remote blob data content".as_bytes());
557 4 : let body = bytes::Bytes::from(vec![0u8; 1024]);
558 4 : let contents = std::iter::once(header).chain(std::iter::repeat(body).take(128));
559 4 :
560 516 : let len = contents.clone().fold(0, |acc, next| acc + next.len());
561 4 :
562 4 : let contents = futures::stream::iter(contents.map(std::io::Result::Ok));
563 4 :
564 4 : client
565 4 : .upload(contents, len, path, None, cancel)
566 25 : .await
567 4 : .expect("upload succeeds");
568 4 :
569 4 : len
570 4 : }
|