use anyhow::Context;
use camino::Utf8Path;
use remote_storage::ListingMode;
use remote_storage::RemotePath;
use std::sync::Arc;
use std::{collections::HashSet, num::NonZeroU32};
use test_context::test_context;
use tokio_util::sync::CancellationToken;
use tracing::debug;

use crate::common::{download_to_vec, upload_stream, wrap_stream};

use super::{
    MaybeEnabledStorage, MaybeEnabledStorageWithSimpleTestBlobs, MaybeEnabledStorageWithTestBlobs,
};

/// Tests that the S3 client can list all prefixes, even if the response comes paginated and requires multiple S3 queries.
/// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and the related S3 credential env vars to be set.
/// See the client creation in [`create_s3_client`] for details on the required env vars.
/// If real S3 tests are disabled, the test passes without running anything: currently, there's no way to mark a test as ignored at runtime in the
/// default test framework, see https://github.com/rust-lang/rust/issues/68007 for details.
///
/// First, the test creates a set of S3 objects with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_remote_data`],
/// where
/// * `random_prefix_part` is set for the entire S3 client during client creation in [`create_s3_client`], to avoid interference between multiple test runs
/// * `base_prefix_str` is a common prefix to use in the client requests: we want to ensure that the client can list nested prefixes inside the bucket
///
/// Then, the test verifies that the client returns the correct prefixes when queried:
/// * with no prefix, it lists everything after its `${random_prefix_part}/`, which should be the `${base_prefix_str}` value only
/// * with the `${base_prefix_str}/` prefix, it lists every `sub_prefix_${i}`
///
/// With real S3 enabled and the `#[cfg(test)]` Rust configuration used, the S3 client adds a `max-keys` param to limit the number of keys per response.
/// This way, we test the pagination implicitly, by ensuring all results are returned from the remote storage, while avoiding uploading too many blobs to S3,
/// since the current default AWS S3 pagination limit is 1000
/// (see https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax).
///
/// Lastly, the test attempts to clean up and remove all uploaded S3 files.
/// If any errors appear during cleanup, they are logged, but the test is neither failed nor stopped until cleanup is finished.
#[test_context(MaybeEnabledStorageWithTestBlobs)]
#[tokio::test]
async fn pagination_should_work(ctx: &mut MaybeEnabledStorageWithTestBlobs) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledStorageWithTestBlobs::Enabled(ctx) => ctx,
        MaybeEnabledStorageWithTestBlobs::Disabled => return Ok(()),
        MaybeEnabledStorageWithTestBlobs::UploadsFailed(e, _) => {
            anyhow::bail!("S3 init failed: {e:?}")
        }
    };

    let cancel = CancellationToken::new();

    let test_client = Arc::clone(&ctx.enabled.client);
    let expected_remote_prefixes = ctx.remote_prefixes.clone();

    let base_prefix = RemotePath::new(Utf8Path::new(ctx.enabled.base_prefix))
        .context("common_prefix construction")?;
    let root_remote_prefixes = test_client
        .list(None, ListingMode::WithDelimiter, None, &cancel)
        .await?
        .prefixes
        .into_iter()
        .collect::<HashSet<_>>();
    assert_eq!(
        root_remote_prefixes, HashSet::from([base_prefix.clone()]),
        "remote storage root prefixes list does not match the uploads. Returned prefixes: {root_remote_prefixes:?}"
    );

    let nested_remote_prefixes = test_client
        .list(
            Some(&base_prefix.add_trailing_slash()),
            ListingMode::WithDelimiter,
            None,
            &cancel,
        )
        .await?
        .prefixes
        .into_iter()
        .collect::<HashSet<_>>();
    let remote_only_prefixes = nested_remote_prefixes
        .difference(&expected_remote_prefixes)
        .collect::<HashSet<_>>();
    let missing_uploaded_prefixes = expected_remote_prefixes
        .difference(&nested_remote_prefixes)
        .collect::<HashSet<_>>();
    assert_eq!(
        remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0,
        "remote storage nested prefixes list does not match the uploads. Remote-only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
    );

    Ok(())
}

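// A minimal sketch (not part of the tests) of the continuation-token loop that
// ListObjectsV2-style pagination implies: each response is capped at `max-keys`
// entries and carries a token for the next request. `Page` and `drain_pages`
// are hypothetical illustrations, not `remote_storage` internals.
#[allow(dead_code)]
struct Page {
    keys: Vec<String>,
    // `NextContinuationToken` from the previous response; `None` on the last page.
    next_token: Option<String>,
}

#[allow(dead_code)]
fn drain_pages(mut next_page: impl FnMut(Option<String>) -> Page) -> Vec<String> {
    let mut all_keys = Vec::new();
    let mut token = None;
    loop {
        let page = next_page(token.take());
        all_keys.extend(page.keys);
        match page.next_token {
            Some(t) => token = Some(t), // more pages to fetch
            None => break,              // listing is complete
        }
    }
    all_keys
}
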
/// Tests that the S3 client can list all files in a folder, even if the response comes paginated and requires multiple S3 queries.
/// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and the related S3 credential env vars to be set. The test skips the real run and passes if the env vars are not set.
/// See [`pagination_should_work`] for more information.
///
/// First, creates a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_remote_data`].
/// Then performs the following queries:
/// 1. `list(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
/// 2. `list("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt`
#[test_context(MaybeEnabledStorageWithSimpleTestBlobs)]
#[tokio::test]
async fn list_no_delimiter_works(
    ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs,
) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx,
        MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()),
        MaybeEnabledStorageWithSimpleTestBlobs::UploadsFailed(e, _) => {
            anyhow::bail!("S3 init failed: {e:?}")
        }
    };
    let cancel = CancellationToken::new();
    let test_client = Arc::clone(&ctx.enabled.client);
    let base_prefix =
        RemotePath::new(Utf8Path::new("folder1")).context("common_prefix construction")?;
    let root_files = test_client
        .list(None, ListingMode::NoDelimiter, None, &cancel)
        .await
        .context("client list root files failure")?
        .keys
        .into_iter()
        .collect::<HashSet<_>>();
    assert_eq!(
        root_files,
        ctx.remote_blobs.clone(),
        "remote storage list on root does not match the uploads."
    );

    // Test that the max_keys limit works. In total there are about 21 files (see
    // the upload_simple_remote_data call in test_real_s3.rs).
    let limited_root_files = test_client
        .list(
            None,
            ListingMode::NoDelimiter,
            Some(NonZeroU32::new(2).unwrap()),
            &cancel,
        )
        .await
        .context("client list root files failure")?;
    assert_eq!(limited_root_files.keys.len(), 2);

    let nested_remote_files = test_client
        .list(Some(&base_prefix), ListingMode::NoDelimiter, None, &cancel)
        .await
        .context("client list nested files failure")?
        .keys
        .into_iter()
        .collect::<HashSet<_>>();
    let trim_remote_blobs: HashSet<_> = ctx
        .remote_blobs
        .iter()
        .map(|x| x.get_path())
        .filter(|x| x.starts_with("folder1"))
        .map(|x| RemotePath::new(x).expect("must be valid path"))
        .collect();
    assert_eq!(
        nested_remote_files, trim_remote_blobs,
        "remote storage list on subdirectory does not match the uploads."
    );
    Ok(())
}

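// A minimal sketch (hypothetical, not the `remote_storage` implementation) of
// what the two listing modes mean at the key level: with a delimiter, S3 groups
// keys on the first `/` after the queried prefix and reports each group as a
// common prefix; without one, full keys come back. For example,
// "random_prefix/folder1/blob_0.txt" under "random_prefix/" groups into
// "random_prefix/folder1/".
#[allow(dead_code)]
fn common_prefix_of(key: &str, queried_prefix: &str) -> Option<String> {
    let rest = key.strip_prefix(queried_prefix)?;
    // Keys with no further delimiter are returned as plain keys, not prefixes.
    let (first_component, _) = rest.split_once('/')?;
    Some(format!("{queried_prefix}{first_component}/"))
}
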
#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn delete_non_existing_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledStorage::Enabled(ctx) => ctx,
        MaybeEnabledStorage::Disabled => return Ok(()),
    };

    let cancel = CancellationToken::new();

    let path = RemotePath::new(Utf8Path::new(
        format!("{}/for_sure_there_is_nothing_there_really", ctx.base_prefix).as_str(),
    ))
    .with_context(|| "RemotePath conversion")?;

    // S3 deletes are idempotent: deleting a missing key must still succeed.
    ctx.client
        .delete(&path, &cancel)
        .await
        .expect("should succeed");

    Ok(())
}

#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn delete_objects_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledStorage::Enabled(ctx) => ctx,
        MaybeEnabledStorage::Disabled => return Ok(()),
    };

    let cancel = CancellationToken::new();

    let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let (data, len) = upload_stream("remote blob data1".as_bytes().into());
    ctx.client.upload(data, len, &path1, None, &cancel).await?;

    let (data, len) = upload_stream("remote blob data2".as_bytes().into());
    ctx.client.upload(data, len, &path2, None, &cancel).await?;

    let (data, len) = upload_stream("remote blob data3".as_bytes().into());
    ctx.client.upload(data, len, &path3, None, &cancel).await?;

    // Batch-delete two of the three objects; the third must survive,
    // so the root listing below still sees exactly one prefix.
    ctx.client.delete_objects(&[path1, path2], &cancel).await?;

    let prefixes = ctx
        .client
        .list(None, ListingMode::WithDelimiter, None, &cancel)
        .await?
        .prefixes;

    assert_eq!(prefixes.len(), 1);

    ctx.client.delete_objects(&[path3], &cancel).await?;

    Ok(())
}

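// A minimal sketch (assumption, not `remote_storage` code): S3's DeleteObjects
// API accepts at most 1000 keys per request, so bulk deletes over larger sets
// are typically issued in chunks. `delete_batches` is a hypothetical helper
// illustrating that chunking.
#[allow(dead_code)]
fn delete_batches(paths: &[RemotePath]) -> impl Iterator<Item = &[RemotePath]> + '_ {
    // Per-request key limit, see
    // https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
    const MAX_KEYS_PER_DELETE: usize = 1000;
    paths.chunks(MAX_KEYS_PER_DELETE)
}
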
#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
    let MaybeEnabledStorage::Enabled(ctx) = ctx else {
        return Ok(());
    };

    let cancel = CancellationToken::new();

    let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let orig = bytes::Bytes::from_static("remote blob data here".as_bytes());

    let (data, len) = wrap_stream(orig.clone());

    ctx.client.upload(data, len, &path, None, &cancel).await?;

    // Normal download request
    let dl = ctx.client.download(&path, &cancel).await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig);

    // Full range (end specified)
    let dl = ctx
        .client
        .download_byte_range(&path, 0, Some(len as u64), &cancel)
        .await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig);

    // Partial range (end specified)
    let dl = ctx
        .client
        .download_byte_range(&path, 4, Some(10), &cancel)
        .await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig[4..10]);

    // Partial range (end beyond the real end of the object)
    let dl = ctx
        .client
        .download_byte_range(&path, 8, Some(len as u64 * 100), &cancel)
        .await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig[8..]);

    // Partial range (end unspecified)
    let dl = ctx
        .client
        .download_byte_range(&path, 4, None, &cancel)
        .await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig[4..]);

    // Full range (end unspecified)
    let dl = ctx
        .client
        .download_byte_range(&path, 0, None, &cancel)
        .await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig);

    debug!("Cleanup: deleting file at path {path:?}");
    ctx.client
        .delete(&path, &cancel)
        .await
        .with_context(|| format!("{path:?} removal"))?;

    Ok(())
}

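// A minimal sketch (assumption, not the client's actual code) of the
// conversion the range assertions above imply: `download_byte_range` takes an
// end-exclusive bound (note the `&orig[4..10]` check), while the HTTP `Range`
// header is end-inclusive. `http_range_header` is a hypothetical helper
// illustrating that translation.
#[allow(dead_code)]
fn http_range_header(start: u64, end_exclusive: Option<u64>) -> String {
    match end_exclusive {
        // "bytes=4-9" fetches bytes 4..=9, i.e. the end-exclusive range 4..10.
        Some(end) => format!("bytes={start}-{}", end - 1),
        // No end bound: read from `start` to the end of the object.
        None => format!("bytes={start}-"),
    }
}
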
#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn copy_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
    let MaybeEnabledStorage::Enabled(ctx) = ctx else {
        return Ok(());
    };

    let cancel = CancellationToken::new();

    let path = RemotePath::new(Utf8Path::new(
        format!("{}/file_to_copy", ctx.base_prefix).as_str(),
    ))
    .with_context(|| "RemotePath conversion")?;
    let path_dest = RemotePath::new(Utf8Path::new(
        format!("{}/file_dest", ctx.base_prefix).as_str(),
    ))
    .with_context(|| "RemotePath conversion")?;

    let orig = bytes::Bytes::from_static("remote blob data content".as_bytes());

    let (data, len) = wrap_stream(orig.clone());

    ctx.client.upload(data, len, &path, None, &cancel).await?;

    // Copy the object, then download the copy to verify its content
    ctx.client.copy_object(&path, &path_dest, &cancel).await?;

    let dl = ctx.client.download(&path_dest, &cancel).await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig);

    debug!("Cleanup: deleting file at path {path:?}");
    ctx.client
        .delete_objects(&[path.clone(), path_dest.clone()], &cancel)
        .await
        .with_context(|| format!("{path:?} removal"))?;

    Ok(())
}