//! Local filesystem acting as a remote storage.
//! Multiple API users can use the same "storage" of this kind by using different storage roots.
//!
//! This storage is used in tests, but can also be used in cases when a certain persistent
//! volume is mounted to the local FS.
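//!
//! A minimal usage sketch (this assumes `LocalFs`, `RemotePath` and the `RemoteStorage`
//! trait are all importable from the crate root, and uses a made-up root path):
//!
//! ```ignore
//! use remote_storage::{LocalFs, RemotePath, RemoteStorage};
//!
//! # async fn example() -> anyhow::Result<()> {
//! let storage = LocalFs::new(camino::Utf8PathBuf::from("/mnt/storage_root"))?;
//! let path = RemotePath::new(camino::Utf8Path::new("tenant/timeline/layer_file"))?;
//!
//! // Upload a small byte buffer, then read it back.
//! let data = bytes::Bytes::from_static(b"contents");
//! let stream = futures::stream::once(futures::future::ready(Ok(data.clone())));
//! storage.upload(stream, data.len(), &path, None).await?;
//! let download = storage.download(&path).await?;
//! # Ok(())
//! # }
//! ```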

use std::{borrow::Cow, future::Future, io::ErrorKind, pin::Pin, time::SystemTime};

use anyhow::{bail, ensure, Context};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use futures::stream::Stream;
use tokio::{
    fs,
    io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
};
use tokio_util::{io::ReaderStream, sync::CancellationToken};
use tracing::*;
use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};

use crate::{
    Download, DownloadError, DownloadStream, Listing, ListingMode, RemotePath, TimeTravelError,
};

use super::{RemoteStorage, StorageMetadata};

const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";
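// Note: `path_with_suffix_extension` is assumed to append the suffix as an extra
// extension, e.g. `foo.layer` -> `foo.layer.___temp`, making in-flight uploads easy
// to tell apart from finished ones.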

#[derive(Debug, Clone)]
pub struct LocalFs {
    storage_root: Utf8PathBuf,
}

impl LocalFs {
    /// Attempts to create local FS storage, along with its root directory.
    /// Storage root will be created (if it does not exist) and transformed into an absolute path (if passed as relative).
    pub fn new(mut storage_root: Utf8PathBuf) -> anyhow::Result<Self> {
        if !storage_root.exists() {
            std::fs::create_dir_all(&storage_root).with_context(|| {
                format!("Failed to create all directories in the given root path {storage_root:?}")
            })?;
        }
        if !storage_root.is_absolute() {
            storage_root = storage_root.canonicalize_utf8().with_context(|| {
                format!("Failed to represent path {storage_root:?} as an absolute path")
            })?;
        }

        Ok(Self { storage_root })
    }

    // mirrors S3Bucket::s3_object_to_relative_path
    fn local_file_to_relative_path(&self, key: Utf8PathBuf) -> RemotePath {
        let relative_path = key
            .strip_prefix(&self.storage_root)
            .expect("relative path must contain storage_root as prefix");
        RemotePath(relative_path.into())
    }

    async fn read_storage_metadata(
        &self,
        file_path: &Utf8Path,
    ) -> anyhow::Result<Option<StorageMetadata>> {
        let metadata_path = storage_metadata_path(file_path);
        if metadata_path.exists() && metadata_path.is_file() {
            let metadata_string = fs::read_to_string(&metadata_path).await.with_context(|| {
                format!("Failed to read metadata from the local storage at '{metadata_path}'")
            })?;

            serde_json::from_str(&metadata_string)
                .with_context(|| {
                    format!(
                        "Failed to deserialize metadata from the local storage at '{metadata_path}'",
                    )
                })
                .map(|metadata| Some(StorageMetadata(metadata)))
        } else {
            Ok(None)
        }
    }

    #[cfg(test)]
    async fn list_all(&self) -> anyhow::Result<Vec<RemotePath>> {
        Ok(get_all_files(&self.storage_root, true)
            .await?
            .into_iter()
            .map(|path| {
                path.strip_prefix(&self.storage_root)
                    .context("Failed to strip storage root prefix")
                    .and_then(RemotePath::new)
                    .expect(
                        "We list files for storage root, hence should be able to remove the prefix",
                    )
            })
            .collect())
    }

    // recursively lists all files in a directory,
    // mirroring the `list_files` for `s3_bucket`
    async fn list_recursive(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
        let full_path = match folder {
            Some(folder) => folder.with_base(&self.storage_root),
            None => self.storage_root.clone(),
        };

        // If we were given a directory, we may use it as our starting point.
        // Otherwise, we must go up to the first ancestor dir that exists. This is because
        // S3 object list prefixes can be arbitrary strings, but when reading
        // the local filesystem we need a directory to start calling read_dir on.
        let mut initial_dir = full_path.clone();
        loop {
            // Did we make it to the root?
            if initial_dir.parent().is_none() {
                anyhow::bail!("list_files: failed to find valid ancestor dir for {full_path}");
            }

            match fs::metadata(initial_dir.clone()).await {
                Ok(meta) if meta.is_dir() => {
                    // We found a directory, break
                    break;
                }
                Ok(_meta) => {
                    // It's not a directory: strip back to the parent
                    initial_dir.pop();
                }
                Err(e) if e.kind() == ErrorKind::NotFound => {
                    // It's not a file that exists: strip the prefix back to the parent directory
                    initial_dir.pop();
                }
                Err(e) => {
                    // Unexpected I/O error
                    anyhow::bail!(e)
                }
            }
        }
        // Note that Utf8PathBuf starts_with only considers full path segments, but
        // object prefixes are arbitrary strings, so we need the strings for doing
        // starts_with later.
        let prefix = full_path.as_str();

        let mut files = vec![];
        let mut directory_queue = vec![initial_dir];
        while let Some(cur_folder) = directory_queue.pop() {
            let mut entries = cur_folder.read_dir_utf8()?;
            while let Some(Ok(entry)) = entries.next() {
                let file_name = entry.file_name();
                let full_file_name = cur_folder.join(file_name);
                if full_file_name.as_str().starts_with(prefix) {
                    let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
                    files.push(file_remote_path);
                    if full_file_name.is_dir() {
                        directory_queue.push(full_file_name);
                    }
                }
            }
        }

        Ok(files)
    }
}

impl RemoteStorage for LocalFs {
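    // Listing mirrors S3 semantics: `ListingMode::NoDelimiter` recursively walks the
    // whole tree under the prefix and returns only keys, while `ListingMode::WithDelimiter`
    // stops at the first path segment, reporting sub-"directories" as prefixes and
    // plain files as keys.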
    async fn list(
        &self,
        prefix: Option<&RemotePath>,
        mode: ListingMode,
    ) -> Result<Listing, DownloadError> {
        let mut result = Listing::default();

        if let ListingMode::NoDelimiter = mode {
            let keys = self
                .list_recursive(prefix)
                .await
                .map_err(DownloadError::Other)?;

            result.keys = keys
                .into_iter()
                .filter(|k| {
                    let path = k.with_base(&self.storage_root);
                    !path.is_dir()
                })
                .collect();

            return Ok(result);
        }

        let path = match prefix {
            Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
            None => Cow::Borrowed(&self.storage_root),
        };

        let prefixes_to_filter = get_all_files(path.as_ref(), false)
            .await
            .map_err(DownloadError::Other)?;

        // Filter out empty directories to mirror S3 behavior.
        for prefix in prefixes_to_filter {
            if prefix.is_dir()
                && is_directory_empty(&prefix)
                    .await
                    .map_err(DownloadError::Other)?
            {
                continue;
            }

            let stripped = prefix
                .strip_prefix(&self.storage_root)
                .context("Failed to strip prefix")
                .and_then(RemotePath::new)
                .expect(
                    "We list files for storage root, hence should be able to remove the prefix",
                );

            if prefix.is_dir() {
                result.prefixes.push(stripped);
            } else {
                result.keys.push(stripped);
            }
        }

        Ok(result)
    }

    async fn upload(
        &self,
        data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
        data_size_bytes: usize,
        to: &RemotePath,
        metadata: Option<StorageMetadata>,
    ) -> anyhow::Result<()> {
        let target_file_path = to.with_base(&self.storage_root);
        create_target_directory(&target_file_path).await?;
        // We need this dance with a sort-of-durable rename (without fsyncs)
        // to prevent partial uploads. This was actually hit when a pageserver shutdown
        // cancelled the upload and a partial file was left on the fs.
        // NOTE: Because the temp file suffix is always the same, this operation is racy.
        // Two concurrent operations can lead to the following sequence:
        // T1: write(temp)
        // T2: write(temp) -> overwrites the content
        // T1: rename(temp, dst) -> succeeds
        // T2: rename(temp, dst) -> fails, temp no longer exists
        // This can be solved by supplying a unique temp suffix every time, but this situation
        // is not normal in the first place; the error can help (and has helped at least once)
        // to discover bugs in upper-level synchronization.
        let temp_file_path =
            path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX);
        let mut destination = io::BufWriter::new(
            fs::OpenOptions::new()
                .write(true)
                .create(true)
                // Truncate guards against a longer leftover temp file from a previously
                // cancelled upload: without it, stale trailing bytes would survive the rename.
                .truncate(true)
                .open(&temp_file_path)
                .await
                .with_context(|| {
                    format!("Failed to open target fs destination at '{target_file_path}'")
                })?,
        );

        let from_size_bytes = data_size_bytes as u64;
        let data = tokio_util::io::StreamReader::new(data);
        let data = std::pin::pin!(data);
        let mut buffer_to_read = data.take(from_size_bytes);

        // Alternatively we could just write the bytes to a file, but local_fs is a testing utility.
        let bytes_read = io::copy_buf(&mut buffer_to_read, &mut destination)
            .await
            .with_context(|| {
                format!(
                    "Failed to upload file (write temp) to the local storage at '{temp_file_path}'",
                )
            })?;

        if bytes_read < from_size_bytes {
            bail!("Provided stream was shorter than expected: {bytes_read} vs {from_size_bytes} bytes");
        }
        // Check if there is any extra data after the given size.
        let mut from = buffer_to_read.into_inner();
        let extra_read = from.read(&mut [1]).await?;
        ensure!(
            extra_read == 0,
            "Provided stream was larger than expected: expected {from_size_bytes} bytes",
        );

        destination.flush().await.with_context(|| {
            format!(
                "Failed to upload (flush temp) file to the local storage at '{temp_file_path}'",
            )
        })?;

        fs::rename(temp_file_path, &target_file_path)
            .await
            .with_context(|| {
                format!(
                    "Failed to upload (rename) file to the local storage at '{target_file_path}'",
                )
            })?;

        if let Some(storage_metadata) = metadata {
            let storage_metadata_path = storage_metadata_path(&target_file_path);
            fs::write(
                &storage_metadata_path,
                serde_json::to_string(&storage_metadata.0)
                    .context("Failed to serialize storage metadata as json")?,
            )
            .await
            .with_context(|| {
                format!(
                    "Failed to write metadata to the local storage at '{storage_metadata_path}'",
                )
            })?;
        }

        Ok(())
    }

    async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
        let target_path = from.with_base(&self.storage_root);
        if file_exists(&target_path).map_err(DownloadError::BadInput)? {
            let source = ReaderStream::new(
                fs::OpenOptions::new()
                    .read(true)
                    .open(&target_path)
                    .await
                    .with_context(|| {
                        format!("Failed to open source file {target_path:?} to use in the download")
                    })
                    .map_err(DownloadError::Other)?,
            );

            let metadata = self
                .read_storage_metadata(&target_path)
                .await
                .map_err(DownloadError::Other)?;
            Ok(Download {
                metadata,
                last_modified: None,
                etag: None,
                download_stream: Box::pin(source),
            })
        } else {
            Err(DownloadError::NotFound)
        }
    }

    async fn download_byte_range(
        &self,
        from: &RemotePath,
        start_inclusive: u64,
        end_exclusive: Option<u64>,
    ) -> Result<Download, DownloadError> {
        if let Some(end_exclusive) = end_exclusive {
            if end_exclusive <= start_inclusive {
                return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
            };
            if start_inclusive == end_exclusive.saturating_sub(1) {
                return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
            }
        }
        let target_path = from.with_base(&self.storage_root);
        if file_exists(&target_path).map_err(DownloadError::BadInput)? {
            let mut source = tokio::fs::OpenOptions::new()
                .read(true)
                .open(&target_path)
                .await
                .with_context(|| {
                    format!("Failed to open source file {target_path:?} to use in the download")
                })
                .map_err(DownloadError::Other)?;
            source
                .seek(io::SeekFrom::Start(start_inclusive))
                .await
                .context("Failed to seek to the range start in a local storage file")
                .map_err(DownloadError::Other)?;
            let metadata = self
                .read_storage_metadata(&target_path)
                .await
                .map_err(DownloadError::Other)?;

            let download_stream: DownloadStream = match end_exclusive {
                Some(end_exclusive) => Box::pin(ReaderStream::new(
                    source.take(end_exclusive - start_inclusive),
                )),
                None => Box::pin(ReaderStream::new(source)),
            };
            Ok(Download {
                metadata,
                last_modified: None,
                etag: None,
                download_stream,
            })
        } else {
            Err(DownloadError::NotFound)
        }
    }

    async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
        let file_path = path.with_base(&self.storage_root);
        match fs::remove_file(&file_path).await {
            Ok(()) => Ok(()),
            // The file doesn't exist. This shouldn't yield an error, to mirror S3's behaviour.
            // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
            // > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful.
            Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
            Err(e) => Err(anyhow::anyhow!(e)),
        }
    }

    async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
        for path in paths {
            self.delete(path).await?
        }
        Ok(())
    }

    async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
        let from_path = from.with_base(&self.storage_root);
        let to_path = to.with_base(&self.storage_root);
        create_target_directory(&to_path).await?;
        fs::copy(&from_path, &to_path).await.with_context(|| {
            format!("Failed to copy file from '{from_path}' to '{to_path}'")
        })?;
        Ok(())
    }

    #[allow(clippy::diverging_sub_expression)]
    async fn time_travel_recover(
        &self,
        _prefix: Option<&RemotePath>,
        _timestamp: SystemTime,
        _done_if_after: SystemTime,
        _cancel: &CancellationToken,
    ) -> Result<(), TimeTravelError> {
        Err(TimeTravelError::Unimplemented)
    }
}

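// Metadata lives in a JSON sidecar next to the object itself; assuming
// `path_with_suffix_extension` appends the suffix as an extra extension, a file
// `foo.layer` would get its metadata at `foo.layer.metadata`.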
fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
    path_with_suffix_extension(original_path, "metadata")
}

fn get_all_files<'a, P>(
    directory_path: P,
    recursive: bool,
) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Utf8PathBuf>>> + Send + Sync + 'a>>
where
    P: AsRef<Utf8Path> + Send + Sync + 'a,
{
    Box::pin(async move {
        let directory_path = directory_path.as_ref();
        if directory_path.exists() {
            if directory_path.is_dir() {
                let mut paths = Vec::new();
                let mut dir_contents = fs::read_dir(directory_path).await?;
                while let Some(dir_entry) = dir_contents.next_entry().await? {
                    let file_type = dir_entry.file_type().await?;
                    let entry_path =
                        Utf8PathBuf::from_path_buf(dir_entry.path()).map_err(|pb| {
                            anyhow::Error::msg(format!(
                                "non-Unicode path: {}",
                                pb.to_string_lossy()
                            ))
                        })?;
                    if file_type.is_symlink() {
                        debug!("{entry_path:?} is a symlink, skipping")
                    } else if file_type.is_dir() {
                        if recursive {
                            paths.extend(get_all_files(&entry_path, true).await?.into_iter())
                        } else {
                            paths.push(entry_path)
                        }
                    } else {
                        paths.push(entry_path);
                    }
                }
                Ok(paths)
            } else {
                bail!("Path {directory_path:?} is not a directory")
            }
        } else {
            Ok(Vec::new())
        }
    })
}

async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> {
    let target_dir = match target_file_path.parent() {
        Some(parent_dir) => parent_dir,
        None => bail!("File path '{target_file_path}' has no parent directory"),
    };
    if !target_dir.exists() {
        fs::create_dir_all(target_dir).await?;
    }
    Ok(())
}

fn file_exists(file_path: &Utf8Path) -> anyhow::Result<bool> {
    if file_path.exists() {
        ensure!(file_path.is_file(), "file path '{file_path}' is not a file");
        Ok(true)
    } else {
        Ok(false)
    }
}

#[cfg(test)]
mod fs_tests {
    use super::*;

    use bytes::Bytes;
    use camino_tempfile::tempdir;
    use futures_util::Stream;
    use std::{collections::HashMap, io::Write};

    async fn read_and_assert_remote_file_contents(
        storage: &LocalFs,
        #[allow(clippy::ptr_arg)]
        // have to use &Utf8PathBuf due to `storage.local_path` parameter requirements
        remote_storage_path: &RemotePath,
        expected_metadata: Option<&StorageMetadata>,
    ) -> anyhow::Result<String> {
        let download = storage
            .download(remote_storage_path)
            .await
            .map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
        ensure!(
            download.metadata.as_ref() == expected_metadata,
            "Unexpected metadata returned for the downloaded file"
        );

        let contents = aggregate(download.download_stream).await?;

        String::from_utf8(contents).map_err(anyhow::Error::new)
    }

    #[tokio::test]
    async fn upload_file() -> anyhow::Result<()> {
        let storage = create_storage()?;

        let target_path_1 = upload_dummy_file(&storage, "upload_1", None).await?;
        assert_eq!(
            storage.list_all().await?,
            vec![target_path_1.clone()],
            "Should list a single file after first upload"
        );

        let target_path_2 = upload_dummy_file(&storage, "upload_2", None).await?;
        assert_eq!(
            list_files_sorted(&storage).await?,
            vec![target_path_1.clone(), target_path_2.clone()],
            "Should list two different files after second upload"
        );

        Ok(())
    }

    #[tokio::test]
    async fn upload_file_negatives() -> anyhow::Result<()> {
        let storage = create_storage()?;

        let id = RemotePath::new(Utf8Path::new("dummy"))?;
        let content = Bytes::from_static(b"12345");
        let content = move || futures::stream::once(futures::future::ready(Ok(content.clone())));

        // Check that you get an error if the size parameter doesn't match the actual
        // size of the stream.
        storage
            .upload(content(), 0, &id, None)
            .await
            .expect_err("upload with zero size succeeded");
        storage
            .upload(content(), 4, &id, None)
            .await
            .expect_err("upload with too short size succeeded");
        storage
            .upload(content(), 6, &id, None)
            .await
            .expect_err("upload with too large size succeeded");

        // Correct size is 5, this should succeed.
        storage.upload(content(), 5, &id, None).await?;

        Ok(())
    }

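    // A small additional check: uploading twice to the same path should replace the
    // contents wholesale, since each upload writes a fresh temp file and renames it
    // over the destination.
    #[tokio::test]
    async fn upload_file_overwrites_previous_upload() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let id = RemotePath::new(Utf8Path::new("dummy"))?;

        for contents in [b"first version".as_slice(), b"second, longer version".as_slice()] {
            let content = Bytes::copy_from_slice(contents);
            let stream = futures::stream::once(futures::future::ready(Ok(content)));
            storage.upload(stream, contents.len(), &id, None).await?;
        }

        let download = storage
            .download(&id)
            .await
            .map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
        let downloaded = aggregate(download.download_stream).await?;
        assert_eq!(
            downloaded,
            b"second, longer version".to_vec(),
            "Should read back the contents of the latest upload"
        );

        Ok(())
    }
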
    fn create_storage() -> anyhow::Result<LocalFs> {
        let storage_root = tempdir()?.path().to_path_buf();
        LocalFs::new(storage_root)
    }

    #[tokio::test]
    async fn download_file() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None).await?;

        let contents = read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
        assert_eq!(
            dummy_contents(upload_name),
            contents,
            "We should upload and download the same contents"
        );

        let non_existing_path = "somewhere/else";
        match storage.download(&RemotePath::new(Utf8Path::new(non_existing_path))?).await {
            Err(DownloadError::NotFound) => {} // Should get NotFound for non-existing keys
            other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
        }
        Ok(())
    }

    #[tokio::test]
    async fn download_file_range_positive() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None).await?;

        let full_range_download_contents =
            read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
        assert_eq!(
            dummy_contents(upload_name),
            full_range_download_contents,
            "Download full range should return the whole upload"
        );

        let uploaded_bytes = dummy_contents(upload_name).into_bytes();
        let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);

        let first_part_download = storage
            .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
            .await?;
        assert!(
            first_part_download.metadata.is_none(),
            "No metadata should be returned for no metadata upload"
        );

        let first_part_remote = aggregate(first_part_download.download_stream).await?;
        assert_eq!(
            first_part_local, first_part_remote,
            "First part bytes should be returned when requested"
        );

        let second_part_download = storage
            .download_byte_range(
                &upload_target,
                first_part_local.len() as u64,
                Some((first_part_local.len() + second_part_local.len()) as u64),
            )
            .await?;
        assert!(
            second_part_download.metadata.is_none(),
            "No metadata should be returned for no metadata upload"
        );

        let second_part_remote = aggregate(second_part_download.download_stream).await?;
        assert_eq!(
            second_part_local, second_part_remote,
            "Second part bytes should be returned when requested"
        );

        Ok(())
    }

    #[tokio::test]
    async fn download_file_range_negative() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None).await?;

        let start = 1_000_000_000;
        let end = start + 1;
        match storage
            .download_byte_range(
                &upload_target,
                start,
                Some(end), // exclusive end
            )
            .await
        {
            Ok(_) => panic!("Should not allow downloading wrong ranges"),
            Err(e) => {
                let error_string = e.to_string();
                assert!(error_string.contains("zero bytes"));
                assert!(error_string.contains(&start.to_string()));
                assert!(error_string.contains(&end.to_string()));
            }
        }

        let start = 10000;
        let end = 234;
        assert!(start > end, "Should test an incorrect range");
        match storage
            .download_byte_range(&upload_target, start, Some(end))
            .await
        {
            Ok(_) => panic!("Should not allow downloading wrong ranges"),
            Err(e) => {
                let error_string = e.to_string();
                assert!(error_string.contains("Invalid range"));
                assert!(error_string.contains(&start.to_string()));
                assert!(error_string.contains(&end.to_string()));
            }
        }

        Ok(())
    }

    #[tokio::test]
    async fn delete_file() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None).await?;

        storage.delete(&upload_target).await?;
        assert!(storage.list_all().await?.is_empty());

        storage
            .delete(&upload_target)
            .await
            .expect("Should allow deleting non-existing storage files");

        Ok(())
    }

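    // A companion check for the batch path: `delete_objects` should remove every
    // listed file and, like `delete`, tolerate paths that are already gone.
    #[tokio::test]
    async fn delete_objects_removes_all_files() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let target_1 = upload_dummy_file(&storage, "upload_1", None).await?;
        let target_2 = upload_dummy_file(&storage, "upload_2", None).await?;

        storage.delete_objects(&[target_1, target_2.clone()]).await?;
        assert!(storage.list_all().await?.is_empty());

        // Deleting an already-removed path again should still succeed.
        storage
            .delete_objects(&[target_2])
            .await
            .expect("Should allow deleting non-existing storage files");

        Ok(())
    }
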
    #[tokio::test]
    async fn file_with_metadata() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let metadata = StorageMetadata(HashMap::from([
            ("one".to_string(), "1".to_string()),
            ("two".to_string(), "2".to_string()),
        ]));
        let upload_target =
            upload_dummy_file(&storage, upload_name, Some(metadata.clone())).await?;

        let full_range_download_contents =
            read_and_assert_remote_file_contents(&storage, &upload_target, Some(&metadata)).await?;
        assert_eq!(
            dummy_contents(upload_name),
            full_range_download_contents,
            "We should upload and download the same contents"
        );

        let uploaded_bytes = dummy_contents(upload_name).into_bytes();
        let (first_part_local, _) = uploaded_bytes.split_at(3);

        let partial_download_with_metadata = storage
            .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
            .await?;
        let first_part_remote = aggregate(partial_download_with_metadata.download_stream).await?;
        assert_eq!(
            first_part_local,
            first_part_remote.as_slice(),
            "First part bytes should be returned when requested"
        );

        assert_eq!(
            partial_download_with_metadata.metadata,
            Some(metadata),
            "We should get the same metadata back for partial download"
        );

        Ok(())
    }

    #[tokio::test]
    async fn list() -> anyhow::Result<()> {
        // No delimiter: should recursively list everything
        let storage = create_storage()?;
        let child = upload_dummy_file(&storage, "grandparent/parent/child", None).await?;
        let uncle = upload_dummy_file(&storage, "grandparent/uncle", None).await?;

        let listing = storage.list(None, ListingMode::NoDelimiter).await?;
        assert!(listing.prefixes.is_empty());
        assert_eq!(listing.keys, [uncle.clone(), child.clone()].to_vec());

        // Delimiter: should only go one deep
        let listing = storage.list(None, ListingMode::WithDelimiter).await?;

        assert_eq!(
            listing.prefixes,
            [RemotePath::from_string("timelines").unwrap()].to_vec()
        );
        assert!(listing.keys.is_empty());

        // Delimiter & prefix
        let listing = storage
            .list(
                Some(&RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()),
                ListingMode::WithDelimiter,
            )
            .await?;
        assert_eq!(
            listing.prefixes,
            [RemotePath::from_string("timelines/some_timeline/grandparent/parent").unwrap()]
                .to_vec()
        );
        assert_eq!(listing.keys, [uncle.clone()].to_vec());

        Ok(())
    }

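    // `copy` duplicates only the object contents; a copied file should download with
    // the same contents as the original and no metadata (the sidecar is not copied).
    #[tokio::test]
    async fn copy_file() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
        let copy_target = RemotePath::new(Utf8Path::new("copies/upload_1_copy"))?;

        storage.copy(&upload_target, &copy_target).await?;

        let copied_contents =
            read_and_assert_remote_file_contents(&storage, &copy_target, None).await?;
        assert_eq!(
            dummy_contents(upload_name),
            copied_contents,
            "Copy should preserve the original contents"
        );

        Ok(())
    }
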
    async fn upload_dummy_file(
        storage: &LocalFs,
        name: &str,
        metadata: Option<StorageMetadata>,
    ) -> anyhow::Result<RemotePath> {
        let from_path = storage
            .storage_root
            .join("timelines")
            .join("some_timeline")
            .join(name);
        let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?;

        let relative_path = from_path
            .strip_prefix(&storage.storage_root)
            .context("Failed to strip storage root prefix")
            .and_then(RemotePath::new)
            .with_context(|| {
                format!(
                    "Failed to resolve remote part of path {:?} for base {:?}",
                    from_path, storage.storage_root
                )
            })?;

        let file = tokio_util::io::ReaderStream::new(file);

        storage.upload(file, size, &relative_path, metadata).await?;
        Ok(relative_path)
    }

    async fn create_file_for_upload(
        path: &Utf8Path,
        contents: &str,
    ) -> anyhow::Result<(fs::File, usize)> {
        std::fs::create_dir_all(path.parent().unwrap())?;
        let mut file_for_writing = std::fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(path)?;
        write!(file_for_writing, "{}", contents)?;
        drop(file_for_writing);
        let file_size = path.metadata()?.len() as usize;
        Ok((
            fs::OpenOptions::new().read(true).open(&path).await?,
            file_size,
        ))
    }

    fn dummy_contents(name: &str) -> String {
        format!("contents for {name}")
    }

    async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
        let mut files = storage.list_all().await?;
        files.sort_by(|a, b| a.0.cmp(&b.0));
        Ok(files)
    }

    async fn aggregate(
        stream: impl Stream<Item = std::io::Result<Bytes>>,
    ) -> anyhow::Result<Vec<u8>> {
        use futures::stream::StreamExt;
        let mut out = Vec::new();
        let mut stream = std::pin::pin!(stream);
        while let Some(res) = stream.next().await {
            out.extend_from_slice(&res?[..]);
        }
        Ok(out)
    }
}