Line data Source code
1 : use std::env;
2 : use std::fmt::{Debug, Display};
3 : use std::future::Future;
4 : use std::num::NonZeroUsize;
5 : use std::ops::ControlFlow;
6 : use std::sync::Arc;
7 : use std::time::{Duration, UNIX_EPOCH};
8 : use std::{collections::HashSet, time::SystemTime};
9 :
10 : use crate::common::{download_to_vec, upload_stream};
11 : use anyhow::Context;
12 : use camino::Utf8Path;
13 : use futures_util::StreamExt;
14 : use remote_storage::{
15 : DownloadError, GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind,
16 : S3Config,
17 : };
18 : use test_context::test_context;
19 : use test_context::AsyncTestContext;
20 : use tokio::io::AsyncBufReadExt;
21 : use tokio_util::sync::CancellationToken;
22 : use tracing::info;
23 :
24 : mod common;
25 :
26 : #[path = "common/tests.rs"]
27 : mod tests_s3;
28 :
29 : use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data};
30 : use utils::backoff;
31 :
32 : const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
33 : const BASE_PREFIX: &str = "test";
34 :
35 2 : #[test_context(MaybeEnabledStorage)]
36 : #[tokio::test]
37 4 : async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
38 2 : let ctx = match ctx {
39 0 : MaybeEnabledStorage::Enabled(ctx) => ctx,
40 2 : MaybeEnabledStorage::Disabled => return Ok(()),
41 : };
42 : // Our test is sensitive to discrepancies between the S3 clock and the clock of the
43 : // environment the tests run in. Therefore, wait a little bit before and after taking each
44 : // time point. The alternative would be to take the time from S3 response headers.
45 : const WAIT_TIME: Duration = Duration::from_millis(3_000);
46 :
47 0 : async fn retry<T, O, F, E>(op: O) -> Result<T, E>
48 0 : where
49 0 : E: Display + Debug + 'static,
50 0 : O: FnMut() -> F,
51 0 : F: Future<Output = Result<T, E>>,
52 0 : {
53 0 : let warn_threshold = 3;
54 0 : let max_retries = 10;
55 0 : backoff::retry(
56 0 : op,
57 0 : |_e| false,
58 0 : warn_threshold,
59 0 : max_retries,
60 0 : "test retry",
61 0 : &CancellationToken::new(),
62 0 : )
63 0 : .await
64 0 : .expect("never cancelled")
65 0 : }
66 :
67 0 : async fn time_point() -> SystemTime {
68 0 : tokio::time::sleep(WAIT_TIME).await;
69 0 : let ret = SystemTime::now();
70 0 : tokio::time::sleep(WAIT_TIME).await;
71 0 : ret
72 0 : }
73 :
74 0 : async fn list_files(
75 0 : client: &Arc<GenericRemoteStorage>,
76 0 : cancel: &CancellationToken,
77 0 : ) -> anyhow::Result<HashSet<RemotePath>> {
78 0 : Ok(retry(|| client.list_files(None, None, cancel))
79 0 : .await
80 0 : .context("list root files failure")?
81 0 : .into_iter()
82 0 : .collect::<HashSet<_>>())
83 0 : }
84 :
85 0 : let cancel = CancellationToken::new();
86 :
87 0 : let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
88 0 : .with_context(|| "RemotePath conversion")?;
89 :
90 0 : let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
91 0 : .with_context(|| "RemotePath conversion")?;
92 :
93 0 : let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
94 0 : .with_context(|| "RemotePath conversion")?;
95 :
96 0 : retry(|| {
97 0 : let (data, len) = upload_stream("remote blob data1".as_bytes().into());
98 0 : ctx.client.upload(data, len, &path1, None, &cancel)
99 0 : })
100 0 : .await?;
101 :
102 0 : let t0_files = list_files(&ctx.client, &cancel).await?;
103 0 : let t0 = time_point().await;
104 0 : println!("at t0: {t0_files:?}");
105 0 :
106 0 : let old_data = "remote blob data2";
107 0 :
108 0 : retry(|| {
109 0 : let (data, len) = upload_stream(old_data.as_bytes().into());
110 0 : ctx.client.upload(data, len, &path2, None, &cancel)
111 0 : })
112 0 : .await?;
113 :
114 0 : let t1_files = list_files(&ctx.client, &cancel).await?;
115 0 : let t1 = time_point().await;
116 0 : println!("at t1: {t1_files:?}");
117 :
118 : // A little check to ensure that our clock is not too far off from the S3 clock
119 : {
120 0 : let dl = retry(|| ctx.client.download(&path2, &cancel)).await?;
121 0 : let last_modified = dl.last_modified;
122 0 : let half_wt = WAIT_TIME.mul_f32(0.5);
123 0 : let t0_hwt = t0 + half_wt;
124 0 : let t1_hwt = t1 - half_wt;
125 0 : if !(t0_hwt..=t1_hwt).contains(&last_modified) {
126 0 : panic!("last_modified={last_modified:?} is not between t0_hwt={t0_hwt:?} and t1_hwt={t1_hwt:?}. \
127 0 : This likely means a large clock discrepancy between S3 and the local clock.");
128 0 : }
129 0 : }
130 0 :
131 0 : retry(|| {
132 0 : let (data, len) = upload_stream("remote blob data3".as_bytes().into());
133 0 : ctx.client.upload(data, len, &path3, None, &cancel)
134 0 : })
135 0 : .await?;
136 :
137 0 : let new_data = "new remote blob data2";
138 0 :
139 0 : retry(|| {
140 0 : let (data, len) = upload_stream(new_data.as_bytes().into());
141 0 : ctx.client.upload(data, len, &path2, None, &cancel)
142 0 : })
143 0 : .await?;
144 :
145 0 : retry(|| ctx.client.delete(&path1, &cancel)).await?;
146 0 : let t2_files = list_files(&ctx.client, &cancel).await?;
147 0 : let t2 = time_point().await;
148 0 : println!("at t2: {t2_files:?}");
149 :
150 : // No changes after recovery to t2 (no-op)
151 0 : let t_final = time_point().await;
152 0 : ctx.client
153 0 : .time_travel_recover(None, t2, t_final, &cancel)
154 0 : .await?;
155 0 : let t2_files_recovered = list_files(&ctx.client, &cancel).await?;
156 0 : println!("after recovery to t2: {t2_files_recovered:?}");
157 0 : assert_eq!(t2_files, t2_files_recovered);
158 0 : let path2_recovered_t2 = download_to_vec(ctx.client.download(&path2, &cancel).await?).await?;
159 0 : assert_eq!(path2_recovered_t2, new_data.as_bytes());
160 :
161 : // after recovery to t1: path1 is back, path2 has the old content
162 0 : let t_final = time_point().await;
163 0 : ctx.client
164 0 : .time_travel_recover(None, t1, t_final, &cancel)
165 0 : .await?;
166 0 : let t1_files_recovered = list_files(&ctx.client, &cancel).await?;
167 0 : println!("after recovery to t1: {t1_files_recovered:?}");
168 0 : assert_eq!(t1_files, t1_files_recovered);
169 0 : let path2_recovered_t1 = download_to_vec(ctx.client.download(&path2, &cancel).await?).await?;
170 0 : assert_eq!(path2_recovered_t1, old_data.as_bytes());
171 :
172 : // after recovery to t0: everything is gone except for path1
173 0 : let t_final = time_point().await;
174 0 : ctx.client
175 0 : .time_travel_recover(None, t0, t_final, &cancel)
176 0 : .await?;
177 0 : let t0_files_recovered = list_files(&ctx.client, &cancel).await?;
178 0 : println!("after recovery to t0: {t0_files_recovered:?}");
179 0 : assert_eq!(t0_files, t0_files_recovered);
180 :
181 : // cleanup
182 :
183 0 : let paths = &[path1, path2, path3];
184 0 : retry(|| ctx.client.delete_objects(paths, &cancel)).await?;
185 :
186 0 : Ok(())
187 2 : }
188 :
189 : struct EnabledS3 {
190 : client: Arc<GenericRemoteStorage>,
191 : base_prefix: &'static str,
192 : }
193 :
194 : impl EnabledS3 {
195 0 : async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
196 0 : let client = create_s3_client(max_keys_in_list_response)
197 0 : .context("S3 client creation")
198 0 : .expect("S3 client creation failed");
199 0 :
200 0 : EnabledS3 {
201 0 : client,
202 0 : base_prefix: BASE_PREFIX,
203 0 : }
204 0 : }
205 :
206 0 : fn configure_request_timeout(&mut self, timeout: Duration) {
207 0 : match Arc::get_mut(&mut self.client).expect("outer Arc::get_mut") {
208 0 : GenericRemoteStorage::AwsS3(s3) => {
209 0 : let s3 = Arc::get_mut(s3).expect("inner Arc::get_mut");
210 0 : s3.timeout = timeout;
211 0 : }
212 0 : _ => unreachable!(),
213 : }
214 0 : }
215 : }
216 :
217 : enum MaybeEnabledStorage {
218 : Enabled(EnabledS3),
219 : Disabled,
220 : }
221 :
222 : impl AsyncTestContext for MaybeEnabledStorage {
223 14 : async fn setup() -> Self {
224 14 : ensure_logging_ready();
225 14 :
226 14 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
227 14 : info!(
228 14 : "`{}` env variable is not set, skipping the test",
229 14 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
230 14 : );
231 14 : return Self::Disabled;
232 0 : }
233 0 :
234 0 : Self::Enabled(EnabledS3::setup(None).await)
235 14 : }
236 : }
237 :
238 : enum MaybeEnabledStorageWithTestBlobs {
239 : Enabled(S3WithTestBlobs),
240 : Disabled,
241 : UploadsFailed(anyhow::Error, S3WithTestBlobs),
242 : }
243 :
244 : struct S3WithTestBlobs {
245 : enabled: EnabledS3,
246 : remote_prefixes: HashSet<RemotePath>,
247 : remote_blobs: HashSet<RemotePath>,
248 : }
249 :
250 : impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs {
251 2 : async fn setup() -> Self {
252 2 : ensure_logging_ready();
253 2 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
254 2 : info!(
255 2 : "`{}` env variable is not set, skipping the test",
256 2 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
257 2 : );
258 2 : return Self::Disabled;
259 0 : }
260 0 :
261 0 : let max_keys_in_list_response = 10;
262 0 : let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
263 :
264 0 : let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
265 :
266 0 : match upload_remote_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
267 0 : ControlFlow::Continue(uploads) => {
268 0 : info!("Remote objects created successfully");
269 :
270 0 : Self::Enabled(S3WithTestBlobs {
271 0 : enabled,
272 0 : remote_prefixes: uploads.prefixes,
273 0 : remote_blobs: uploads.blobs,
274 0 : })
275 : }
276 0 : ControlFlow::Break(uploads) => Self::UploadsFailed(
277 0 : anyhow::anyhow!("One or more blobs failed to upload to S3"),
278 0 : S3WithTestBlobs {
279 0 : enabled,
280 0 : remote_prefixes: uploads.prefixes,
281 0 : remote_blobs: uploads.blobs,
282 0 : },
283 0 : ),
284 : }
285 2 : }
286 :
287 2 : async fn teardown(self) {
288 2 : match self {
289 2 : Self::Disabled => {}
290 0 : Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
291 0 : cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
292 : }
293 : }
294 2 : }
295 : }
296 :
297 : // NOTE: the setups for the list_prefixes test and the list_files test are very similar.
298 : // However, they are not identical: the list_prefixes function is concerned with listing prefixes,
299 : // whereas the list_files function is concerned with listing files.
300 : // See the `RemoteStorage::list_files` documentation for more details.
301 : enum MaybeEnabledStorageWithSimpleTestBlobs {
302 : Enabled(S3WithSimpleTestBlobs),
303 : Disabled,
304 : UploadsFailed(anyhow::Error, S3WithSimpleTestBlobs),
305 : }
306 : struct S3WithSimpleTestBlobs {
307 : enabled: EnabledS3,
308 : remote_blobs: HashSet<RemotePath>,
309 : }
310 :
311 : impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {
312 2 : async fn setup() -> Self {
313 2 : ensure_logging_ready();
314 2 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
315 2 : info!(
316 2 : "`{}` env variable is not set, skipping the test",
317 2 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
318 2 : );
319 2 : return Self::Disabled;
320 0 : }
321 0 :
322 0 : let max_keys_in_list_response = 10;
323 0 : let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
324 :
325 0 : let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
326 :
327 0 : match upload_simple_remote_data(&enabled.client, upload_tasks_count).await {
328 0 : ControlFlow::Continue(uploads) => {
329 0 : info!("Remote objects created successfully");
330 :
331 0 : Self::Enabled(S3WithSimpleTestBlobs {
332 0 : enabled,
333 0 : remote_blobs: uploads,
334 0 : })
335 : }
336 0 : ControlFlow::Break(uploads) => Self::UploadsFailed(
337 0 : anyhow::anyhow!("One or more blobs failed to upload to S3"),
338 0 : S3WithSimpleTestBlobs {
339 0 : enabled,
340 0 : remote_blobs: uploads,
341 0 : },
342 0 : ),
343 : }
344 2 : }
345 :
346 2 : async fn teardown(self) {
347 2 : match self {
348 2 : Self::Disabled => {}
349 0 : Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
350 0 : cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
351 : }
352 : }
353 2 : }
354 : }
355 :
356 0 : fn create_s3_client(
357 0 : max_keys_per_list_response: Option<i32>,
358 0 : ) -> anyhow::Result<Arc<GenericRemoteStorage>> {
359 : use rand::Rng;
360 :
361 0 : let remote_storage_s3_bucket = env::var("REMOTE_STORAGE_S3_BUCKET")
362 0 : .context("`REMOTE_STORAGE_S3_BUCKET` env var is not set, but real S3 tests are enabled")?;
363 0 : let remote_storage_s3_region = env::var("REMOTE_STORAGE_S3_REGION")
364 0 : .context("`REMOTE_STORAGE_S3_REGION` env var is not set, but real S3 tests are enabled")?;
365 :
366 : // Due to how time works, test runners have ended up using the same nanos as bucket prefixes.
367 : // Millis are just a debugging aid to make the prefix easier to find later.
368 0 : let millis = std::time::SystemTime::now()
369 0 : .duration_since(UNIX_EPOCH)
370 0 : .context("random s3 test prefix part calculation")?
371 0 : .as_millis();
372 0 :
373 0 : // Because nanos can be the same for two threads (and so can millis), add randomness.
374 0 : let random = rand::thread_rng().gen::<u32>();
375 0 :
376 0 : let remote_storage_config = RemoteStorageConfig {
377 0 : storage: RemoteStorageKind::AwsS3(S3Config {
378 0 : bucket_name: remote_storage_s3_bucket,
379 0 : bucket_region: remote_storage_s3_region,
380 0 : prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
381 0 : endpoint: None,
382 0 : concurrency_limit: NonZeroUsize::new(100).unwrap(),
383 0 : max_keys_per_list_response,
384 0 : }),
385 0 : timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
386 0 : };
387 0 : Ok(Arc::new(
388 0 : GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
389 : ))
390 0 : }
391 :
392 2 : #[test_context(MaybeEnabledStorage)]
393 : #[tokio::test]
394 4 : async fn download_is_timeouted(ctx: &mut MaybeEnabledStorage) {
395 2 : let MaybeEnabledStorage::Enabled(ctx) = ctx else {
396 2 : return;
397 : };
398 :
399 0 : let cancel = CancellationToken::new();
400 0 :
401 0 : let path = RemotePath::new(Utf8Path::new(
402 0 : format!("{}/file_to_copy", ctx.base_prefix).as_str(),
403 0 : ))
404 0 : .unwrap();
405 :
406 0 : let len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
407 :
408 0 : let timeout = std::time::Duration::from_secs(5);
409 0 :
410 0 : ctx.configure_request_timeout(timeout);
411 0 :
412 0 : let started_at = std::time::Instant::now();
413 0 : let mut stream = ctx
414 0 : .client
415 0 : .download(&path, &cancel)
416 0 : .await
417 0 : .expect("download succeeds")
418 0 : .download_stream;
419 0 :
420 0 : if started_at.elapsed() >= timeout.mul_f32(0.9) {
421 0 : tracing::warn!(
422 0 : elapsed_ms = started_at.elapsed().as_millis(),
423 0 : "timeout might be too low, consumed most of it during headers"
424 0 : );
425 0 : }
426 :
427 0 : let first = stream
428 0 : .next()
429 0 : .await
430 0 : .expect("should have the first blob")
431 0 : .expect("should have succeeded");
432 0 :
433 0 : tracing::info!(len = first.len(), "downloaded first chunk");
434 :
435 0 : assert!(
436 0 : first.len() < len,
437 0 : "uploaded file is too small, we downloaded all on first chunk"
438 : );
439 :
440 0 : tokio::time::sleep(timeout).await;
441 :
442 : {
443 0 : let started_at = std::time::Instant::now();
444 0 : let next = stream
445 0 : .next()
446 0 : .await
447 0 : .expect("stream should not have ended yet");
448 0 :
449 0 : tracing::info!(
450 0 : next.is_err = next.is_err(),
451 0 : elapsed_ms = started_at.elapsed().as_millis(),
452 0 : "received item after timeout"
453 0 : );
454 :
455 0 : let e = next.expect_err("expected an error, but got a chunk?");
456 0 :
457 0 : let inner = e.get_ref().expect("std::io::Error::inner should be set");
458 0 : assert!(
459 0 : inner
460 0 : .downcast_ref::<DownloadError>()
461 0 : .is_some_and(|e| matches!(e, DownloadError::Timeout)),
462 0 : "{inner:?}"
463 : );
464 : }
465 :
466 0 : ctx.configure_request_timeout(RemoteStorageConfig::DEFAULT_TIMEOUT);
467 0 :
468 0 : ctx.client.delete_objects(&[path], &cancel).await.unwrap()
469 2 : }
470 :
471 2 : #[test_context(MaybeEnabledStorage)]
472 : #[tokio::test]
473 4 : async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
474 2 : let MaybeEnabledStorage::Enabled(ctx) = ctx else {
475 2 : return;
476 : };
477 :
478 0 : let cancel = CancellationToken::new();
479 0 :
480 0 : let path = RemotePath::new(Utf8Path::new(
481 0 : format!("{}/file_to_copy", ctx.base_prefix).as_str(),
482 0 : ))
483 0 : .unwrap();
484 :
485 0 : let file_len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
486 :
487 : {
488 0 : let stream = ctx
489 0 : .client
490 0 : .download(&path, &cancel)
491 0 : .await
492 0 : .expect("download succeeds")
493 0 : .download_stream;
494 0 :
495 0 : let mut reader = std::pin::pin!(tokio_util::io::StreamReader::new(stream));
496 :
497 0 : let first = reader.fill_buf().await.expect("should have the first blob");
498 0 :
499 0 : let len = first.len();
500 0 : tracing::info!(len, "downloaded first chunk");
501 :
502 0 : assert!(
503 0 : first.len() < file_len,
504 0 : "uploaded file is too small, we downloaded all on first chunk"
505 : );
506 :
507 0 : reader.consume(len);
508 0 :
509 0 : cancel.cancel();
510 :
511 0 : let next = reader.fill_buf().await;
512 :
513 0 : let e = next.expect_err("expected an error, but got a chunk?");
514 0 :
515 0 : let inner = e.get_ref().expect("std::io::Error::inner should be set");
516 0 : assert!(
517 0 : inner
518 0 : .downcast_ref::<DownloadError>()
519 0 : .is_some_and(|e| matches!(e, DownloadError::Cancelled)),
520 0 : "{inner:?}"
521 : );
522 :
523 0 : let e = DownloadError::from(e);
524 0 :
525 0 : assert!(matches!(e, DownloadError::Cancelled), "{e:?}");
526 : }
527 :
528 0 : let cancel = CancellationToken::new();
529 0 :
530 0 : ctx.client.delete_objects(&[path], &cancel).await.unwrap();
531 2 : }
532 :
533 : /// Upload a file long enough that it cannot be downloaded in a single chunk.
534 : ///
535 : /// For S3, the first chunk seems to be less than 10 kB, so this gives a bit of a safety margin.
536 0 : async fn upload_large_enough_file(
537 0 : client: &GenericRemoteStorage,
538 0 : path: &RemotePath,
539 0 : cancel: &CancellationToken,
540 0 : ) -> usize {
541 0 : let header = bytes::Bytes::from_static("remote blob data content".as_bytes());
542 0 : let body = bytes::Bytes::from(vec![0u8; 1024]);
543 0 : let contents = std::iter::once(header).chain(std::iter::repeat(body).take(128));
544 0 :
545 0 : let len = contents.clone().fold(0, |acc, next| acc + next.len());
546 0 :
547 0 : let contents = futures::stream::iter(contents.map(std::io::Result::Ok));
548 0 :
549 0 : client
550 0 : .upload(contents, len, path, None, cancel)
551 0 : .await
552 0 : .expect("upload succeeds");
553 0 :
554 0 : len
555 0 : }