use std::collections::HashSet;
use std::env;
use std::num::NonZeroUsize;
use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::UNIX_EPOCH;

use anyhow::Context;
use camino::Utf8Path;
use remote_storage::{
    GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config,
};
use test_context::{test_context, AsyncTestContext};
use tracing::{debug, info};

mod common;

use common::{
    cleanup, download_to_vec, ensure_logging_ready, upload_remote_data, upload_simple_remote_data,
    upload_stream, wrap_stream,
};

const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
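// A sketch of the environment these tests expect when run against real S3 (bucket and region
// values are placeholders; AWS credentials come from the usual S3 credential env vars mentioned
// in the doc comments below):
//   ENABLE_REAL_S3_REMOTE_STORAGE=1
//   REMOTE_STORAGE_S3_BUCKET=<bucket name>
//   REMOTE_STORAGE_S3_REGION=<region>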

const BASE_PREFIX: &str = "test";

/// Tests that the S3 client can list all prefixes, even if the response comes paginated and requires multiple S3 queries.
/// Uses real S3 and requires the [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars to be specified.
/// See the client creation in [`create_s3_client`] for details on the required env vars.
/// If real S3 tests are disabled, the test passes, skipping any real test run: currently, there's no way to mark a test as ignored at runtime with the
/// default test framework, see https://github.com/rust-lang/rust/issues/68007 for details.
///
/// First, the test creates a set of S3 objects with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_remote_data`]
/// where
/// * `random_prefix_part` is set for the entire S3 client during the S3 client creation in [`create_s3_client`], to avoid interference between multiple test runs
/// * `base_prefix_str` is a common prefix to use in the client requests: we want to ensure that the client is able to list nested prefixes inside the bucket
///
/// Then, it verifies that the client returns the correct prefixes when queried:
/// * with no prefix, it lists everything after its `${random_prefix_part}/`, which should be the `${base_prefix_str}` value only
/// * with the `${base_prefix_str}/` prefix, it lists every `sub_prefix_${i}`
///
/// With real S3 enabled and the `#[cfg(test)]` Rust configuration used, the S3 client test adds a `max-keys` param to limit the response keys.
/// This way, we test the pagination implicitly, by ensuring all results are returned from the remote storage, while avoiding uploading too many blobs to S3,
/// since the current default AWS S3 pagination limit is 1000
/// (see https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax).
///
/// Lastly, the test attempts to clean up and remove all uploaded S3 files.
/// If any errors appear during the clean-up, they get logged, but the test does not fail or stop until the clean-up is finished.
#[test_context(MaybeEnabledS3WithTestBlobs)]
#[tokio::test]
async fn s3_pagination_should_work(ctx: &mut MaybeEnabledS3WithTestBlobs) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledS3WithTestBlobs::Enabled(ctx) => ctx,
        MaybeEnabledS3WithTestBlobs::Disabled => return Ok(()),
        MaybeEnabledS3WithTestBlobs::UploadsFailed(e, _) => anyhow::bail!("S3 init failed: {e:?}"),
    };

    let test_client = Arc::clone(&ctx.enabled.client);
    let expected_remote_prefixes = ctx.remote_prefixes.clone();

    let base_prefix = RemotePath::new(Utf8Path::new(ctx.enabled.base_prefix))
        .context("common_prefix construction")?;
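    // All uploads share the client's unique `prefix_in_bucket`, so listing with no prefix
    // should surface only the common base prefix.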
    let root_remote_prefixes = test_client
        .list_prefixes(None)
        .await
        .context("client list root prefixes failure")?
        .into_iter()
        .collect::<HashSet<_>>();
    assert_eq!(
        root_remote_prefixes, HashSet::from([base_prefix.clone()]),
        "remote storage root prefixes list mismatches with the uploads. Returned prefixes: {root_remote_prefixes:?}"
    );

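    // Listing under the base prefix should return every uploaded `sub_prefix_${i}`; with
    // `max_keys_in_list_response` set low in the setup, fetching them all relies on pagination.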
    let nested_remote_prefixes = test_client
        .list_prefixes(Some(&base_prefix))
        .await
        .context("client list nested prefixes failure")?
        .into_iter()
        .collect::<HashSet<_>>();
    let remote_only_prefixes = nested_remote_prefixes
        .difference(&expected_remote_prefixes)
        .collect::<HashSet<_>>();
    let missing_uploaded_prefixes = expected_remote_prefixes
        .difference(&nested_remote_prefixes)
        .collect::<HashSet<_>>();
    assert_eq!(
        remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0,
        "remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
    );

    Ok(())
}

/// Tests that the S3 client can list all files in a folder, even if the response comes paginated and requires multiple S3 queries.
/// Uses real S3 and requires the [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars to be specified. The test skips the real run and passes if the env vars are not set.
/// See `s3_pagination_should_work` for more information.
///
/// First, it creates a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_simple_remote_data`].
/// Then it performs the following queries:
/// 1. `list_files(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
/// 2. `list_files("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt`
#[test_context(MaybeEnabledS3WithSimpleTestBlobs)]
#[tokio::test]
async fn s3_list_files_works(ctx: &mut MaybeEnabledS3WithSimpleTestBlobs) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledS3WithSimpleTestBlobs::Enabled(ctx) => ctx,
        MaybeEnabledS3WithSimpleTestBlobs::Disabled => return Ok(()),
        MaybeEnabledS3WithSimpleTestBlobs::UploadsFailed(e, _) => {
            anyhow::bail!("S3 init failed: {e:?}")
        }
    };
    let test_client = Arc::clone(&ctx.enabled.client);
    let base_prefix =
        RemotePath::new(Utf8Path::new("folder1")).context("common_prefix construction")?;
    let root_files = test_client
        .list_files(None)
        .await
        .context("client list root files failure")?
        .into_iter()
        .collect::<HashSet<_>>();
    assert_eq!(
        root_files,
        ctx.remote_blobs.clone(),
        "remote storage list_files on root mismatches with the uploads."
    );
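    // Listing with the `folder1` prefix should return only the blobs that live under that folder.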
    let nested_remote_files = test_client
        .list_files(Some(&base_prefix))
        .await
        .context("client list nested files failure")?
        .into_iter()
        .collect::<HashSet<_>>();
    let trim_remote_blobs: HashSet<_> = ctx
        .remote_blobs
        .iter()
        .map(|x| x.get_path())
        .filter(|x| x.starts_with("folder1"))
        .map(|x| RemotePath::new(x).expect("must be valid path"))
        .collect();
    assert_eq!(
        nested_remote_files, trim_remote_blobs,
        "remote storage list_files on subdirectory mismatches with the uploads."
    );
    Ok(())
}

#[test_context(MaybeEnabledS3)]
#[tokio::test]
async fn s3_delete_non_existing_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledS3::Enabled(ctx) => ctx,
        MaybeEnabledS3::Disabled => return Ok(()),
    };

    let path = RemotePath::new(Utf8Path::new(
        format!("{}/for_sure_there_is_nothing_there_really", ctx.base_prefix).as_str(),
    ))
    .with_context(|| "RemotePath conversion")?;

    ctx.client.delete(&path).await.expect("should succeed");

    Ok(())
}

#[test_context(MaybeEnabledS3)]
#[tokio::test]
async fn s3_delete_objects_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledS3::Enabled(ctx) => ctx,
        MaybeEnabledS3::Disabled => return Ok(()),
    };

    let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let (data, len) = upload_stream("remote blob data1".as_bytes().into());
    ctx.client.upload(data, len, &path1, None).await?;

    let (data, len) = upload_stream("remote blob data2".as_bytes().into());
    ctx.client.upload(data, len, &path2, None).await?;

    let (data, len) = upload_stream("remote blob data3".as_bytes().into());
    ctx.client.upload(data, len, &path3, None).await?;

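    // Delete two of the three blobs: only `path3` is left afterwards, so listing prefixes
    // should yield exactly one entry.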
    ctx.client.delete_objects(&[path1, path2]).await?;

    let prefixes = ctx.client.list_prefixes(None).await?;

    assert_eq!(prefixes.len(), 1);

    ctx.client.delete_objects(&[path3]).await?;

    Ok(())
}

#[test_context(MaybeEnabledS3)]
#[tokio::test]
async fn s3_upload_download_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> {
    let MaybeEnabledS3::Enabled(ctx) = ctx else {
        return Ok(());
    };

    let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let orig = bytes::Bytes::from_static("remote blob data here".as_bytes());

    let (data, len) = wrap_stream(orig.clone());

    ctx.client.upload(data, len, &path, None).await?;

    // Normal download request
    let dl = ctx.client.download(&path).await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig);

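    // Note: judging by the assertions below, the end offset passed to `download_byte_range`
    // is treated as exclusive, e.g. `(4, Some(10))` yields bytes `4..10`.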
    // Full range (end specified)
    let dl = ctx
        .client
        .download_byte_range(&path, 0, Some(len as u64))
        .await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig);

    // Partial range (end specified)
    let dl = ctx.client.download_byte_range(&path, 4, Some(10)).await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig[4..10]);

    // Partial range (end beyond the real end)
    let dl = ctx
        .client
        .download_byte_range(&path, 8, Some(len as u64 * 100))
        .await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig[8..]);

    // Partial range (end unspecified)
    let dl = ctx.client.download_byte_range(&path, 4, None).await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig[4..]);

    // Full range (end unspecified)
    let dl = ctx.client.download_byte_range(&path, 0, None).await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig);

    debug!("Cleanup: deleting file at path {path:?}");
    ctx.client
        .delete(&path)
        .await
        .with_context(|| format!("{path:?} removal"))?;

    Ok(())
}

struct EnabledS3 {
    client: Arc<GenericRemoteStorage>,
    base_prefix: &'static str,
}

impl EnabledS3 {
    async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
        let client = create_s3_client(max_keys_in_list_response)
            .context("S3 client creation")
            .expect("S3 client creation failed");

        EnabledS3 {
            client,
            base_prefix: BASE_PREFIX,
        }
    }
}

enum MaybeEnabledS3 {
    Enabled(EnabledS3),
    Disabled,
}

#[async_trait::async_trait]
impl AsyncTestContext for MaybeEnabledS3 {
    async fn setup() -> Self {
        ensure_logging_ready();

        if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
            info!(
                "`{}` env variable is not set, skipping the test",
                ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
            );
            return Self::Disabled;
        }

        Self::Enabled(EnabledS3::setup(None).await)
    }
}

enum MaybeEnabledS3WithTestBlobs {
    Enabled(S3WithTestBlobs),
    Disabled,
    UploadsFailed(anyhow::Error, S3WithTestBlobs),
}

struct S3WithTestBlobs {
    enabled: EnabledS3,
    remote_prefixes: HashSet<RemotePath>,
    remote_blobs: HashSet<RemotePath>,
}

#[async_trait::async_trait]
impl AsyncTestContext for MaybeEnabledS3WithTestBlobs {
    async fn setup() -> Self {
        ensure_logging_ready();
        if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
            info!(
                "`{}` env variable is not set, skipping the test",
                ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
            );
            return Self::Disabled;
        }

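        // Upload more objects than fit in a single list response (2 * max_keys + 1), so that
        // listing them back forces the client to paginate.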
        let max_keys_in_list_response = 10;
        let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());

        let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;

        match upload_remote_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await {
            ControlFlow::Continue(uploads) => {
                info!("Remote objects created successfully");

                Self::Enabled(S3WithTestBlobs {
                    enabled,
                    remote_prefixes: uploads.prefixes,
                    remote_blobs: uploads.blobs,
                })
            }
            ControlFlow::Break(uploads) => Self::UploadsFailed(
                anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
                S3WithTestBlobs {
                    enabled,
                    remote_prefixes: uploads.prefixes,
                    remote_blobs: uploads.blobs,
                },
            ),
        }
    }

    async fn teardown(self) {
        match self {
            Self::Disabled => {}
            Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
                cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
            }
        }
    }
}

// NOTE: the setups for the list_prefixes test and the list_files test are very similar.
// However, they are not identical: the list_prefixes function is concerned with listing prefixes,
// whereas the list_files function is concerned with listing files.
// See `RemoteStorage::list_files` documentation for more details.
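//
// For illustration (not tied to the exact uploads in these tests): given objects `a/b/1.txt`
// and `a/c/2.txt`, `list_prefixes(Some("a"))` would return the directory-like prefixes `a/b`
// and `a/c`, while `list_files(Some("a"))` would return the objects themselves,
// `a/b/1.txt` and `a/c/2.txt`.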
enum MaybeEnabledS3WithSimpleTestBlobs {
    Enabled(S3WithSimpleTestBlobs),
    Disabled,
    UploadsFailed(anyhow::Error, S3WithSimpleTestBlobs),
}
struct S3WithSimpleTestBlobs {
    enabled: EnabledS3,
    remote_blobs: HashSet<RemotePath>,
}

#[async_trait::async_trait]
impl AsyncTestContext for MaybeEnabledS3WithSimpleTestBlobs {
    async fn setup() -> Self {
        ensure_logging_ready();
        if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
            info!(
                "`{}` env variable is not set, skipping the test",
                ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
            );
            return Self::Disabled;
        }

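        // As in `MaybeEnabledS3WithTestBlobs::setup`, upload enough objects to force
        // paginated listing.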
        let max_keys_in_list_response = 10;
        let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());

        let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;

        match upload_simple_remote_data(&enabled.client, upload_tasks_count).await {
            ControlFlow::Continue(uploads) => {
                info!("Remote objects created successfully");

                Self::Enabled(S3WithSimpleTestBlobs {
                    enabled,
                    remote_blobs: uploads,
                })
            }
            ControlFlow::Break(uploads) => Self::UploadsFailed(
                anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
                S3WithSimpleTestBlobs {
                    enabled,
                    remote_blobs: uploads,
                },
            ),
        }
    }

    async fn teardown(self) {
        match self {
            Self::Disabled => {}
            Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
                cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
            }
        }
    }
}

fn create_s3_client(
    max_keys_per_list_response: Option<i32>,
) -> anyhow::Result<Arc<GenericRemoteStorage>> {
    use rand::Rng;

    let remote_storage_s3_bucket = env::var("REMOTE_STORAGE_S3_BUCKET")
        .context("`REMOTE_STORAGE_S3_BUCKET` env var is not set, but real S3 tests are enabled")?;
    let remote_storage_s3_region = env::var("REMOTE_STORAGE_S3_REGION")
        .context("`REMOTE_STORAGE_S3_REGION` env var is not set, but real S3 tests are enabled")?;

    // Due to how time works, we've had test runners use the same nanos as bucket prefixes.
    // The millis are just a debugging aid, to make the prefix easier to find later.
    let millis = std::time::SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .context("random s3 test prefix part calculation")?
        .as_millis();

    // Because two runs can land on the same millis just as they can on the same nanos, add randomness.
    let random = rand::thread_rng().gen::<u32>();

    let remote_storage_config = RemoteStorageConfig {
        storage: RemoteStorageKind::AwsS3(S3Config {
            bucket_name: remote_storage_s3_bucket,
            bucket_region: remote_storage_s3_region,
            prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
            endpoint: None,
            concurrency_limit: NonZeroUsize::new(100).unwrap(),
            max_keys_per_list_response,
        }),
    };
    Ok(Arc::new(
        GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
    ))
}