Line data Source code
1 : use std::env;
2 : use std::fmt::{Debug, Display};
3 : use std::future::Future;
4 : use std::num::NonZeroUsize;
5 : use std::ops::ControlFlow;
6 : use std::sync::Arc;
7 : use std::time::{Duration, UNIX_EPOCH};
8 : use std::{collections::HashSet, time::SystemTime};
9 :
10 : use crate::common::{download_to_vec, upload_stream};
11 : use anyhow::Context;
12 : use camino::Utf8Path;
13 : use futures_util::StreamExt;
14 : use remote_storage::{
15 : DownloadError, GenericRemoteStorage, ListingMode, RemotePath, RemoteStorageConfig,
16 : RemoteStorageKind, S3Config,
17 : };
18 : use test_context::test_context;
19 : use test_context::AsyncTestContext;
20 : use tokio::io::AsyncBufReadExt;
21 : use tokio_util::sync::CancellationToken;
22 : use tracing::info;
23 :
24 : mod common;
25 :
26 : #[path = "common/tests.rs"]
27 : mod tests_s3;
28 :
29 : use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data};
30 : use utils::backoff;
31 :
32 : const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
33 : const BASE_PREFIX: &str = "test";
34 :
35 3 : #[test_context(MaybeEnabledStorage)]
36 : #[tokio::test]
37 : async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
38 : let ctx = match ctx {
39 : MaybeEnabledStorage::Enabled(ctx) => ctx,
40 : MaybeEnabledStorage::Disabled => return Ok(()),
41 : };
42 : // Our test depends on discrepancies in the clock between S3 and the environment the tests
43 : // run in. Therefore, wait a little bit before and after. The alternative would be
44 : // to take the time from S3 response headers.
45 : const WAIT_TIME: Duration = Duration::from_millis(3_000);
46 :
47 26 : async fn retry<T, O, F, E>(op: O) -> Result<T, E>
48 26 : where
49 26 : E: Display + Debug + 'static,
50 26 : O: FnMut() -> F,
51 26 : F: Future<Output = Result<T, E>>,
52 26 : {
53 26 : let warn_threshold = 3;
54 26 : let max_retries = 10;
55 26 : backoff::retry(
56 26 : op,
57 26 : |_e| false,
58 26 : warn_threshold,
59 26 : max_retries,
60 26 : "test retry",
61 26 : &CancellationToken::new(),
62 26 : )
63 97 : .await
64 26 : .expect("never cancelled")
65 26 : }
66 :
67 12 : async fn time_point() -> SystemTime {
68 12 : tokio::time::sleep(WAIT_TIME).await;
69 12 : let ret = SystemTime::now();
70 12 : tokio::time::sleep(WAIT_TIME).await;
71 12 : ret
72 12 : }
73 :
74 12 : async fn list_files(
75 12 : client: &Arc<GenericRemoteStorage>,
76 12 : cancel: &CancellationToken,
77 12 : ) -> anyhow::Result<HashSet<RemotePath>> {
78 12 : Ok(
79 12 : retry(|| client.list(None, ListingMode::NoDelimiter, None, cancel))
80 56 : .await
81 12 : .context("list root files failure")?
82 : .keys
83 12 : .into_iter()
84 20 : .map(|o| o.key)
85 12 : .collect::<HashSet<_>>(),
86 12 : )
87 12 : }
88 :
89 : let cancel = CancellationToken::new();
90 :
91 : let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
92 0 : .with_context(|| "RemotePath conversion")?;
93 :
94 : let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
95 0 : .with_context(|| "RemotePath conversion")?;
96 :
97 : let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
98 0 : .with_context(|| "RemotePath conversion")?;
99 :
100 2 : retry(|| {
101 2 : let (data, len) = upload_stream("remote blob data1".as_bytes().into());
102 2 : ctx.client.upload(data, len, &path1, None, &cancel)
103 2 : })
104 : .await?;
105 :
106 : let t0_files = list_files(&ctx.client, &cancel).await?;
107 : let t0 = time_point().await;
108 : println!("at t0: {t0_files:?}");
109 :
110 : let old_data = "remote blob data2";
111 :
112 2 : retry(|| {
113 2 : let (data, len) = upload_stream(old_data.as_bytes().into());
114 2 : ctx.client.upload(data, len, &path2, None, &cancel)
115 2 : })
116 : .await?;
117 :
118 : let t1_files = list_files(&ctx.client, &cancel).await?;
119 : let t1 = time_point().await;
120 : println!("at t1: {t1_files:?}");
121 :
122 : // A little check to ensure that our clock is not too far off from the S3 clock
123 : {
124 2 : let dl = retry(|| ctx.client.download(&path2, &cancel)).await?;
125 : let last_modified = dl.last_modified;
126 : let half_wt = WAIT_TIME.mul_f32(0.5);
127 : let t0_hwt = t0 + half_wt;
128 : let t1_hwt = t1 - half_wt;
129 : if !(t0_hwt..=t1_hwt).contains(&last_modified) {
130 : panic!("last_modified={last_modified:?} is not between t0_hwt={t0_hwt:?} and t1_hwt={t1_hwt:?}. \
131 : This likely means a large lock discrepancy between S3 and the local clock.");
132 : }
133 : }
134 :
135 2 : retry(|| {
136 2 : let (data, len) = upload_stream("remote blob data3".as_bytes().into());
137 2 : ctx.client.upload(data, len, &path3, None, &cancel)
138 2 : })
139 : .await?;
140 :
141 : let new_data = "new remote blob data2";
142 :
143 2 : retry(|| {
144 2 : let (data, len) = upload_stream(new_data.as_bytes().into());
145 2 : ctx.client.upload(data, len, &path2, None, &cancel)
146 2 : })
147 : .await?;
148 :
149 2 : retry(|| ctx.client.delete(&path1, &cancel)).await?;
150 : let t2_files = list_files(&ctx.client, &cancel).await?;
151 : let t2 = time_point().await;
152 : println!("at t2: {t2_files:?}");
153 :
154 : // No changes after recovery to t2 (no-op)
155 : let t_final = time_point().await;
156 : ctx.client
157 : .time_travel_recover(None, t2, t_final, &cancel)
158 : .await?;
159 : let t2_files_recovered = list_files(&ctx.client, &cancel).await?;
160 : println!("after recovery to t2: {t2_files_recovered:?}");
161 : assert_eq!(t2_files, t2_files_recovered);
162 : let path2_recovered_t2 = download_to_vec(ctx.client.download(&path2, &cancel).await?).await?;
163 : assert_eq!(path2_recovered_t2, new_data.as_bytes());
164 :
165 : // after recovery to t1: path1 is back, path2 has the old content
166 : let t_final = time_point().await;
167 : ctx.client
168 : .time_travel_recover(None, t1, t_final, &cancel)
169 : .await?;
170 : let t1_files_recovered = list_files(&ctx.client, &cancel).await?;
171 : println!("after recovery to t1: {t1_files_recovered:?}");
172 : assert_eq!(t1_files, t1_files_recovered);
173 : let path2_recovered_t1 = download_to_vec(ctx.client.download(&path2, &cancel).await?).await?;
174 : assert_eq!(path2_recovered_t1, old_data.as_bytes());
175 :
176 : // after recovery to t0: everything is gone except for path1
177 : let t_final = time_point().await;
178 : ctx.client
179 : .time_travel_recover(None, t0, t_final, &cancel)
180 : .await?;
181 : let t0_files_recovered = list_files(&ctx.client, &cancel).await?;
182 : println!("after recovery to t0: {t0_files_recovered:?}");
183 : assert_eq!(t0_files, t0_files_recovered);
184 :
185 : // cleanup
186 :
187 : let paths = &[path1, path2, path3];
188 2 : retry(|| ctx.client.delete_objects(paths, &cancel)).await?;
189 :
190 : Ok(())
191 : }
192 :
193 : struct EnabledS3 {
194 : client: Arc<GenericRemoteStorage>,
195 : base_prefix: &'static str,
196 : }
197 :
198 : impl EnabledS3 {
199 18 : async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
200 18 : let client = create_s3_client(max_keys_in_list_response)
201 0 : .await
202 18 : .context("S3 client creation")
203 18 : .expect("S3 client creation failed");
204 18 :
205 18 : EnabledS3 {
206 18 : client,
207 18 : base_prefix: BASE_PREFIX,
208 18 : }
209 18 : }
210 :
211 4 : fn configure_request_timeout(&mut self, timeout: Duration) {
212 4 : match Arc::get_mut(&mut self.client).expect("outer Arc::get_mut") {
213 4 : GenericRemoteStorage::AwsS3(s3) => {
214 4 : let s3 = Arc::get_mut(s3).expect("inner Arc::get_mut");
215 4 : s3.timeout = timeout;
216 4 : }
217 0 : _ => unreachable!(),
218 : }
219 4 : }
220 : }
221 :
222 : enum MaybeEnabledStorage {
223 : Enabled(EnabledS3),
224 : Disabled,
225 : }
226 :
227 : impl AsyncTestContext for MaybeEnabledStorage {
228 21 : async fn setup() -> Self {
229 21 : ensure_logging_ready();
230 21 :
231 21 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
232 7 : info!(
233 0 : "`{}` env variable is not set, skipping the test",
234 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
235 : );
236 7 : return Self::Disabled;
237 14 : }
238 14 :
239 14 : Self::Enabled(EnabledS3::setup(None).await)
240 21 : }
241 : }
242 :
243 : enum MaybeEnabledStorageWithTestBlobs {
244 : Enabled(S3WithTestBlobs),
245 : Disabled,
246 : UploadsFailed(anyhow::Error, S3WithTestBlobs),
247 : }
248 :
249 : struct S3WithTestBlobs {
250 : enabled: EnabledS3,
251 : remote_prefixes: HashSet<RemotePath>,
252 : remote_blobs: HashSet<RemotePath>,
253 : }
254 :
255 : impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs {
256 3 : async fn setup() -> Self {
257 3 : ensure_logging_ready();
258 3 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
259 1 : info!(
260 0 : "`{}` env variable is not set, skipping the test",
261 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
262 : );
263 1 : return Self::Disabled;
264 2 : }
265 2 :
266 2 : let max_keys_in_list_response = 10;
267 2 : let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
268 :
269 2 : let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
270 :
271 41 : match upload_remote_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
272 2 : ControlFlow::Continue(uploads) => {
273 2 : info!("Remote objects created successfully");
274 :
275 2 : Self::Enabled(S3WithTestBlobs {
276 2 : enabled,
277 2 : remote_prefixes: uploads.prefixes,
278 2 : remote_blobs: uploads.blobs,
279 2 : })
280 : }
281 0 : ControlFlow::Break(uploads) => Self::UploadsFailed(
282 0 : anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
283 0 : S3WithTestBlobs {
284 0 : enabled,
285 0 : remote_prefixes: uploads.prefixes,
286 0 : remote_blobs: uploads.blobs,
287 0 : },
288 0 : ),
289 : }
290 3 : }
291 :
292 3 : async fn teardown(self) {
293 3 : match self {
294 1 : Self::Disabled => {}
295 2 : Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
296 13 : cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
297 : }
298 : }
299 3 : }
300 : }
301 :
302 : enum MaybeEnabledStorageWithSimpleTestBlobs {
303 : Enabled(S3WithSimpleTestBlobs),
304 : Disabled,
305 : UploadsFailed(anyhow::Error, S3WithSimpleTestBlobs),
306 : }
307 : struct S3WithSimpleTestBlobs {
308 : enabled: EnabledS3,
309 : remote_blobs: HashSet<RemotePath>,
310 : }
311 :
312 : impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {
313 3 : async fn setup() -> Self {
314 3 : ensure_logging_ready();
315 3 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
316 1 : info!(
317 0 : "`{}` env variable is not set, skipping the test",
318 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
319 : );
320 1 : return Self::Disabled;
321 2 : }
322 2 :
323 2 : let max_keys_in_list_response = 10;
324 2 : let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
325 :
326 2 : let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
327 :
328 41 : match upload_simple_remote_data(&enabled.client, upload_tasks_count).await {
329 2 : ControlFlow::Continue(uploads) => {
330 2 : info!("Remote objects created successfully");
331 :
332 2 : Self::Enabled(S3WithSimpleTestBlobs {
333 2 : enabled,
334 2 : remote_blobs: uploads,
335 2 : })
336 : }
337 0 : ControlFlow::Break(uploads) => Self::UploadsFailed(
338 0 : anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
339 0 : S3WithSimpleTestBlobs {
340 0 : enabled,
341 0 : remote_blobs: uploads,
342 0 : },
343 0 : ),
344 : }
345 3 : }
346 :
347 3 : async fn teardown(self) {
348 3 : match self {
349 1 : Self::Disabled => {}
350 2 : Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
351 16 : cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
352 : }
353 : }
354 3 : }
355 : }
356 :
357 18 : async fn create_s3_client(
358 18 : max_keys_per_list_response: Option<i32>,
359 18 : ) -> anyhow::Result<Arc<GenericRemoteStorage>> {
360 : use rand::Rng;
361 :
362 18 : let remote_storage_s3_bucket = env::var("REMOTE_STORAGE_S3_BUCKET")
363 18 : .context("`REMOTE_STORAGE_S3_BUCKET` env var is not set, but real S3 tests are enabled")?;
364 18 : let remote_storage_s3_region = env::var("REMOTE_STORAGE_S3_REGION")
365 18 : .context("`REMOTE_STORAGE_S3_REGION` env var is not set, but real S3 tests are enabled")?;
366 :
367 : // due to how time works, we've had test runners use the same nanos as bucket prefixes.
368 : // millis is just a debugging aid for easier finding the prefix later.
369 18 : let millis = std::time::SystemTime::now()
370 18 : .duration_since(UNIX_EPOCH)
371 18 : .context("random s3 test prefix part calculation")?
372 18 : .as_millis();
373 18 :
374 18 : // because nanos can be the same for two threads so can millis, add randomness
375 18 : let random = rand::thread_rng().gen::<u32>();
376 18 :
377 18 : let remote_storage_config = RemoteStorageConfig {
378 18 : storage: RemoteStorageKind::AwsS3(S3Config {
379 18 : bucket_name: remote_storage_s3_bucket,
380 18 : bucket_region: remote_storage_s3_region,
381 18 : prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
382 18 : endpoint: None,
383 18 : concurrency_limit: NonZeroUsize::new(100).unwrap(),
384 18 : max_keys_per_list_response,
385 18 : upload_storage_class: None,
386 18 : }),
387 18 : timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
388 18 : };
389 18 : Ok(Arc::new(
390 18 : GenericRemoteStorage::from_config(&remote_storage_config)
391 0 : .await
392 18 : .context("remote storage init")?,
393 : ))
394 18 : }
395 :
396 3 : #[test_context(MaybeEnabledStorage)]
397 : #[tokio::test]
398 : async fn download_is_timeouted(ctx: &mut MaybeEnabledStorage) {
399 : let MaybeEnabledStorage::Enabled(ctx) = ctx else {
400 : return;
401 : };
402 :
403 : let cancel = CancellationToken::new();
404 :
405 : let path = RemotePath::new(Utf8Path::new(
406 : format!("{}/file_to_copy", ctx.base_prefix).as_str(),
407 : ))
408 : .unwrap();
409 :
410 : let len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
411 :
412 : let timeout = std::time::Duration::from_secs(5);
413 :
414 : ctx.configure_request_timeout(timeout);
415 :
416 : let started_at = std::time::Instant::now();
417 : let mut stream = ctx
418 : .client
419 : .download(&path, &cancel)
420 : .await
421 : .expect("download succeeds")
422 : .download_stream;
423 :
424 : if started_at.elapsed().mul_f32(0.9) >= timeout {
425 : tracing::warn!(
426 : elapsed_ms = started_at.elapsed().as_millis(),
427 : "timeout might be too low, consumed most of it during headers"
428 : );
429 : }
430 :
431 : let first = stream
432 : .next()
433 : .await
434 : .expect("should have the first blob")
435 : .expect("should have succeeded");
436 :
437 : tracing::info!(len = first.len(), "downloaded first chunk");
438 :
439 : assert!(
440 : first.len() < len,
441 : "uploaded file is too small, we downloaded all on first chunk"
442 : );
443 :
444 : tokio::time::sleep(timeout).await;
445 :
446 : {
447 : let started_at = std::time::Instant::now();
448 : let next = stream
449 : .next()
450 : .await
451 : .expect("stream should not have ended yet");
452 :
453 : tracing::info!(
454 : next.is_err = next.is_err(),
455 : elapsed_ms = started_at.elapsed().as_millis(),
456 : "received item after timeout"
457 : );
458 :
459 : let e = next.expect_err("expected an error, but got a chunk?");
460 :
461 : let inner = e.get_ref().expect("std::io::Error::inner should be set");
462 : assert!(
463 : inner
464 : .downcast_ref::<DownloadError>()
465 2 : .is_some_and(|e| matches!(e, DownloadError::Timeout)),
466 : "{inner:?}"
467 : );
468 : }
469 :
470 : ctx.configure_request_timeout(RemoteStorageConfig::DEFAULT_TIMEOUT);
471 :
472 : ctx.client.delete_objects(&[path], &cancel).await.unwrap()
473 : }
474 :
475 3 : #[test_context(MaybeEnabledStorage)]
476 : #[tokio::test]
477 : async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
478 : let MaybeEnabledStorage::Enabled(ctx) = ctx else {
479 : return;
480 : };
481 :
482 : let cancel = CancellationToken::new();
483 :
484 : let path = RemotePath::new(Utf8Path::new(
485 : format!("{}/file_to_copy", ctx.base_prefix).as_str(),
486 : ))
487 : .unwrap();
488 :
489 : let file_len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
490 :
491 : {
492 : let stream = ctx
493 : .client
494 : .download(&path, &cancel)
495 : .await
496 : .expect("download succeeds")
497 : .download_stream;
498 :
499 : let mut reader = std::pin::pin!(tokio_util::io::StreamReader::new(stream));
500 :
501 : let first = reader.fill_buf().await.expect("should have the first blob");
502 :
503 : let len = first.len();
504 : tracing::info!(len, "downloaded first chunk");
505 :
506 : assert!(
507 : first.len() < file_len,
508 : "uploaded file is too small, we downloaded all on first chunk"
509 : );
510 :
511 : reader.consume(len);
512 :
513 : cancel.cancel();
514 :
515 : let next = reader.fill_buf().await;
516 :
517 : let e = next.expect_err("expected an error, but got a chunk?");
518 :
519 : let inner = e.get_ref().expect("std::io::Error::inner should be set");
520 : assert!(
521 : inner
522 : .downcast_ref::<DownloadError>()
523 2 : .is_some_and(|e| matches!(e, DownloadError::Cancelled)),
524 : "{inner:?}"
525 : );
526 :
527 : let e = DownloadError::from(e);
528 :
529 : assert!(matches!(e, DownloadError::Cancelled), "{e:?}");
530 : }
531 :
532 : let cancel = CancellationToken::new();
533 :
534 : ctx.client.delete_objects(&[path], &cancel).await.unwrap();
535 : }
536 :
537 : /// Upload a long enough file so that we cannot download it in single chunk
538 : ///
539 : /// For s3 the first chunk seems to be less than 10kB, so this has a bit of a safety margin
540 4 : async fn upload_large_enough_file(
541 4 : client: &GenericRemoteStorage,
542 4 : path: &RemotePath,
543 4 : cancel: &CancellationToken,
544 4 : ) -> usize {
545 4 : let header = bytes::Bytes::from_static("remote blob data content".as_bytes());
546 4 : let body = bytes::Bytes::from(vec![0u8; 1024]);
547 4 : let contents = std::iter::once(header).chain(std::iter::repeat(body).take(128));
548 4 :
549 516 : let len = contents.clone().fold(0, |acc, next| acc + next.len());
550 4 :
551 4 : let contents = futures::stream::iter(contents.map(std::io::Result::Ok));
552 4 :
553 4 : client
554 4 : .upload(contents, len, path, None, cancel)
555 29 : .await
556 4 : .expect("upload succeeds");
557 4 :
558 4 : len
559 4 : }
|