Line data Source code
1 : use std::env;
2 : use std::fmt::{Debug, Display};
3 : use std::future::Future;
4 : use std::num::NonZeroUsize;
5 : use std::ops::ControlFlow;
6 : use std::sync::Arc;
7 : use std::time::{Duration, UNIX_EPOCH};
8 : use std::{collections::HashSet, time::SystemTime};
9 :
10 : use crate::common::{download_to_vec, upload_stream};
11 : use anyhow::Context;
12 : use camino::Utf8Path;
13 : use futures_util::StreamExt;
14 : use remote_storage::{
15 : DownloadError, GenericRemoteStorage, ListingMode, RemotePath, RemoteStorageConfig,
16 : RemoteStorageKind, S3Config,
17 : };
18 : use test_context::test_context;
19 : use test_context::AsyncTestContext;
20 : use tokio::io::AsyncBufReadExt;
21 : use tokio_util::sync::CancellationToken;
22 : use tracing::info;
23 :
24 : mod common;
25 :
26 : #[path = "common/tests.rs"]
27 : mod tests_s3;
28 :
29 : use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data};
30 : use utils::backoff;
31 :
/// Key prefix (inside each client's per-run bucket prefix) under which the tests create objects.
const BASE_PREFIX: &str = "test";
/// Env variable that opts in to running these tests against a real S3 bucket; while it is
/// unset, every test context in this file resolves to `Disabled`.
const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
34 :
35 2 : #[test_context(MaybeEnabledStorage)]
36 : #[tokio::test]
37 2 : async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
38 2 : let ctx = match ctx {
39 0 : MaybeEnabledStorage::Enabled(ctx) => ctx,
40 2 : MaybeEnabledStorage::Disabled => return Ok(()),
41 : };
42 : // Our test depends on discrepancies in the clock between S3 and the environment the tests
43 : // run in. Therefore, wait a little bit before and after. The alternative would be
44 : // to take the time from S3 response headers.
45 : const WAIT_TIME: Duration = Duration::from_millis(3_000);
46 :
47 0 : async fn retry<T, O, F, E>(op: O) -> Result<T, E>
48 0 : where
49 0 : E: Display + Debug + 'static,
50 0 : O: FnMut() -> F,
51 0 : F: Future<Output = Result<T, E>>,
52 0 : {
53 0 : let warn_threshold = 3;
54 0 : let max_retries = 10;
55 0 : backoff::retry(
56 0 : op,
57 0 : |_e| false,
58 0 : warn_threshold,
59 0 : max_retries,
60 0 : "test retry",
61 0 : &CancellationToken::new(),
62 0 : )
63 0 : .await
64 0 : .expect("never cancelled")
65 0 : }
66 :
67 0 : async fn time_point() -> SystemTime {
68 0 : tokio::time::sleep(WAIT_TIME).await;
69 0 : let ret = SystemTime::now();
70 0 : tokio::time::sleep(WAIT_TIME).await;
71 0 : ret
72 0 : }
73 :
74 0 : async fn list_files(
75 0 : client: &Arc<GenericRemoteStorage>,
76 0 : cancel: &CancellationToken,
77 0 : ) -> anyhow::Result<HashSet<RemotePath>> {
78 0 : Ok(
79 0 : retry(|| client.list(None, ListingMode::NoDelimiter, None, cancel))
80 0 : .await
81 0 : .context("list root files failure")?
82 : .keys
83 0 : .into_iter()
84 0 : .collect::<HashSet<_>>(),
85 : )
86 0 : }
87 :
88 0 : let cancel = CancellationToken::new();
89 :
90 0 : let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
91 0 : .with_context(|| "RemotePath conversion")?;
92 :
93 0 : let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
94 0 : .with_context(|| "RemotePath conversion")?;
95 :
96 0 : let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
97 0 : .with_context(|| "RemotePath conversion")?;
98 :
99 0 : retry(|| {
100 0 : let (data, len) = upload_stream("remote blob data1".as_bytes().into());
101 0 : ctx.client.upload(data, len, &path1, None, &cancel)
102 0 : })
103 0 : .await?;
104 :
105 0 : let t0_files = list_files(&ctx.client, &cancel).await?;
106 0 : let t0 = time_point().await;
107 0 : println!("at t0: {t0_files:?}");
108 0 :
109 0 : let old_data = "remote blob data2";
110 0 :
111 0 : retry(|| {
112 0 : let (data, len) = upload_stream(old_data.as_bytes().into());
113 0 : ctx.client.upload(data, len, &path2, None, &cancel)
114 0 : })
115 0 : .await?;
116 :
117 0 : let t1_files = list_files(&ctx.client, &cancel).await?;
118 0 : let t1 = time_point().await;
119 0 : println!("at t1: {t1_files:?}");
120 :
121 : // A little check to ensure that our clock is not too far off from the S3 clock
122 : {
123 0 : let dl = retry(|| ctx.client.download(&path2, &cancel)).await?;
124 0 : let last_modified = dl.last_modified;
125 0 : let half_wt = WAIT_TIME.mul_f32(0.5);
126 0 : let t0_hwt = t0 + half_wt;
127 0 : let t1_hwt = t1 - half_wt;
128 0 : if !(t0_hwt..=t1_hwt).contains(&last_modified) {
129 0 : panic!("last_modified={last_modified:?} is not between t0_hwt={t0_hwt:?} and t1_hwt={t1_hwt:?}. \
130 0 : This likely means a large lock discrepancy between S3 and the local clock.");
131 0 : }
132 0 : }
133 0 :
134 0 : retry(|| {
135 0 : let (data, len) = upload_stream("remote blob data3".as_bytes().into());
136 0 : ctx.client.upload(data, len, &path3, None, &cancel)
137 0 : })
138 0 : .await?;
139 :
140 0 : let new_data = "new remote blob data2";
141 0 :
142 0 : retry(|| {
143 0 : let (data, len) = upload_stream(new_data.as_bytes().into());
144 0 : ctx.client.upload(data, len, &path2, None, &cancel)
145 0 : })
146 0 : .await?;
147 :
148 0 : retry(|| ctx.client.delete(&path1, &cancel)).await?;
149 0 : let t2_files = list_files(&ctx.client, &cancel).await?;
150 0 : let t2 = time_point().await;
151 0 : println!("at t2: {t2_files:?}");
152 :
153 : // No changes after recovery to t2 (no-op)
154 0 : let t_final = time_point().await;
155 0 : ctx.client
156 0 : .time_travel_recover(None, t2, t_final, &cancel)
157 0 : .await?;
158 0 : let t2_files_recovered = list_files(&ctx.client, &cancel).await?;
159 0 : println!("after recovery to t2: {t2_files_recovered:?}");
160 0 : assert_eq!(t2_files, t2_files_recovered);
161 0 : let path2_recovered_t2 = download_to_vec(ctx.client.download(&path2, &cancel).await?).await?;
162 0 : assert_eq!(path2_recovered_t2, new_data.as_bytes());
163 :
164 : // after recovery to t1: path1 is back, path2 has the old content
165 0 : let t_final = time_point().await;
166 0 : ctx.client
167 0 : .time_travel_recover(None, t1, t_final, &cancel)
168 0 : .await?;
169 0 : let t1_files_recovered = list_files(&ctx.client, &cancel).await?;
170 0 : println!("after recovery to t1: {t1_files_recovered:?}");
171 0 : assert_eq!(t1_files, t1_files_recovered);
172 0 : let path2_recovered_t1 = download_to_vec(ctx.client.download(&path2, &cancel).await?).await?;
173 0 : assert_eq!(path2_recovered_t1, old_data.as_bytes());
174 :
175 : // after recovery to t0: everything is gone except for path1
176 0 : let t_final = time_point().await;
177 0 : ctx.client
178 0 : .time_travel_recover(None, t0, t_final, &cancel)
179 0 : .await?;
180 0 : let t0_files_recovered = list_files(&ctx.client, &cancel).await?;
181 0 : println!("after recovery to t0: {t0_files_recovered:?}");
182 0 : assert_eq!(t0_files, t0_files_recovered);
183 :
184 : // cleanup
185 :
186 0 : let paths = &[path1, path2, path3];
187 0 : retry(|| ctx.client.delete_objects(paths, &cancel)).await?;
188 :
189 0 : Ok(())
190 2 : }
191 :
192 : struct EnabledS3 {
193 : client: Arc<GenericRemoteStorage>,
194 : base_prefix: &'static str,
195 : }
196 :
197 : impl EnabledS3 {
198 0 : async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
199 0 : let client = create_s3_client(max_keys_in_list_response)
200 0 : .context("S3 client creation")
201 0 : .expect("S3 client creation failed");
202 0 :
203 0 : EnabledS3 {
204 0 : client,
205 0 : base_prefix: BASE_PREFIX,
206 0 : }
207 0 : }
208 :
209 0 : fn configure_request_timeout(&mut self, timeout: Duration) {
210 0 : match Arc::get_mut(&mut self.client).expect("outer Arc::get_mut") {
211 0 : GenericRemoteStorage::AwsS3(s3) => {
212 0 : let s3 = Arc::get_mut(s3).expect("inner Arc::get_mut");
213 0 : s3.timeout = timeout;
214 0 : }
215 0 : _ => unreachable!(),
216 : }
217 0 : }
218 : }
219 :
220 : enum MaybeEnabledStorage {
221 : Enabled(EnabledS3),
222 : Disabled,
223 : }
224 :
225 : impl AsyncTestContext for MaybeEnabledStorage {
226 14 : async fn setup() -> Self {
227 14 : ensure_logging_ready();
228 14 :
229 14 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
230 14 : info!(
231 0 : "`{}` env variable is not set, skipping the test",
232 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
233 : );
234 14 : return Self::Disabled;
235 0 : }
236 0 :
237 0 : Self::Enabled(EnabledS3::setup(None).await)
238 14 : }
239 : }
240 :
241 : enum MaybeEnabledStorageWithTestBlobs {
242 : Enabled(S3WithTestBlobs),
243 : Disabled,
244 : UploadsFailed(anyhow::Error, S3WithTestBlobs),
245 : }
246 :
247 : struct S3WithTestBlobs {
248 : enabled: EnabledS3,
249 : remote_prefixes: HashSet<RemotePath>,
250 : remote_blobs: HashSet<RemotePath>,
251 : }
252 :
253 : impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs {
254 2 : async fn setup() -> Self {
255 2 : ensure_logging_ready();
256 2 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
257 2 : info!(
258 0 : "`{}` env variable is not set, skipping the test",
259 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
260 : );
261 2 : return Self::Disabled;
262 0 : }
263 0 :
264 0 : let max_keys_in_list_response = 10;
265 0 : let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
266 :
267 0 : let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
268 :
269 0 : match upload_remote_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
270 0 : ControlFlow::Continue(uploads) => {
271 0 : info!("Remote objects created successfully");
272 :
273 0 : Self::Enabled(S3WithTestBlobs {
274 0 : enabled,
275 0 : remote_prefixes: uploads.prefixes,
276 0 : remote_blobs: uploads.blobs,
277 0 : })
278 : }
279 0 : ControlFlow::Break(uploads) => Self::UploadsFailed(
280 0 : anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
281 0 : S3WithTestBlobs {
282 0 : enabled,
283 0 : remote_prefixes: uploads.prefixes,
284 0 : remote_blobs: uploads.blobs,
285 0 : },
286 0 : ),
287 : }
288 2 : }
289 :
290 2 : async fn teardown(self) {
291 2 : match self {
292 2 : Self::Disabled => {}
293 0 : Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
294 0 : cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
295 : }
296 : }
297 2 : }
298 : }
299 :
300 : enum MaybeEnabledStorageWithSimpleTestBlobs {
301 : Enabled(S3WithSimpleTestBlobs),
302 : Disabled,
303 : UploadsFailed(anyhow::Error, S3WithSimpleTestBlobs),
304 : }
305 : struct S3WithSimpleTestBlobs {
306 : enabled: EnabledS3,
307 : remote_blobs: HashSet<RemotePath>,
308 : }
309 :
310 : impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {
311 2 : async fn setup() -> Self {
312 2 : ensure_logging_ready();
313 2 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
314 2 : info!(
315 0 : "`{}` env variable is not set, skipping the test",
316 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
317 : );
318 2 : return Self::Disabled;
319 0 : }
320 0 :
321 0 : let max_keys_in_list_response = 10;
322 0 : let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
323 :
324 0 : let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
325 :
326 0 : match upload_simple_remote_data(&enabled.client, upload_tasks_count).await {
327 0 : ControlFlow::Continue(uploads) => {
328 0 : info!("Remote objects created successfully");
329 :
330 0 : Self::Enabled(S3WithSimpleTestBlobs {
331 0 : enabled,
332 0 : remote_blobs: uploads,
333 0 : })
334 : }
335 0 : ControlFlow::Break(uploads) => Self::UploadsFailed(
336 0 : anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
337 0 : S3WithSimpleTestBlobs {
338 0 : enabled,
339 0 : remote_blobs: uploads,
340 0 : },
341 0 : ),
342 : }
343 2 : }
344 :
345 2 : async fn teardown(self) {
346 2 : match self {
347 2 : Self::Disabled => {}
348 0 : Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
349 0 : cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
350 : }
351 : }
352 2 : }
353 : }
354 :
355 0 : fn create_s3_client(
356 0 : max_keys_per_list_response: Option<i32>,
357 0 : ) -> anyhow::Result<Arc<GenericRemoteStorage>> {
358 : use rand::Rng;
359 :
360 0 : let remote_storage_s3_bucket = env::var("REMOTE_STORAGE_S3_BUCKET")
361 0 : .context("`REMOTE_STORAGE_S3_BUCKET` env var is not set, but real S3 tests are enabled")?;
362 0 : let remote_storage_s3_region = env::var("REMOTE_STORAGE_S3_REGION")
363 0 : .context("`REMOTE_STORAGE_S3_REGION` env var is not set, but real S3 tests are enabled")?;
364 :
365 : // due to how time works, we've had test runners use the same nanos as bucket prefixes.
366 : // millis is just a debugging aid for easier finding the prefix later.
367 0 : let millis = std::time::SystemTime::now()
368 0 : .duration_since(UNIX_EPOCH)
369 0 : .context("random s3 test prefix part calculation")?
370 0 : .as_millis();
371 0 :
372 0 : // because nanos can be the same for two threads so can millis, add randomness
373 0 : let random = rand::thread_rng().gen::<u32>();
374 0 :
375 0 : let remote_storage_config = RemoteStorageConfig {
376 0 : storage: RemoteStorageKind::AwsS3(S3Config {
377 0 : bucket_name: remote_storage_s3_bucket,
378 0 : bucket_region: remote_storage_s3_region,
379 0 : prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
380 0 : endpoint: None,
381 0 : concurrency_limit: NonZeroUsize::new(100).unwrap(),
382 0 : max_keys_per_list_response,
383 0 : upload_storage_class: None,
384 0 : }),
385 0 : timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
386 0 : };
387 0 : Ok(Arc::new(
388 0 : GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
389 : ))
390 0 : }
391 :
392 2 : #[test_context(MaybeEnabledStorage)]
393 : #[tokio::test]
394 2 : async fn download_is_timeouted(ctx: &mut MaybeEnabledStorage) {
395 2 : let MaybeEnabledStorage::Enabled(ctx) = ctx else {
396 2 : return;
397 : };
398 :
399 0 : let cancel = CancellationToken::new();
400 0 :
401 0 : let path = RemotePath::new(Utf8Path::new(
402 0 : format!("{}/file_to_copy", ctx.base_prefix).as_str(),
403 0 : ))
404 0 : .unwrap();
405 :
406 0 : let len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
407 :
408 0 : let timeout = std::time::Duration::from_secs(5);
409 0 :
410 0 : ctx.configure_request_timeout(timeout);
411 0 :
412 0 : let started_at = std::time::Instant::now();
413 0 : let mut stream = ctx
414 0 : .client
415 0 : .download(&path, &cancel)
416 0 : .await
417 0 : .expect("download succeeds")
418 0 : .download_stream;
419 0 :
420 0 : if started_at.elapsed().mul_f32(0.9) >= timeout {
421 0 : tracing::warn!(
422 0 : elapsed_ms = started_at.elapsed().as_millis(),
423 0 : "timeout might be too low, consumed most of it during headers"
424 : );
425 0 : }
426 :
427 0 : let first = stream
428 0 : .next()
429 0 : .await
430 0 : .expect("should have the first blob")
431 0 : .expect("should have succeeded");
432 0 :
433 0 : tracing::info!(len = first.len(), "downloaded first chunk");
434 :
435 0 : assert!(
436 0 : first.len() < len,
437 0 : "uploaded file is too small, we downloaded all on first chunk"
438 : );
439 :
440 0 : tokio::time::sleep(timeout).await;
441 :
442 : {
443 0 : let started_at = std::time::Instant::now();
444 0 : let next = stream
445 0 : .next()
446 0 : .await
447 0 : .expect("stream should not have ended yet");
448 0 :
449 0 : tracing::info!(
450 0 : next.is_err = next.is_err(),
451 0 : elapsed_ms = started_at.elapsed().as_millis(),
452 0 : "received item after timeout"
453 : );
454 :
455 0 : let e = next.expect_err("expected an error, but got a chunk?");
456 0 :
457 0 : let inner = e.get_ref().expect("std::io::Error::inner should be set");
458 0 : assert!(
459 0 : inner
460 0 : .downcast_ref::<DownloadError>()
461 0 : .is_some_and(|e| matches!(e, DownloadError::Timeout)),
462 0 : "{inner:?}"
463 : );
464 : }
465 :
466 0 : ctx.configure_request_timeout(RemoteStorageConfig::DEFAULT_TIMEOUT);
467 0 :
468 0 : ctx.client.delete_objects(&[path], &cancel).await.unwrap()
469 2 : }
470 :
471 2 : #[test_context(MaybeEnabledStorage)]
472 : #[tokio::test]
473 2 : async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
474 2 : let MaybeEnabledStorage::Enabled(ctx) = ctx else {
475 2 : return;
476 : };
477 :
478 0 : let cancel = CancellationToken::new();
479 0 :
480 0 : let path = RemotePath::new(Utf8Path::new(
481 0 : format!("{}/file_to_copy", ctx.base_prefix).as_str(),
482 0 : ))
483 0 : .unwrap();
484 :
485 0 : let file_len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
486 :
487 : {
488 0 : let stream = ctx
489 0 : .client
490 0 : .download(&path, &cancel)
491 0 : .await
492 0 : .expect("download succeeds")
493 0 : .download_stream;
494 0 :
495 0 : let mut reader = std::pin::pin!(tokio_util::io::StreamReader::new(stream));
496 :
497 0 : let first = reader.fill_buf().await.expect("should have the first blob");
498 0 :
499 0 : let len = first.len();
500 0 : tracing::info!(len, "downloaded first chunk");
501 :
502 0 : assert!(
503 0 : first.len() < file_len,
504 0 : "uploaded file is too small, we downloaded all on first chunk"
505 : );
506 :
507 0 : reader.consume(len);
508 0 :
509 0 : cancel.cancel();
510 :
511 0 : let next = reader.fill_buf().await;
512 :
513 0 : let e = next.expect_err("expected an error, but got a chunk?");
514 0 :
515 0 : let inner = e.get_ref().expect("std::io::Error::inner should be set");
516 0 : assert!(
517 0 : inner
518 0 : .downcast_ref::<DownloadError>()
519 0 : .is_some_and(|e| matches!(e, DownloadError::Cancelled)),
520 0 : "{inner:?}"
521 : );
522 :
523 0 : let e = DownloadError::from(e);
524 0 :
525 0 : assert!(matches!(e, DownloadError::Cancelled), "{e:?}");
526 : }
527 :
528 0 : let cancel = CancellationToken::new();
529 0 :
530 0 : ctx.client.delete_objects(&[path], &cancel).await.unwrap();
531 2 : }
532 :
533 : /// Upload a long enough file so that we cannot download it in single chunk
534 : ///
535 : /// For s3 the first chunk seems to be less than 10kB, so this has a bit of a safety margin
536 0 : async fn upload_large_enough_file(
537 0 : client: &GenericRemoteStorage,
538 0 : path: &RemotePath,
539 0 : cancel: &CancellationToken,
540 0 : ) -> usize {
541 0 : let header = bytes::Bytes::from_static("remote blob data content".as_bytes());
542 0 : let body = bytes::Bytes::from(vec![0u8; 1024]);
543 0 : let contents = std::iter::once(header).chain(std::iter::repeat(body).take(128));
544 0 :
545 0 : let len = contents.clone().fold(0, |acc, next| acc + next.len());
546 0 :
547 0 : let contents = futures::stream::iter(contents.map(std::io::Result::Ok));
548 0 :
549 0 : client
550 0 : .upload(contents, len, path, None, cancel)
551 0 : .await
552 0 : .expect("upload succeeds");
553 0 :
554 0 : len
555 0 : }
|