use anyhow::Context;
use camino::Utf8Path;
use remote_storage::RemotePath;
use std::sync::Arc;
use std::{collections::HashSet, num::NonZeroU32};
use test_context::test_context;
use tokio_util::sync::CancellationToken;
use tracing::debug;

use crate::common::{download_to_vec, upload_stream, wrap_stream};

use super::{
    MaybeEnabledStorage, MaybeEnabledStorageWithSimpleTestBlobs, MaybeEnabledStorageWithTestBlobs,
};

/// Tests that the S3 client can list all prefixes, even if the response comes paginated and requires multiple S3 queries.
/// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and the related S3 credential env vars to be set.
/// See the client creation in [`create_s3_client`] for details on the required env vars.
/// If real S3 tests are disabled, the test passes without running anything: currently, there's no way to mark a test as ignored at runtime with the
/// default test framework, see https://github.com/rust-lang/rust/issues/68007 for details.
///
/// First, the test creates a set of S3 objects with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_remote_data`],
/// where
/// * `random_prefix_part` is set for the entire S3 client during its creation in [`create_s3_client`], to avoid interference between multiple test runs
/// * `base_prefix_str` is a common prefix to use in the client requests: we want to ensure that the client is able to list nested prefixes inside the bucket
///
/// Then, it verifies that the client returns the correct prefixes when queried:
/// * with no prefix, it lists everything after its `${random_prefix_part}/` — that should be the `${base_prefix_str}` value only
/// * with the `${base_prefix_str}/` prefix, it lists every `sub_prefix_${i}`
///
/// With real S3 enabled and the `#[cfg(test)]` Rust configuration used, the S3 client test adds a `max-keys` param to limit the response keys.
/// This way, we test the pagination implicitly, by ensuring all results are returned from the remote storage, while avoiding uploading too many blobs to S3,
/// since the current default AWS S3 pagination limit is 1000
/// (see https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax).
///
/// Lastly, the test attempts to clean up and remove all uploaded S3 files.
/// If any errors appear during the clean-up, they get logged, but the test does not fail or stop until the clean-up is finished.
/// (An illustrative sketch of the expected key layout follows this test.)
#[test_context(MaybeEnabledStorageWithTestBlobs)]
#[tokio::test]
async fn pagination_should_work(ctx: &mut MaybeEnabledStorageWithTestBlobs) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledStorageWithTestBlobs::Enabled(ctx) => ctx,
        MaybeEnabledStorageWithTestBlobs::Disabled => return Ok(()),
        MaybeEnabledStorageWithTestBlobs::UploadsFailed(e, _) => {
            anyhow::bail!("S3 init failed: {e:?}")
        }
    };

    let cancel = CancellationToken::new();

    let test_client = Arc::clone(&ctx.enabled.client);
    let expected_remote_prefixes = ctx.remote_prefixes.clone();

    let base_prefix = RemotePath::new(Utf8Path::new(ctx.enabled.base_prefix))
        .context("common_prefix construction")?;
    let root_remote_prefixes = test_client
        .list_prefixes(None, &cancel)
        .await
        .context("client list root prefixes failure")?
        .into_iter()
        .collect::<HashSet<_>>();
    assert_eq!(
        root_remote_prefixes, HashSet::from([base_prefix.clone()]),
        "remote storage root prefixes list mismatches with the uploads. Returned prefixes: {root_remote_prefixes:?}"
    );

    let nested_remote_prefixes = test_client
        .list_prefixes(Some(&base_prefix), &cancel)
        .await
        .context("client list nested prefixes failure")?
        .into_iter()
        .collect::<HashSet<_>>();
    let remote_only_prefixes = nested_remote_prefixes
        .difference(&expected_remote_prefixes)
        .collect::<HashSet<_>>();
    let missing_uploaded_prefixes = expected_remote_prefixes
        .difference(&nested_remote_prefixes)
        .collect::<HashSet<_>>();
    assert_eq!(
        remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0,
        "remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
    );

    Ok(())
}
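
// Illustrative sketch, not part of the original test suite: the doc comment above describes
// uploads with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}`.
// This helper shows, with made-up values, how such keys relate to the nested prefixes the test
// expects `list_prefixes` to return. The prefix strings and the blob count are assumptions for
// illustration only; the real values come from the `MaybeEnabledStorageWithTestBlobs` harness.
#[allow(dead_code)]
fn example_pagination_key_layout() -> (Vec<String>, HashSet<String>) {
    let random_prefix_part = "run_e7b9"; // hypothetical per-client prefix
    let base_prefix_str = "pagination_test"; // hypothetical common prefix
    let blob_count = 5; // hypothetical; the real test uploads enough blobs to force pagination

    // Keys as they would be uploaded by a helper like `upload_remote_data`.
    let uploaded_keys: Vec<String> = (0..blob_count)
        .map(|i| format!("/{random_prefix_part}/{base_prefix_str}/sub_prefix_{i}/blob_{i}"))
        .collect();

    // Listing with the `${base_prefix_str}/` prefix is expected to yield one entry per sub-prefix.
    let expected_nested_prefixes: HashSet<String> =
        (0..blob_count).map(|i| format!("sub_prefix_{i}")).collect();

    (uploaded_keys, expected_nested_prefixes)
}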

/// Tests that the S3 client can list all files in a folder, even if the response comes paginated and requires multiple S3 queries.
/// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and the related S3 credential env vars to be set.
/// The test skips the real code path and passes if those env vars are not set.
/// See `pagination_should_work` for more information.
///
/// First, creates a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_remote_data`].
/// Then performs the following queries:
/// 1. `list_files(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
/// 2. `list_files("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt`
/// (A sketch of this key layout and the expected filtering follows this test.)
#[test_context(MaybeEnabledStorageWithSimpleTestBlobs)]
#[tokio::test]
async fn list_files_works(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx,
        MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()),
        MaybeEnabledStorageWithSimpleTestBlobs::UploadsFailed(e, _) => {
            anyhow::bail!("S3 init failed: {e:?}")
        }
    };
    let cancel = CancellationToken::new();
    let test_client = Arc::clone(&ctx.enabled.client);
    let base_prefix =
        RemotePath::new(Utf8Path::new("folder1")).context("common_prefix construction")?;
    let root_files = test_client
        .list_files(None, None, &cancel)
        .await
        .context("client list root files failure")?
        .into_iter()
        .collect::<HashSet<_>>();
    assert_eq!(
        root_files,
        ctx.remote_blobs.clone(),
        "remote storage list_files on root mismatches with the uploads."
    );

    // Test that the max_keys limit works. In total there are about 21 files (see the
    // upload_simple_remote_data call in test_real_s3.rs).
    let limited_root_files = test_client
        .list_files(None, Some(NonZeroU32::new(2).unwrap()), &cancel)
        .await
        .context("client list root files failure")?;
    assert_eq!(limited_root_files.len(), 2);

    let nested_remote_files = test_client
        .list_files(Some(&base_prefix), None, &cancel)
        .await
        .context("client list nested files failure")?
        .into_iter()
        .collect::<HashSet<_>>();
    let trim_remote_blobs: HashSet<_> = ctx
        .remote_blobs
        .iter()
        .map(|x| x.get_path())
        .filter(|x| x.starts_with("folder1"))
        .map(|x| RemotePath::new(x).expect("must be valid path"))
        .collect();
    assert_eq!(
        nested_remote_files, trim_remote_blobs,
        "remote storage list_files on a subdirectory mismatches with the uploads."
    );
    Ok(())
}
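
// Illustrative sketch, not part of the original test suite: the doc comment above describes keys
// of the form `random_prefix/folder{j}/blob_{i}.txt`. This shows how a listing restricted to
// `folder1` relates to the full listing, mirroring the filtering the test performs on
// `ctx.remote_blobs`. The folder and blob counts are assumptions chosen only to match the
// "about 21 files" note in the test above.
#[allow(dead_code)]
fn example_folder_filtering() -> (HashSet<String>, HashSet<String>) {
    // Three folders with seven blobs each, i.e. 21 files in total.
    let all_files: HashSet<String> = (1..=3)
        .flat_map(|j| (0..7).map(move |i| format!("folder{j}/blob_{i}.txt")))
        .collect();

    // A listing restricted to `folder1` is expected to return exactly this subset.
    let folder1_files: HashSet<String> = all_files
        .iter()
        .filter(|key| key.starts_with("folder1/"))
        .cloned()
        .collect();

    (all_files, folder1_files)
}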

#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn delete_non_existing_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledStorage::Enabled(ctx) => ctx,
        MaybeEnabledStorage::Disabled => return Ok(()),
    };

    let cancel = CancellationToken::new();

    let path = RemotePath::new(Utf8Path::new(
        format!("{}/for_sure_there_is_nothing_there_really", ctx.base_prefix).as_str(),
    ))
    .with_context(|| "RemotePath conversion")?;

    ctx.client
        .delete(&path, &cancel)
        .await
        .expect("should succeed");

    Ok(())
}

#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn delete_objects_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
    let ctx = match ctx {
        MaybeEnabledStorage::Enabled(ctx) => ctx,
        MaybeEnabledStorage::Disabled => return Ok(()),
    };

    let cancel = CancellationToken::new();

    let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let (data, len) = upload_stream("remote blob data1".as_bytes().into());
    ctx.client.upload(data, len, &path1, None, &cancel).await?;

    let (data, len) = upload_stream("remote blob data2".as_bytes().into());
    ctx.client.upload(data, len, &path2, None, &cancel).await?;

    let (data, len) = upload_stream("remote blob data3".as_bytes().into());
    ctx.client.upload(data, len, &path3, None, &cancel).await?;

    ctx.client.delete_objects(&[path1, path2], &cancel).await?;

    let prefixes = ctx.client.list_prefixes(None, &cancel).await?;

    assert_eq!(prefixes.len(), 1);

    ctx.client.delete_objects(&[path3], &cancel).await?;

    Ok(())
}

#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
    let MaybeEnabledStorage::Enabled(ctx) = ctx else {
        return Ok(());
    };

    let cancel = CancellationToken::new();

    let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let orig = bytes::Bytes::from_static("remote blob data here".as_bytes());

    let (data, len) = wrap_stream(orig.clone());

    ctx.client.upload(data, len, &path, None, &cancel).await?;

    // Normal download request
    let dl = ctx.client.download(&path, &cancel).await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig);

    // Full range (end specified)
    let dl = ctx
        .client
        .download_byte_range(&path, 0, Some(len as u64), &cancel)
        .await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig);

    // Partial range (end specified)
    let dl = ctx
        .client
        .download_byte_range(&path, 4, Some(10), &cancel)
        .await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig[4..10]);

    // Partial range (end beyond the real end)
    let dl = ctx
        .client
        .download_byte_range(&path, 8, Some(len as u64 * 100), &cancel)
        .await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig[8..]);

    // Partial range (end unspecified)
    let dl = ctx
        .client
        .download_byte_range(&path, 4, None, &cancel)
        .await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig[4..]);

    // Full range (end unspecified)
    let dl = ctx
        .client
        .download_byte_range(&path, 0, None, &cancel)
        .await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig);

    debug!("Cleanup: deleting file at path {path:?}");
    ctx.client
        .delete(&path, &cancel)
        .await
        .with_context(|| format!("{path:?} removal"))?;

    Ok(())
}
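
// Illustrative sketch, not part of the original test suite: the byte-range assertions above
// treat `download_byte_range(path, start, end)` like the half-open slice `data[start..end)`,
// with a missing or oversized end clamped to the object length. This plain-slice analogue
// mirrors that expected interpretation; it is an assumption made for illustration, not the
// client's actual implementation of range requests.
#[allow(dead_code)]
fn example_byte_range(data: &[u8], start: u64, end_exclusive: Option<u64>) -> &[u8] {
    let len = data.len() as u64;
    // Clamp both bounds to the object length, matching the "end beyond real end" case above.
    let start = start.min(len) as usize;
    let end = end_exclusive.unwrap_or(len).min(len) as usize;
    &data[start..end.max(start)]
}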

#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn copy_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
    let MaybeEnabledStorage::Enabled(ctx) = ctx else {
        return Ok(());
    };

    let cancel = CancellationToken::new();

    let path = RemotePath::new(Utf8Path::new(
        format!("{}/file_to_copy", ctx.base_prefix).as_str(),
    ))
    .with_context(|| "RemotePath conversion")?;
    let path_dest = RemotePath::new(Utf8Path::new(
        format!("{}/file_dest", ctx.base_prefix).as_str(),
    ))
    .with_context(|| "RemotePath conversion")?;

    let orig = bytes::Bytes::from_static("remote blob data content".as_bytes());

    let (data, len) = wrap_stream(orig.clone());

    ctx.client.upload(data, len, &path, None, &cancel).await?;

    // Copy the object, then download the copy and check that its contents match the original
    ctx.client.copy_object(&path, &path_dest, &cancel).await?;

    let dl = ctx.client.download(&path_dest, &cancel).await?;
    let buf = download_to_vec(dl).await?;
    assert_eq!(&buf, &orig);

    debug!("Cleanup: deleting files at paths {path:?} and {path_dest:?}");
    ctx.client
        .delete_objects(&[path.clone(), path_dest.clone()], &cancel)
        .await
        .with_context(|| format!("{path:?} removal"))?;

    Ok(())
}