Line data Source code
1 : use anyhow::Context;
2 : use camino::Utf8Path;
3 : use futures::StreamExt;
4 : use remote_storage::ListingMode;
5 : use remote_storage::RemotePath;
6 : use std::sync::Arc;
7 : use std::{collections::HashSet, num::NonZeroU32};
8 : use test_context::test_context;
9 : use tokio_util::sync::CancellationToken;
10 : use tracing::debug;
11 :
12 : use crate::common::{download_to_vec, upload_stream, wrap_stream};
13 :
14 : use super::{
15 : MaybeEnabledStorage, MaybeEnabledStorageWithSimpleTestBlobs, MaybeEnabledStorageWithTestBlobs,
16 : };
17 :
18 : /// Tests that the S3 client can list all prefixes, even if the response comes paginated and requires multiple S3 queries.
19 : /// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars specified.
20 : /// See the client creation in [`create_s3_client`] for details on the required env vars.
21 : /// If real S3 tests are disabled, the test passes, skipping any real test run: currently, there's no way to mark a test as ignored at runtime with the
22 : /// default test framework; see https://github.com/rust-lang/rust/issues/68007 for details.
23 : ///
24 : /// First, the test creates a set of S3 objects with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_remote_data`]
25 : /// where
26 : /// * `random_prefix_part` is set for the entire S3 client during the S3 client creation in [`create_s3_client`], to avoid interference between multiple test runs
27 : /// * `base_prefix_str` is a common prefix used in the client requests: we want to ensure that the client is able to list nested prefixes inside the bucket
28 : ///
29 : /// Then, the test verifies that the client returns the correct prefixes when queried:
30 : /// * with no prefix, it lists everything after its `${random_prefix_part}/`, which should be the `${base_prefix_str}` value only
31 : /// * with `${base_prefix_str}/` prefix, it lists every `sub_prefix_${i}`
32 : ///
33 : /// In `MaybeEnabledStorageWithTestBlobs::setup`, we set the `max_keys_in_list_response` param to limit the number of keys in a single response.
34 : /// This way, we are able to test the pagination by ensuring all results are returned from the remote storage, while avoiding uploading too many blobs to S3,
35 : /// as the current default AWS S3 pagination limit is 1000.
36 : /// (see <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>).
37 : ///
38 : /// Lastly, the test attempts to clean up and remove all uploaded S3 files.
39 : /// If any errors appear during the cleanup, they get logged, but the test does not fail or stop until the cleanup is finished.
40 8 : #[test_context(MaybeEnabledStorageWithTestBlobs)]
41 : #[tokio::test]
42 8 : async fn pagination_should_work(ctx: &mut MaybeEnabledStorageWithTestBlobs) -> anyhow::Result<()> {
43 8 : let ctx = match ctx {
44 3 : MaybeEnabledStorageWithTestBlobs::Enabled(ctx) => ctx,
45 5 : MaybeEnabledStorageWithTestBlobs::Disabled => return Ok(()),
46 0 : MaybeEnabledStorageWithTestBlobs::UploadsFailed(e, _) => {
47 0 : anyhow::bail!("S3 init failed: {e:?}")
48 : }
49 : };
50 :
51 3 : let cancel = CancellationToken::new();
52 3 :
53 3 : let test_client = Arc::clone(&ctx.enabled.client);
54 3 : let expected_remote_prefixes = ctx.remote_prefixes.clone();
55 :
56 3 : let base_prefix = RemotePath::new(Utf8Path::new(ctx.enabled.base_prefix))
57 3 : .context("common_prefix construction")?;
58 3 : let root_remote_prefixes = test_client
59 3 : .list(None, ListingMode::WithDelimiter, None, &cancel)
60 10 : .await?
61 : .prefixes
62 3 : .into_iter()
63 3 : .collect::<HashSet<_>>();
64 3 : assert_eq!(
65 3 : root_remote_prefixes, HashSet::from([base_prefix.clone()]),
66 0 : "remote storage root prefixes list mismatches with the uploads. Returned prefixes: {root_remote_prefixes:?}"
67 : );
68 :
69 3 : let nested_remote_prefixes = test_client
70 3 : .list(
71 3 : Some(&base_prefix.add_trailing_slash()),
72 3 : ListingMode::WithDelimiter,
73 3 : None,
74 3 : &cancel,
75 3 : )
76 34 : .await?
77 : .prefixes
78 3 : .into_iter()
79 3 : .collect::<HashSet<_>>();
80 3 : let remote_only_prefixes = nested_remote_prefixes
81 3 : .difference(&expected_remote_prefixes)
82 3 : .collect::<HashSet<_>>();
83 3 : let missing_uploaded_prefixes = expected_remote_prefixes
84 3 : .difference(&nested_remote_prefixes)
85 3 : .collect::<HashSet<_>>();
86 3 : assert_eq!(
87 3 : remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0,
88 0 : "remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
89 : );
90 :
91 : // list_streaming
92 :
93 3 : let prefix_with_slash = base_prefix.add_trailing_slash();
94 3 : let mut nested_remote_prefixes_st = test_client.list_streaming(
95 3 : Some(&prefix_with_slash),
96 3 : ListingMode::WithDelimiter,
97 3 : None,
98 3 : &cancel,
99 3 : );
100 3 : let mut nested_remote_prefixes_combined = HashSet::new();
101 3 : let mut segments = 0;
102 3 : let mut segment_max_size = 0;
103 33 : while let Some(st) = nested_remote_prefixes_st.next().await {
104 9 : let st = st?;
105 9 : segment_max_size = segment_max_size.max(st.prefixes.len());
106 9 : nested_remote_prefixes_combined.extend(st.prefixes.into_iter());
107 9 : segments += 1;
108 : }
109 3 : assert!(segments > 1, "less than 2 segments: {segments}");
110 3 : assert!(
111 3 : segment_max_size * 2 <= nested_remote_prefixes_combined.len(),
112 0 : "double of segment_max_size={segment_max_size} larger number of remote prefixes of {}",
113 0 : nested_remote_prefixes_combined.len()
114 : );
115 3 : let remote_only_prefixes = nested_remote_prefixes_combined
116 3 : .difference(&expected_remote_prefixes)
117 3 : .collect::<HashSet<_>>();
118 3 : let missing_uploaded_prefixes = expected_remote_prefixes
119 3 : .difference(&nested_remote_prefixes_combined)
120 3 : .collect::<HashSet<_>>();
121 3 : assert_eq!(
122 3 : remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0,
123 0 : "remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
124 : );
125 :
126 3 : Ok(())
127 8 : }
128 :
129 : /// Tests that the S3 client can list all files in a folder, even if the response comes paginated and requires multiple S3 queries.
130 : /// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars specified. The test will skip the real code and pass if the env vars are not set.
131 : /// See `s3_pagination_should_work` for more information.
132 : ///
133 : /// First, the test creates a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_remote_data`].
134 : /// Then it performs the following queries:
135 : /// 1. `list(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
136 : /// 2. `list("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt`
137 8 : #[test_context(MaybeEnabledStorageWithSimpleTestBlobs)]
138 : #[tokio::test]
139 : async fn list_no_delimiter_works(
140 : ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs,
141 8 : ) -> anyhow::Result<()> {
142 8 : let ctx = match ctx {
143 3 : MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx,
144 5 : MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()),
145 0 : MaybeEnabledStorageWithSimpleTestBlobs::UploadsFailed(e, _) => {
146 0 : anyhow::bail!("S3 init failed: {e:?}")
147 : }
148 : };
149 3 : let cancel = CancellationToken::new();
150 3 : let test_client = Arc::clone(&ctx.enabled.client);
151 3 : let base_prefix =
152 3 : RemotePath::new(Utf8Path::new("folder1")).context("common_prefix construction")?;
153 3 : let root_files = test_client
154 3 : .list(None, ListingMode::NoDelimiter, None, &cancel)
155 39 : .await
156 3 : .context("client list root files failure")?
157 : .keys
158 3 : .into_iter()
159 63 : .map(|o| o.key)
160 3 : .collect::<HashSet<_>>();
161 3 : assert_eq!(
162 3 : root_files,
163 3 : ctx.remote_blobs.clone(),
164 0 : "remote storage list on root mismatches with the uploads."
165 : );
166 :
167 : // Test that max_keys limit works. In total there are about 21 files (see
168 : // upload_simple_remote_data call in test_real_s3.rs).
169 3 : let limited_root_files = test_client
170 3 : .list(
171 3 : None,
172 3 : ListingMode::NoDelimiter,
173 3 : Some(NonZeroU32::new(2).unwrap()),
174 3 : &cancel,
175 3 : )
176 11 : .await
177 3 : .context("client list root files failure")?;
178 3 : assert_eq!(limited_root_files.keys.len(), 2);
179 :
180 3 : let nested_remote_files = test_client
181 3 : .list(Some(&base_prefix), ListingMode::NoDelimiter, None, &cancel)
182 13 : .await
183 3 : .context("client list nested files failure")?
184 : .keys
185 3 : .into_iter()
186 21 : .map(|o| o.key)
187 3 : .collect::<HashSet<_>>();
188 3 : let trim_remote_blobs: HashSet<_> = ctx
189 3 : .remote_blobs
190 3 : .iter()
191 63 : .map(|x| x.get_path())
192 63 : .filter(|x| x.starts_with("folder1"))
193 21 : .map(|x| RemotePath::new(x).expect("must be valid path"))
194 3 : .collect();
195 3 : assert_eq!(
196 : nested_remote_files, trim_remote_blobs,
197 0 : "remote storage list on subdirrectory mismatches with the uploads."
198 : );
199 3 : Ok(())
200 8 : }
201 :
202 8 : #[test_context(MaybeEnabledStorage)]
203 : #[tokio::test]
204 8 : async fn delete_non_exising_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
205 8 : let ctx = match ctx {
206 3 : MaybeEnabledStorage::Enabled(ctx) => ctx,
207 5 : MaybeEnabledStorage::Disabled => return Ok(()),
208 : };
209 :
210 3 : let cancel = CancellationToken::new();
211 :
212 3 : let path = RemotePath::new(Utf8Path::new(
213 3 : format!("{}/for_sure_there_is_nothing_there_really", ctx.base_prefix).as_str(),
214 3 : ))
215 3 : .with_context(|| "RemotePath conversion")?;
216 :
217 3 : ctx.client
218 3 : .delete(&path, &cancel)
219 27 : .await
220 3 : .expect("should succeed");
221 3 :
222 3 : Ok(())
223 8 : }
224 :
225 8 : #[test_context(MaybeEnabledStorage)]
226 : #[tokio::test]
227 8 : async fn delete_objects_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
228 8 : let ctx = match ctx {
229 3 : MaybeEnabledStorage::Enabled(ctx) => ctx,
230 5 : MaybeEnabledStorage::Disabled => return Ok(()),
231 : };
232 :
233 3 : let cancel = CancellationToken::new();
234 :
235 3 : let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
236 3 : .with_context(|| "RemotePath conversion")?;
237 :
238 3 : let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
239 3 : .with_context(|| "RemotePath conversion")?;
240 :
241 3 : let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
242 3 : .with_context(|| "RemotePath conversion")?;
243 :
244 3 : let (data, len) = upload_stream("remote blob data1".as_bytes().into());
245 23 : ctx.client.upload(data, len, &path1, None, &cancel).await?;
246 :
247 3 : let (data, len) = upload_stream("remote blob data2".as_bytes().into());
248 7 : ctx.client.upload(data, len, &path2, None, &cancel).await?;
249 :
250 3 : let (data, len) = upload_stream("remote blob data3".as_bytes().into());
251 7 : ctx.client.upload(data, len, &path3, None, &cancel).await?;
252 :
253 15 : ctx.client.delete_objects(&[path1, path2], &cancel).await?;
254 :
255 3 : let prefixes = ctx
256 3 : .client
257 3 : .list(None, ListingMode::WithDelimiter, None, &cancel)
258 22 : .await?
259 : .prefixes;
260 :
261 3 : assert_eq!(prefixes.len(), 1);
262 :
263 10 : ctx.client.delete_objects(&[path3], &cancel).await?;
264 :
265 3 : Ok(())
266 8 : }
267 :
268 8 : #[test_context(MaybeEnabledStorage)]
269 : #[tokio::test]
270 8 : async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
271 8 : let MaybeEnabledStorage::Enabled(ctx) = ctx else {
272 5 : return Ok(());
273 : };
274 :
275 3 : let cancel = CancellationToken::new();
276 :
277 3 : let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))
278 3 : .with_context(|| "RemotePath conversion")?;
279 :
280 3 : let orig = bytes::Bytes::from_static("remote blob data here".as_bytes());
281 3 :
282 3 : let (data, len) = wrap_stream(orig.clone());
283 3 :
284 22 : ctx.client.upload(data, len, &path, None, &cancel).await?;
285 :
286 : // Normal download request
287 7 : let dl = ctx.client.download(&path, &cancel).await?;
288 3 : let buf = download_to_vec(dl).await?;
289 3 : assert_eq!(&buf, &orig);
290 :
291 : // Full range (end specified)
292 3 : let dl = ctx
293 3 : .client
294 3 : .download_byte_range(&path, 0, Some(len as u64), &cancel)
295 7 : .await?;
296 3 : let buf = download_to_vec(dl).await?;
297 3 : assert_eq!(&buf, &orig);
298 :
299 : // partial range (end specified)
300 3 : let dl = ctx
301 3 : .client
302 3 : .download_byte_range(&path, 4, Some(10), &cancel)
303 7 : .await?;
304 3 : let buf = download_to_vec(dl).await?;
305 3 : assert_eq!(&buf, &orig[4..10]);
306 :
307 : // partial range (end beyond real end)
308 3 : let dl = ctx
309 3 : .client
310 3 : .download_byte_range(&path, 8, Some(len as u64 * 100), &cancel)
311 7 : .await?;
312 3 : let buf = download_to_vec(dl).await?;
313 3 : assert_eq!(&buf, &orig[8..]);
314 :
315 : // Partial range (end unspecified)
316 3 : let dl = ctx
317 3 : .client
318 3 : .download_byte_range(&path, 4, None, &cancel)
319 7 : .await?;
320 3 : let buf = download_to_vec(dl).await?;
321 3 : assert_eq!(&buf, &orig[4..]);
322 :
323 : // Full range (end unspecified)
324 3 : let dl = ctx
325 3 : .client
326 3 : .download_byte_range(&path, 0, None, &cancel)
327 7 : .await?;
328 3 : let buf = download_to_vec(dl).await?;
329 3 : assert_eq!(&buf, &orig);
330 :
331 3 : debug!("Cleanup: deleting file at path {path:?}");
332 3 : ctx.client
333 3 : .delete(&path, &cancel)
334 10 : .await
335 3 : .with_context(|| format!("{path:?} removal"))?;
336 :
337 3 : Ok(())
338 8 : }
339 :
340 8 : #[test_context(MaybeEnabledStorage)]
341 : #[tokio::test]
342 8 : async fn copy_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
343 8 : let MaybeEnabledStorage::Enabled(ctx) = ctx else {
344 5 : return Ok(());
345 : };
346 :
347 3 : let cancel = CancellationToken::new();
348 :
349 3 : let path = RemotePath::new(Utf8Path::new(
350 3 : format!("{}/file_to_copy", ctx.base_prefix).as_str(),
351 3 : ))
352 3 : .with_context(|| "RemotePath conversion")?;
353 3 : let path_dest = RemotePath::new(Utf8Path::new(
354 3 : format!("{}/file_dest", ctx.base_prefix).as_str(),
355 3 : ))
356 3 : .with_context(|| "RemotePath conversion")?;
357 :
358 3 : let orig = bytes::Bytes::from_static("remote blob data content".as_bytes());
359 3 :
360 3 : let (data, len) = wrap_stream(orig.clone());
361 3 :
362 21 : ctx.client.upload(data, len, &path, None, &cancel).await?;
363 :
364 : // Copy the object, then download the copy and verify its contents
365 8 : ctx.client.copy_object(&path, &path_dest, &cancel).await?;
366 :
367 7 : let dl = ctx.client.download(&path_dest, &cancel).await?;
368 3 : let buf = download_to_vec(dl).await?;
369 3 : assert_eq!(&buf, &orig);
370 :
371 3 : debug!("Cleanup: deleting file at path {path:?}");
372 3 : ctx.client
373 3 : .delete_objects(&[path.clone(), path_dest.clone()], &cancel)
374 16 : .await
375 3 : .with_context(|| format!("{path:?} removal"))?;
376 :
377 3 : Ok(())
378 8 : }