use std::env;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::num::NonZeroUsize;
use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::{Duration, UNIX_EPOCH};
use std::{collections::HashSet, time::SystemTime};

use crate::common::{download_to_vec, upload_stream};
use anyhow::Context;
use camino::Utf8Path;
use futures_util::StreamExt;
use remote_storage::{
    DownloadError, GenericRemoteStorage, ListingMode, RemotePath, RemoteStorageConfig,
    RemoteStorageKind, S3Config,
};
use test_context::test_context;
use test_context::AsyncTestContext;
use tokio::io::AsyncBufReadExt;
use tokio_util::sync::CancellationToken;
use tracing::info;

mod common;

#[path = "common/tests.rs"]
mod tests_s3;

use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data};
use utils::backoff;

const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
const BASE_PREFIX: &str = "test";

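/// Exercises `time_travel_recover` against real S3: builds up three states (t0, t1, t2) by
/// uploading, overwriting and deleting objects under the test prefix, then recovers back to
/// each point in time and checks the listing (and, where applicable, the contents) against
/// the snapshots taken earlier.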
#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledStorage::Enabled(ctx) => ctx,
        MaybeEnabledStorage::Disabled => return Ok(()),
    };
    // The test is sensitive to clock discrepancies between S3 and the environment the tests
    // run in. Therefore, wait a little bit before and after each time point. The alternative
    // would be to take the time from the S3 response headers.
    const WAIT_TIME: Duration = Duration::from_millis(3_000);

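    // Retry `op` via `backoff::retry` (warn_threshold 3, max_retries 10), using a fresh token
    // so the retry loop itself is never cancelled.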
    async fn retry<T, O, F, E>(op: O) -> Result<T, E>
    where
        E: Display + Debug + 'static,
        O: FnMut() -> F,
        F: Future<Output = Result<T, E>>,
    {
        let warn_threshold = 3;
        let max_retries = 10;
        backoff::retry(
            op,
            |_e| false,
            warn_threshold,
            max_retries,
            "test retry",
            &CancellationToken::new(),
        )
        .await
        .expect("never cancelled")
    }

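    // Capture a timestamp with a WAIT_TIME sleep on either side, so that small clock
    // differences between this host and S3 cannot put an upload on the wrong side of the
    // captured time point.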
    async fn time_point() -> SystemTime {
        tokio::time::sleep(WAIT_TIME).await;
        let ret = SystemTime::now();
        tokio::time::sleep(WAIT_TIME).await;
        ret
    }

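    // List all keys under the test prefix (no delimiter, with retries) and collect them into a
    // set for order-independent comparison.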
    async fn list_files(
        client: &Arc<GenericRemoteStorage>,
        cancel: &CancellationToken,
    ) -> anyhow::Result<HashSet<RemotePath>> {
        Ok(
            retry(|| client.list(None, ListingMode::NoDelimiter, None, cancel))
                .await
                .context("list root files failure")?
                .keys
                .into_iter()
                .collect::<HashSet<_>>(),
        )
    }

    let cancel = CancellationToken::new();

    let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

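    // State at t0: only path1 exists.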
    retry(|| {
        let (data, len) = upload_stream("remote blob data1".as_bytes().into());
        ctx.client.upload(data, len, &path1, None, &cancel)
    })
    .await?;

    let t0_files = list_files(&ctx.client, &cancel).await?;
    let t0 = time_point().await;
    println!("at t0: {t0_files:?}");

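    // State at t1: path2 is added with its original content.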
    let old_data = "remote blob data2";

    retry(|| {
        let (data, len) = upload_stream(old_data.as_bytes().into());
        ctx.client.upload(data, len, &path2, None, &cancel)
    })
    .await?;

    let t1_files = list_files(&ctx.client, &cancel).await?;
    let t1 = time_point().await;
    println!("at t1: {t1_files:?}");

    // A little check to ensure that our clock is not too far off from the S3 clock
    {
        let dl = retry(|| ctx.client.download(&path2, &cancel)).await?;
        let last_modified = dl.last_modified;
        let half_wt = WAIT_TIME.mul_f32(0.5);
        let t0_hwt = t0 + half_wt;
        let t1_hwt = t1 - half_wt;
        if !(t0_hwt..=t1_hwt).contains(&last_modified) {
            panic!("last_modified={last_modified:?} is not between t0_hwt={t0_hwt:?} and t1_hwt={t1_hwt:?}. \
                This likely means a large clock discrepancy between S3 and the local clock.");
        }
    }

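    // State at t2: path3 is added, path2 is overwritten with new content, and path1 is deleted.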
    retry(|| {
        let (data, len) = upload_stream("remote blob data3".as_bytes().into());
        ctx.client.upload(data, len, &path3, None, &cancel)
    })
    .await?;

    let new_data = "new remote blob data2";

    retry(|| {
        let (data, len) = upload_stream(new_data.as_bytes().into());
        ctx.client.upload(data, len, &path2, None, &cancel)
    })
    .await?;

    retry(|| ctx.client.delete(&path1, &cancel)).await?;
    let t2_files = list_files(&ctx.client, &cancel).await?;
    let t2 = time_point().await;
    println!("at t2: {t2_files:?}");

    // No changes after recovery to t2 (no-op)
    let t_final = time_point().await;
    ctx.client
        .time_travel_recover(None, t2, t_final, &cancel)
        .await?;
    let t2_files_recovered = list_files(&ctx.client, &cancel).await?;
    println!("after recovery to t2: {t2_files_recovered:?}");
    assert_eq!(t2_files, t2_files_recovered);
    let path2_recovered_t2 = download_to_vec(ctx.client.download(&path2, &cancel).await?).await?;
    assert_eq!(path2_recovered_t2, new_data.as_bytes());

    // after recovery to t1: path1 is back, path2 has the old content
    let t_final = time_point().await;
    ctx.client
        .time_travel_recover(None, t1, t_final, &cancel)
        .await?;
    let t1_files_recovered = list_files(&ctx.client, &cancel).await?;
    println!("after recovery to t1: {t1_files_recovered:?}");
    assert_eq!(t1_files, t1_files_recovered);
    let path2_recovered_t1 = download_to_vec(ctx.client.download(&path2, &cancel).await?).await?;
    assert_eq!(path2_recovered_t1, old_data.as_bytes());

    // after recovery to t0: everything is gone except for path1
    let t_final = time_point().await;
    ctx.client
        .time_travel_recover(None, t0, t_final, &cancel)
        .await?;
    let t0_files_recovered = list_files(&ctx.client, &cancel).await?;
    println!("after recovery to t0: {t0_files_recovered:?}");
    assert_eq!(t0_files, t0_files_recovered);

    // cleanup

    let paths = &[path1, path2, path3];
    retry(|| ctx.client.delete_objects(paths, &cancel)).await?;

    Ok(())
}

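/// A real S3 client plus the base prefix used by the tests; only constructed once the
/// `ENABLE_REAL_S3_REMOTE_STORAGE` env var check has passed.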
struct EnabledS3 {
    client: Arc<GenericRemoteStorage>,
    base_prefix: &'static str,
}

impl EnabledS3 {
    async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
        let client = create_s3_client(max_keys_in_list_response)
            .await
            .context("S3 client creation")
            .expect("S3 client creation failed");

        EnabledS3 {
            client,
            base_prefix: BASE_PREFIX,
        }
    }

    fn configure_request_timeout(&mut self, timeout: Duration) {
        match Arc::get_mut(&mut self.client).expect("outer Arc::get_mut") {
            GenericRemoteStorage::AwsS3(s3) => {
                let s3 = Arc::get_mut(s3).expect("inner Arc::get_mut");
                s3.timeout = timeout;
            }
            _ => unreachable!(),
        }
    }
}

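/// Test context that resolves to `Enabled` when `ENABLE_REAL_S3_REMOTE_STORAGE` is set and to
/// `Disabled` otherwise, letting the tests become no-ops when no real bucket is configured.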
enum MaybeEnabledStorage {
    Enabled(EnabledS3),
    Disabled,
}

impl AsyncTestContext for MaybeEnabledStorage {
    async fn setup() -> Self {
        ensure_logging_ready();

        if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
            info!(
                "`{}` env variable is not set, skipping the test",
                ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
            );
            return Self::Disabled;
        }

        Self::Enabled(EnabledS3::setup(None).await)
    }
}

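/// Like [`MaybeEnabledStorage`], but setup additionally uploads a batch of test blobs (more
/// than one list page worth, given the page size of 10) and records whether any upload failed.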
enum MaybeEnabledStorageWithTestBlobs {
    Enabled(S3WithTestBlobs),
    Disabled,
    UploadsFailed(anyhow::Error, S3WithTestBlobs),
}

struct S3WithTestBlobs {
    enabled: EnabledS3,
    remote_prefixes: HashSet<RemotePath>,
    remote_blobs: HashSet<RemotePath>,
}

impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs {
    async fn setup() -> Self {
        ensure_logging_ready();
        if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
            info!(
                "`{}` env variable is not set, skipping the test",
                ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
            );
            return Self::Disabled;
        }

        let max_keys_in_list_response = 10;
        let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());

        let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;

        match upload_remote_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
            ControlFlow::Continue(uploads) => {
                info!("Remote objects created successfully");

                Self::Enabled(S3WithTestBlobs {
                    enabled,
                    remote_prefixes: uploads.prefixes,
                    remote_blobs: uploads.blobs,
                })
            }
            ControlFlow::Break(uploads) => Self::UploadsFailed(
                anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
                S3WithTestBlobs {
                    enabled,
                    remote_prefixes: uploads.prefixes,
                    remote_blobs: uploads.blobs,
                },
            ),
        }
    }

    async fn teardown(self) {
        match self {
            Self::Disabled => {}
            Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
                cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
            }
        }
    }
}

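/// Same as [`MaybeEnabledStorageWithTestBlobs`], but the blobs are created via
/// `upload_simple_remote_data` and only the blob paths (no prefixes) are recorded.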
enum MaybeEnabledStorageWithSimpleTestBlobs {
    Enabled(S3WithSimpleTestBlobs),
    Disabled,
    UploadsFailed(anyhow::Error, S3WithSimpleTestBlobs),
}

struct S3WithSimpleTestBlobs {
    enabled: EnabledS3,
    remote_blobs: HashSet<RemotePath>,
}

impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {
    async fn setup() -> Self {
        ensure_logging_ready();
        if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
            info!(
                "`{}` env variable is not set, skipping the test",
                ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
            );
            return Self::Disabled;
        }

        let max_keys_in_list_response = 10;
        let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());

        let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;

        match upload_simple_remote_data(&enabled.client, upload_tasks_count).await {
            ControlFlow::Continue(uploads) => {
                info!("Remote objects created successfully");

                Self::Enabled(S3WithSimpleTestBlobs {
                    enabled,
                    remote_blobs: uploads,
                })
            }
            ControlFlow::Break(uploads) => Self::UploadsFailed(
                anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
                S3WithSimpleTestBlobs {
                    enabled,
                    remote_blobs: uploads,
                },
            ),
        }
    }

    async fn teardown(self) {
        match self {
            Self::Disabled => {}
            Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
                cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
            }
        }
    }
}

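/// Builds a `GenericRemoteStorage` for the bucket and region given via
/// `REMOTE_STORAGE_S3_BUCKET` and `REMOTE_STORAGE_S3_REGION`, under a fresh per-run prefix
/// derived from the current time plus a random suffix so concurrent test runs do not collide.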
async fn create_s3_client(
    max_keys_per_list_response: Option<i32>,
) -> anyhow::Result<Arc<GenericRemoteStorage>> {
    use rand::Rng;

    let remote_storage_s3_bucket = env::var("REMOTE_STORAGE_S3_BUCKET")
        .context("`REMOTE_STORAGE_S3_BUCKET` env var is not set, but real S3 tests are enabled")?;
    let remote_storage_s3_region = env::var("REMOTE_STORAGE_S3_REGION")
        .context("`REMOTE_STORAGE_S3_REGION` env var is not set, but real S3 tests are enabled")?;

    // Due to how time works, we've had test runners use the same nanos as bucket prefixes.
    // The millis are just a debugging aid to make it easier to find the prefix later.
    let millis = std::time::SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .context("random s3 test prefix part calculation")?
        .as_millis();

    // Because nanos (and hence millis) can be the same for two threads, add randomness.
    let random = rand::thread_rng().gen::<u32>();

    let remote_storage_config = RemoteStorageConfig {
        storage: RemoteStorageKind::AwsS3(S3Config {
            bucket_name: remote_storage_s3_bucket,
            bucket_region: remote_storage_s3_region,
            prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
            endpoint: None,
            concurrency_limit: NonZeroUsize::new(100).unwrap(),
            max_keys_per_list_response,
            upload_storage_class: None,
        }),
        timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
    };
    Ok(Arc::new(
        GenericRemoteStorage::from_config(&remote_storage_config)
            .await
            .context("remote storage init")?,
    ))
}

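/// Uploads a multi-chunk file, lowers the request timeout to 5 seconds, reads the first chunk,
/// then sleeps past the timeout and asserts that the next item on the download stream is a
/// `DownloadError::Timeout`.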
#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn download_is_timeouted(ctx: &mut MaybeEnabledStorage) {
    let MaybeEnabledStorage::Enabled(ctx) = ctx else {
        return;
    };

    let cancel = CancellationToken::new();

    let path = RemotePath::new(Utf8Path::new(
        format!("{}/file_to_copy", ctx.base_prefix).as_str(),
    ))
    .unwrap();

    let len = upload_large_enough_file(&ctx.client, &path, &cancel).await;

    let timeout = std::time::Duration::from_secs(5);

    ctx.configure_request_timeout(timeout);

    let started_at = std::time::Instant::now();
    let mut stream = ctx
        .client
        .download(&path, &cancel)
        .await
        .expect("download succeeds")
        .download_stream;

    if started_at.elapsed().mul_f32(0.9) >= timeout {
        tracing::warn!(
            elapsed_ms = started_at.elapsed().as_millis(),
            "timeout might be too low, consumed most of it during headers"
        );
    }

    let first = stream
        .next()
        .await
        .expect("should have the first blob")
        .expect("should have succeeded");

    tracing::info!(len = first.len(), "downloaded first chunk");

    assert!(
        first.len() < len,
        "uploaded file is too small, we downloaded all on first chunk"
    );

    tokio::time::sleep(timeout).await;

    {
        let started_at = std::time::Instant::now();
        let next = stream
            .next()
            .await
            .expect("stream should not have ended yet");

        tracing::info!(
            next.is_err = next.is_err(),
            elapsed_ms = started_at.elapsed().as_millis(),
            "received item after timeout"
        );

        let e = next.expect_err("expected an error, but got a chunk?");

        let inner = e.get_ref().expect("std::io::Error::inner should be set");
        assert!(
            inner
                .downcast_ref::<DownloadError>()
                .is_some_and(|e| matches!(e, DownloadError::Timeout)),
            "{inner:?}"
        );
    }

    ctx.configure_request_timeout(RemoteStorageConfig::DEFAULT_TIMEOUT);

    ctx.client.delete_objects(&[path], &cancel).await.unwrap()
}

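/// Uploads a multi-chunk file, starts a download, cancels the token after consuming the first
/// chunk, and asserts that the stream then yields a `DownloadError::Cancelled`.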
#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
    let MaybeEnabledStorage::Enabled(ctx) = ctx else {
        return;
    };

    let cancel = CancellationToken::new();

    let path = RemotePath::new(Utf8Path::new(
        format!("{}/file_to_copy", ctx.base_prefix).as_str(),
    ))
    .unwrap();

    let file_len = upload_large_enough_file(&ctx.client, &path, &cancel).await;

    {
        let stream = ctx
            .client
            .download(&path, &cancel)
            .await
            .expect("download succeeds")
            .download_stream;

        let mut reader = std::pin::pin!(tokio_util::io::StreamReader::new(stream));

        let first = reader.fill_buf().await.expect("should have the first blob");

        let len = first.len();
        tracing::info!(len, "downloaded first chunk");

        assert!(
            first.len() < file_len,
            "uploaded file is too small, we downloaded all on first chunk"
        );

        reader.consume(len);

        cancel.cancel();

        let next = reader.fill_buf().await;

        let e = next.expect_err("expected an error, but got a chunk?");

        let inner = e.get_ref().expect("std::io::Error::inner should be set");
        assert!(
            inner
                .downcast_ref::<DownloadError>()
                .is_some_and(|e| matches!(e, DownloadError::Cancelled)),
            "{inner:?}"
        );

        let e = DownloadError::from(e);

        assert!(matches!(e, DownloadError::Cancelled), "{e:?}");
    }

    let cancel = CancellationToken::new();

    ctx.client.delete_objects(&[path], &cancel).await.unwrap();
}

/// Upload a long enough file so that we cannot download it in a single chunk.
///
/// For S3 the first chunk seems to be less than 10kB, so this has a bit of a safety margin.
async fn upload_large_enough_file(
    client: &GenericRemoteStorage,
    path: &RemotePath,
    cancel: &CancellationToken,
) -> usize {
    let header = bytes::Bytes::from_static("remote blob data content".as_bytes());
    let body = bytes::Bytes::from(vec![0u8; 1024]);
    let contents = std::iter::once(header).chain(std::iter::repeat(body).take(128));

    let len = contents.clone().fold(0, |acc, next| acc + next.len());

    let contents = futures::stream::iter(contents.map(std::io::Result::Ok));

    client
        .upload(contents, len, path, None, cancel)
        .await
        .expect("upload succeeds");

    len
}