Line data Source code
1 : use std::env;
2 : use std::fmt::{Debug, Display};
3 : use std::future::Future;
4 : use std::num::NonZeroUsize;
5 : use std::ops::ControlFlow;
6 : use std::sync::Arc;
7 : use std::time::{Duration, UNIX_EPOCH};
8 : use std::{collections::HashSet, time::SystemTime};
9 :
10 : use crate::common::{download_to_vec, upload_stream};
11 : use anyhow::Context;
12 : use camino::Utf8Path;
13 : use futures_util::StreamExt;
14 : use remote_storage::{
15 : DownloadError, GenericRemoteStorage, ListingMode, RemotePath, RemoteStorageConfig,
16 : RemoteStorageKind, S3Config,
17 : };
18 : use test_context::test_context;
19 : use test_context::AsyncTestContext;
20 : use tokio::io::AsyncBufReadExt;
21 : use tokio_util::sync::CancellationToken;
22 : use tracing::info;
23 :
24 : mod common;
25 :
26 : #[path = "common/tests.rs"]
27 : mod tests_s3;
28 :
29 : use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data};
30 : use utils::backoff;
31 :
/// Environment variable that switches the real-S3 tests on; when it cannot be read via
/// `env::var`, the test contexts below report `Disabled` and the tests are skipped.
const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";

/// Path prefix that the tests place their remote objects under.
const BASE_PREFIX: &str = "test";
34 :
35 4 : #[test_context(MaybeEnabledStorage)]
36 : #[tokio::test]
37 4 : async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
38 4 : let ctx = match ctx {
39 2 : MaybeEnabledStorage::Enabled(ctx) => ctx,
40 2 : MaybeEnabledStorage::Disabled => return Ok(()),
41 : };
42 : // Our test depends on discrepancies in the clock between S3 and the environment the tests
43 : // run in. Therefore, wait a little bit before and after. The alternative would be
44 : // to take the time from S3 response headers.
45 : const WAIT_TIME: Duration = Duration::from_millis(3_000);
46 :
47 26 : async fn retry<T, O, F, E>(op: O) -> Result<T, E>
48 26 : where
49 26 : E: Display + Debug + 'static,
50 26 : O: FnMut() -> F,
51 26 : F: Future<Output = Result<T, E>>,
52 26 : {
53 26 : let warn_threshold = 3;
54 26 : let max_retries = 10;
55 26 : backoff::retry(
56 26 : op,
57 26 : |_e| false,
58 26 : warn_threshold,
59 26 : max_retries,
60 26 : "test retry",
61 26 : &CancellationToken::new(),
62 26 : )
63 81 : .await
64 26 : .expect("never cancelled")
65 26 : }
66 :
67 12 : async fn time_point() -> SystemTime {
68 12 : tokio::time::sleep(WAIT_TIME).await;
69 12 : let ret = SystemTime::now();
70 12 : tokio::time::sleep(WAIT_TIME).await;
71 12 : ret
72 12 : }
73 :
74 12 : async fn list_files(
75 12 : client: &Arc<GenericRemoteStorage>,
76 12 : cancel: &CancellationToken,
77 12 : ) -> anyhow::Result<HashSet<RemotePath>> {
78 12 : Ok(
79 12 : retry(|| client.list(None, ListingMode::NoDelimiter, None, cancel))
80 49 : .await
81 12 : .context("list root files failure")?
82 : .keys
83 12 : .into_iter()
84 12 : .collect::<HashSet<_>>(),
85 : )
86 12 : }
87 :
88 2 : let cancel = CancellationToken::new();
89 :
90 2 : let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
91 2 : .with_context(|| "RemotePath conversion")?;
92 :
93 2 : let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
94 2 : .with_context(|| "RemotePath conversion")?;
95 :
96 2 : let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
97 2 : .with_context(|| "RemotePath conversion")?;
98 :
99 2 : retry(|| {
100 2 : let (data, len) = upload_stream("remote blob data1".as_bytes().into());
101 2 : ctx.client.upload(data, len, &path1, None, &cancel)
102 2 : })
103 16 : .await?;
104 :
105 5 : let t0_files = list_files(&ctx.client, &cancel).await?;
106 4 : let t0 = time_point().await;
107 2 : println!("at t0: {t0_files:?}");
108 2 :
109 2 : let old_data = "remote blob data2";
110 2 :
111 2 : retry(|| {
112 2 : let (data, len) = upload_stream(old_data.as_bytes().into());
113 2 : ctx.client.upload(data, len, &path2, None, &cancel)
114 2 : })
115 2 : .await?;
116 :
117 4 : let t1_files = list_files(&ctx.client, &cancel).await?;
118 4 : let t1 = time_point().await;
119 2 : println!("at t1: {t1_files:?}");
120 :
121 : // A little check to ensure that our clock is not too far off from the S3 clock
122 : {
123 2 : let dl = retry(|| ctx.client.download(&path2, &cancel)).await?;
124 2 : let last_modified = dl.last_modified;
125 2 : let half_wt = WAIT_TIME.mul_f32(0.5);
126 2 : let t0_hwt = t0 + half_wt;
127 2 : let t1_hwt = t1 - half_wt;
128 2 : if !(t0_hwt..=t1_hwt).contains(&last_modified) {
129 0 : panic!("last_modified={last_modified:?} is not between t0_hwt={t0_hwt:?} and t1_hwt={t1_hwt:?}. \
130 0 : This likely means a large lock discrepancy between S3 and the local clock.");
131 2 : }
132 2 : }
133 2 :
134 2 : retry(|| {
135 2 : let (data, len) = upload_stream("remote blob data3".as_bytes().into());
136 2 : ctx.client.upload(data, len, &path3, None, &cancel)
137 2 : })
138 2 : .await?;
139 :
140 2 : let new_data = "new remote blob data2";
141 2 :
142 2 : retry(|| {
143 2 : let (data, len) = upload_stream(new_data.as_bytes().into());
144 2 : ctx.client.upload(data, len, &path2, None, &cancel)
145 2 : })
146 2 : .await?;
147 :
148 4 : retry(|| ctx.client.delete(&path1, &cancel)).await?;
149 16 : let t2_files = list_files(&ctx.client, &cancel).await?;
150 4 : let t2 = time_point().await;
151 2 : println!("at t2: {t2_files:?}");
152 :
153 : // No changes after recovery to t2 (no-op)
154 4 : let t_final = time_point().await;
155 2 : ctx.client
156 2 : .time_travel_recover(None, t2, t_final, &cancel)
157 17 : .await?;
158 4 : let t2_files_recovered = list_files(&ctx.client, &cancel).await?;
159 2 : println!("after recovery to t2: {t2_files_recovered:?}");
160 2 : assert_eq!(t2_files, t2_files_recovered);
161 2 : let path2_recovered_t2 = download_to_vec(ctx.client.download(&path2, &cancel).await?).await?;
162 2 : assert_eq!(path2_recovered_t2, new_data.as_bytes());
163 :
164 : // after recovery to t1: path1 is back, path2 has the old content
165 4 : let t_final = time_point().await;
166 2 : ctx.client
167 2 : .time_travel_recover(None, t1, t_final, &cancel)
168 36 : .await?;
169 10 : let t1_files_recovered = list_files(&ctx.client, &cancel).await?;
170 2 : println!("after recovery to t1: {t1_files_recovered:?}");
171 2 : assert_eq!(t1_files, t1_files_recovered);
172 2 : let path2_recovered_t1 = download_to_vec(ctx.client.download(&path2, &cancel).await?).await?;
173 2 : assert_eq!(path2_recovered_t1, old_data.as_bytes());
174 :
175 : // after recovery to t0: everything is gone except for path1
176 4 : let t_final = time_point().await;
177 2 : ctx.client
178 2 : .time_travel_recover(None, t0, t_final, &cancel)
179 23 : .await?;
180 10 : let t0_files_recovered = list_files(&ctx.client, &cancel).await?;
181 2 : println!("after recovery to t0: {t0_files_recovered:?}");
182 2 : assert_eq!(t0_files, t0_files_recovered);
183 :
184 : // cleanup
185 :
186 2 : let paths = &[path1, path2, path3];
187 4 : retry(|| ctx.client.delete_objects(paths, &cancel)).await?;
188 :
189 2 : Ok(())
190 4 : }
191 :
192 : struct EnabledS3 {
193 : client: Arc<GenericRemoteStorage>,
194 : base_prefix: &'static str,
195 : }
196 :
197 : impl EnabledS3 {
198 18 : async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
199 18 : let client = create_s3_client(max_keys_in_list_response)
200 18 : .context("S3 client creation")
201 18 : .expect("S3 client creation failed");
202 18 :
203 18 : EnabledS3 {
204 18 : client,
205 18 : base_prefix: BASE_PREFIX,
206 18 : }
207 18 : }
208 :
209 4 : fn configure_request_timeout(&mut self, timeout: Duration) {
210 4 : match Arc::get_mut(&mut self.client).expect("outer Arc::get_mut") {
211 4 : GenericRemoteStorage::AwsS3(s3) => {
212 4 : let s3 = Arc::get_mut(s3).expect("inner Arc::get_mut");
213 4 : s3.timeout = timeout;
214 4 : }
215 0 : _ => unreachable!(),
216 : }
217 4 : }
218 : }
219 :
220 : enum MaybeEnabledStorage {
221 : Enabled(EnabledS3),
222 : Disabled,
223 : }
224 :
225 : impl AsyncTestContext for MaybeEnabledStorage {
226 28 : async fn setup() -> Self {
227 28 : ensure_logging_ready();
228 28 :
229 28 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
230 14 : info!(
231 0 : "`{}` env variable is not set, skipping the test",
232 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
233 : );
234 14 : return Self::Disabled;
235 14 : }
236 14 :
237 14 : Self::Enabled(EnabledS3::setup(None).await)
238 28 : }
239 : }
240 :
241 : enum MaybeEnabledStorageWithTestBlobs {
242 : Enabled(S3WithTestBlobs),
243 : Disabled,
244 : UploadsFailed(anyhow::Error, S3WithTestBlobs),
245 : }
246 :
247 : struct S3WithTestBlobs {
248 : enabled: EnabledS3,
249 : remote_prefixes: HashSet<RemotePath>,
250 : remote_blobs: HashSet<RemotePath>,
251 : }
252 :
253 : impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs {
254 4 : async fn setup() -> Self {
255 4 : ensure_logging_ready();
256 4 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
257 2 : info!(
258 0 : "`{}` env variable is not set, skipping the test",
259 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
260 : );
261 2 : return Self::Disabled;
262 2 : }
263 2 :
264 2 : let max_keys_in_list_response = 10;
265 2 : let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
266 :
267 2 : let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
268 :
269 40 : match upload_remote_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
270 2 : ControlFlow::Continue(uploads) => {
271 2 : info!("Remote objects created successfully");
272 :
273 2 : Self::Enabled(S3WithTestBlobs {
274 2 : enabled,
275 2 : remote_prefixes: uploads.prefixes,
276 2 : remote_blobs: uploads.blobs,
277 2 : })
278 : }
279 0 : ControlFlow::Break(uploads) => Self::UploadsFailed(
280 0 : anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
281 0 : S3WithTestBlobs {
282 0 : enabled,
283 0 : remote_prefixes: uploads.prefixes,
284 0 : remote_blobs: uploads.blobs,
285 0 : },
286 0 : ),
287 : }
288 4 : }
289 :
290 4 : async fn teardown(self) {
291 4 : match self {
292 2 : Self::Disabled => {}
293 2 : Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
294 11 : cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
295 : }
296 : }
297 4 : }
298 : }
299 :
300 : enum MaybeEnabledStorageWithSimpleTestBlobs {
301 : Enabled(S3WithSimpleTestBlobs),
302 : Disabled,
303 : UploadsFailed(anyhow::Error, S3WithSimpleTestBlobs),
304 : }
305 : struct S3WithSimpleTestBlobs {
306 : enabled: EnabledS3,
307 : remote_blobs: HashSet<RemotePath>,
308 : }
309 :
310 : impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {
311 4 : async fn setup() -> Self {
312 4 : ensure_logging_ready();
313 4 : if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
314 2 : info!(
315 0 : "`{}` env variable is not set, skipping the test",
316 : ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
317 : );
318 2 : return Self::Disabled;
319 2 : }
320 2 :
321 2 : let max_keys_in_list_response = 10;
322 2 : let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
323 :
324 2 : let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
325 :
326 39 : match upload_simple_remote_data(&enabled.client, upload_tasks_count).await {
327 2 : ControlFlow::Continue(uploads) => {
328 2 : info!("Remote objects created successfully");
329 :
330 2 : Self::Enabled(S3WithSimpleTestBlobs {
331 2 : enabled,
332 2 : remote_blobs: uploads,
333 2 : })
334 : }
335 0 : ControlFlow::Break(uploads) => Self::UploadsFailed(
336 0 : anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
337 0 : S3WithSimpleTestBlobs {
338 0 : enabled,
339 0 : remote_blobs: uploads,
340 0 : },
341 0 : ),
342 : }
343 4 : }
344 :
345 4 : async fn teardown(self) {
346 4 : match self {
347 2 : Self::Disabled => {}
348 2 : Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
349 19 : cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
350 : }
351 : }
352 4 : }
353 : }
354 :
355 18 : fn create_s3_client(
356 18 : max_keys_per_list_response: Option<i32>,
357 18 : ) -> anyhow::Result<Arc<GenericRemoteStorage>> {
358 : use rand::Rng;
359 :
360 18 : let remote_storage_s3_bucket = env::var("REMOTE_STORAGE_S3_BUCKET")
361 18 : .context("`REMOTE_STORAGE_S3_BUCKET` env var is not set, but real S3 tests are enabled")?;
362 18 : let remote_storage_s3_region = env::var("REMOTE_STORAGE_S3_REGION")
363 18 : .context("`REMOTE_STORAGE_S3_REGION` env var is not set, but real S3 tests are enabled")?;
364 :
365 : // due to how time works, we've had test runners use the same nanos as bucket prefixes.
366 : // millis is just a debugging aid for easier finding the prefix later.
367 18 : let millis = std::time::SystemTime::now()
368 18 : .duration_since(UNIX_EPOCH)
369 18 : .context("random s3 test prefix part calculation")?
370 18 : .as_millis();
371 18 :
372 18 : // because nanos can be the same for two threads so can millis, add randomness
373 18 : let random = rand::thread_rng().gen::<u32>();
374 18 :
375 18 : let remote_storage_config = RemoteStorageConfig {
376 18 : storage: RemoteStorageKind::AwsS3(S3Config {
377 18 : bucket_name: remote_storage_s3_bucket,
378 18 : bucket_region: remote_storage_s3_region,
379 18 : prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
380 18 : endpoint: None,
381 18 : concurrency_limit: NonZeroUsize::new(100).unwrap(),
382 18 : max_keys_per_list_response,
383 18 : upload_storage_class: None,
384 18 : }),
385 18 : timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
386 18 : };
387 18 : Ok(Arc::new(
388 18 : GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
389 : ))
390 18 : }
391 :
392 4 : #[test_context(MaybeEnabledStorage)]
393 : #[tokio::test]
394 4 : async fn download_is_timeouted(ctx: &mut MaybeEnabledStorage) {
395 4 : let MaybeEnabledStorage::Enabled(ctx) = ctx else {
396 2 : return;
397 : };
398 :
399 2 : let cancel = CancellationToken::new();
400 2 :
401 2 : let path = RemotePath::new(Utf8Path::new(
402 2 : format!("{}/file_to_copy", ctx.base_prefix).as_str(),
403 2 : ))
404 2 : .unwrap();
405 :
406 15 : let len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
407 :
408 2 : let timeout = std::time::Duration::from_secs(5);
409 2 :
410 2 : ctx.configure_request_timeout(timeout);
411 2 :
412 2 : let started_at = std::time::Instant::now();
413 2 : let mut stream = ctx
414 2 : .client
415 2 : .download(&path, &cancel)
416 2 : .await
417 2 : .expect("download succeeds")
418 2 : .download_stream;
419 2 :
420 2 : if started_at.elapsed().mul_f32(0.9) >= timeout {
421 0 : tracing::warn!(
422 0 : elapsed_ms = started_at.elapsed().as_millis(),
423 0 : "timeout might be too low, consumed most of it during headers"
424 : );
425 2 : }
426 :
427 2 : let first = stream
428 2 : .next()
429 2 : .await
430 2 : .expect("should have the first blob")
431 2 : .expect("should have succeeded");
432 2 :
433 2 : tracing::info!(len = first.len(), "downloaded first chunk");
434 :
435 2 : assert!(
436 2 : first.len() < len,
437 0 : "uploaded file is too small, we downloaded all on first chunk"
438 : );
439 :
440 6 : tokio::time::sleep(timeout).await;
441 :
442 : {
443 2 : let started_at = std::time::Instant::now();
444 2 : let next = stream
445 2 : .next()
446 0 : .await
447 2 : .expect("stream should not have ended yet");
448 2 :
449 2 : tracing::info!(
450 0 : next.is_err = next.is_err(),
451 0 : elapsed_ms = started_at.elapsed().as_millis(),
452 0 : "received item after timeout"
453 : );
454 :
455 2 : let e = next.expect_err("expected an error, but got a chunk?");
456 2 :
457 2 : let inner = e.get_ref().expect("std::io::Error::inner should be set");
458 2 : assert!(
459 2 : inner
460 2 : .downcast_ref::<DownloadError>()
461 2 : .is_some_and(|e| matches!(e, DownloadError::Timeout)),
462 0 : "{inner:?}"
463 : );
464 : }
465 :
466 2 : ctx.configure_request_timeout(RemoteStorageConfig::DEFAULT_TIMEOUT);
467 2 :
468 17 : ctx.client.delete_objects(&[path], &cancel).await.unwrap()
469 4 : }
470 :
471 4 : #[test_context(MaybeEnabledStorage)]
472 : #[tokio::test]
473 4 : async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
474 4 : let MaybeEnabledStorage::Enabled(ctx) = ctx else {
475 2 : return;
476 : };
477 :
478 2 : let cancel = CancellationToken::new();
479 2 :
480 2 : let path = RemotePath::new(Utf8Path::new(
481 2 : format!("{}/file_to_copy", ctx.base_prefix).as_str(),
482 2 : ))
483 2 : .unwrap();
484 :
485 15 : let file_len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
486 :
487 : {
488 2 : let stream = ctx
489 2 : .client
490 2 : .download(&path, &cancel)
491 2 : .await
492 2 : .expect("download succeeds")
493 2 : .download_stream;
494 2 :
495 2 : let mut reader = std::pin::pin!(tokio_util::io::StreamReader::new(stream));
496 :
497 2 : let first = reader.fill_buf().await.expect("should have the first blob");
498 2 :
499 2 : let len = first.len();
500 2 : tracing::info!(len, "downloaded first chunk");
501 :
502 2 : assert!(
503 2 : first.len() < file_len,
504 0 : "uploaded file is too small, we downloaded all on first chunk"
505 : );
506 :
507 2 : reader.consume(len);
508 2 :
509 2 : cancel.cancel();
510 :
511 2 : let next = reader.fill_buf().await;
512 :
513 2 : let e = next.expect_err("expected an error, but got a chunk?");
514 2 :
515 2 : let inner = e.get_ref().expect("std::io::Error::inner should be set");
516 2 : assert!(
517 2 : inner
518 2 : .downcast_ref::<DownloadError>()
519 2 : .is_some_and(|e| matches!(e, DownloadError::Cancelled)),
520 0 : "{inner:?}"
521 : );
522 :
523 2 : let e = DownloadError::from(e);
524 2 :
525 2 : assert!(matches!(e, DownloadError::Cancelled), "{e:?}");
526 : }
527 :
528 2 : let cancel = CancellationToken::new();
529 2 :
530 19 : ctx.client.delete_objects(&[path], &cancel).await.unwrap();
531 4 : }
532 :
533 : /// Upload a long enough file so that we cannot download it in single chunk
534 : ///
535 : /// For s3 the first chunk seems to be less than 10kB, so this has a bit of a safety margin
536 4 : async fn upload_large_enough_file(
537 4 : client: &GenericRemoteStorage,
538 4 : path: &RemotePath,
539 4 : cancel: &CancellationToken,
540 4 : ) -> usize {
541 4 : let header = bytes::Bytes::from_static("remote blob data content".as_bytes());
542 4 : let body = bytes::Bytes::from(vec![0u8; 1024]);
543 4 : let contents = std::iter::once(header).chain(std::iter::repeat(body).take(128));
544 4 :
545 516 : let len = contents.clone().fold(0, |acc, next| acc + next.len());
546 4 :
547 4 : let contents = futures::stream::iter(contents.map(std::io::Result::Ok));
548 4 :
549 4 : client
550 4 : .upload(contents, len, path, None, cancel)
551 30 : .await
552 4 : .expect("upload succeeds");
553 4 :
554 4 : len
555 4 : }
|