//! Local filesystem acting as a remote storage.
//! Multiple API users can use the same "storage" of this kind by using different storage roots.
//!
//! This storage is used in tests, but can also be used in cases when a certain persistent
//! volume is mounted to the local FS.
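//!
//! A minimal usage sketch (marked `ignore` because the mount point below is an
//! illustrative assumption, not part of this crate's tests):
//!
//! ```ignore
//! use std::path::PathBuf;
//!
//! // Two logically independent "remotes" backed by the same filesystem,
//! // distinguished only by their storage roots.
//! let tenant_a = LocalFs::new(PathBuf::from("/mnt/storage/tenant_a"))?;
//! let tenant_b = LocalFs::new(PathBuf::from("/mnt/storage/tenant_b"))?;
//! ```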

use std::{
    borrow::Cow,
    future::Future,
    io::ErrorKind,
    path::{Path, PathBuf},
    pin::Pin,
};

use anyhow::{bail, ensure, Context};
use tokio::{
    fs,
    io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
};
use tracing::*;
use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};

use crate::{Download, DownloadError, RemotePath};

use super::{RemoteStorage, StorageMetadata};

const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";

#[derive(Debug, Clone)]
pub struct LocalFs {
    storage_root: PathBuf,
}

impl LocalFs {
    /// Attempts to create local FS storage, along with its root directory.
    /// The storage root will be created if it does not exist, and transformed into
    /// an absolute path if passed as a relative one.
    pub fn new(mut storage_root: PathBuf) -> anyhow::Result<Self> {
        if !storage_root.exists() {
            std::fs::create_dir_all(&storage_root).with_context(|| {
                format!("Failed to create all directories in the given root path {storage_root:?}")
            })?;
        }
        if !storage_root.is_absolute() {
            storage_root = storage_root.canonicalize().with_context(|| {
                format!("Failed to represent path {storage_root:?} as an absolute path")
            })?;
        }

        Ok(Self { storage_root })
    }
52 : // mirrors S3Bucket::s3_object_to_relative_path
53 18 : fn local_file_to_relative_path(&self, key: PathBuf) -> RemotePath {
54 18 : let relative_path = key
55 18 : .strip_prefix(&self.storage_root)
56 18 : .expect("relative path must contain storage_root as prefix");
57 18 : RemotePath(relative_path.into())
58 18 : }
59 :
60 739 : async fn read_storage_metadata(
61 739 : &self,
62 739 : file_path: &Path,
63 739 : ) -> anyhow::Result<Option<StorageMetadata>> {
64 739 : let metadata_path = storage_metadata_path(file_path);
65 739 : if metadata_path.exists() && metadata_path.is_file() {
66 2 : let metadata_string = fs::read_to_string(&metadata_path).await.with_context(|| {
67 0 : format!(
68 0 : "Failed to read metadata from the local storage at '{}'",
69 0 : metadata_path.display()
70 0 : )
71 2 : })?;
72 :
73 2 : serde_json::from_str(&metadata_string)
74 2 : .with_context(|| {
75 0 : format!(
76 0 : "Failed to deserialize metadata from the local storage at '{}'",
77 0 : metadata_path.display()
78 0 : )
79 2 : })
80 2 : .map(|metadata| Some(StorageMetadata(metadata)))
81 : } else {
82 737 : Ok(None)
83 : }
84 739 : }
85 :
86 : #[cfg(test)]
87 3 : async fn list(&self) -> anyhow::Result<Vec<RemotePath>> {
88 3 : Ok(get_all_files(&self.storage_root, true)
89 18 : .await?
90 3 : .into_iter()
91 3 : .map(|path| {
92 3 : path.strip_prefix(&self.storage_root)
93 3 : .context("Failed to strip storage root prefix")
94 3 : .and_then(RemotePath::new)
95 3 : .expect(
96 3 : "We list files for storage root, hence should be able to remote the prefix",
97 3 : )
98 3 : })
99 3 : .collect())
100 3 : }
101 : }

#[async_trait::async_trait]
impl RemoteStorage for LocalFs {
    async fn list_prefixes(
        &self,
        prefix: Option<&RemotePath>,
    ) -> Result<Vec<RemotePath>, DownloadError> {
        let path = match prefix {
            Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
            None => Cow::Borrowed(&self.storage_root),
        };

        let prefixes_to_filter = get_all_files(path.as_ref(), false)
            .await
            .map_err(DownloadError::Other)?;

        let mut prefixes = Vec::with_capacity(prefixes_to_filter.len());

        // Filter out empty directories to mirror S3 behavior.
        for prefix in prefixes_to_filter {
            if prefix.is_dir()
                && is_directory_empty(&prefix)
                    .await
                    .map_err(DownloadError::Other)?
            {
                continue;
            }

            prefixes.push(
                prefix
                    .strip_prefix(&self.storage_root)
                    .context("Failed to strip prefix")
                    .and_then(RemotePath::new)
                    .expect(
                        "We list files under the storage root, hence should be able to remove the prefix",
                    ),
            )
        }

        Ok(prefixes)
    }

    // Recursively lists all files in a directory,
    // mirroring `list_files` for `s3_bucket`.
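    // The traversal is iterative: directories still to visit are kept on an explicit
    // queue rather than in the call stack, so deep hierarchies pose no recursion limit.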
    async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
        let full_path = match folder {
            Some(folder) => folder.with_base(&self.storage_root),
            None => self.storage_root.clone(),
        };
        let mut files = vec![];
        let mut directory_queue = vec![full_path.clone()];

        while let Some(cur_folder) = directory_queue.pop() {
            let mut entries = fs::read_dir(cur_folder.clone()).await?;
            while let Some(entry) = entries.next_entry().await? {
                let file_name: PathBuf = entry.file_name().into();
                let full_file_name = cur_folder.clone().join(&file_name);
                let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
                files.push(file_remote_path.clone());
                if full_file_name.is_dir() {
                    directory_queue.push(full_file_name);
                }
            }
        }
        Ok(files)
    }

    async fn upload(
        &self,
        data: impl io::AsyncRead + Unpin + Send + Sync + 'static,
        data_size_bytes: usize,
        to: &RemotePath,
        metadata: Option<StorageMetadata>,
    ) -> anyhow::Result<()> {
        let target_file_path = to.with_base(&self.storage_root);
        create_target_directory(&target_file_path).await?;
        // We need this dance with a sort-of-durable rename (without fsyncs)
        // to prevent partial uploads. This bit us when a pageserver shutdown
        // cancelled an upload and a partial file was left on the FS.
        // NOTE: Because the temp file suffix is always the same, this operation is racy.
        // Two concurrent operations can lead to the following sequence:
        // T1: write(temp)
        // T2: write(temp) -> overwrites the content
        // T1: rename(temp, dst) -> succeeds
        // T2: rename(temp, dst) -> fails, temp no longer exists
        // This can be solved by supplying a unique temp suffix every time, but this situation
        // is not normal in the first place, and the error can help (and has helped at least once)
        // to discover bugs in upper-level synchronization.
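        // For illustration (assuming `path_with_suffix_extension` appends the suffix
        // as an extra extension): a target `foo/bar.layer` is first written as
        // `foo/bar.layer.___temp` and only renamed into place once fully written.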
        let temp_file_path =
            path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX);
        let mut destination = io::BufWriter::new(
            fs::OpenOptions::new()
                .write(true)
                .create(true)
                // Truncate any leftover temp file from a previously cancelled upload,
                // so its stale tail cannot survive a shorter rewrite.
                .truncate(true)
                .open(&temp_file_path)
                .await
                .with_context(|| {
                    format!(
                        "Failed to open target fs destination at '{}'",
                        target_file_path.display()
                    )
                })?,
        );

        let from_size_bytes = data_size_bytes as u64;
        let mut buffer_to_read = data.take(from_size_bytes);

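        // `take` caps the copy at the declared size, so a stream longer than promised
        // cannot overrun the temp file; a shorter stream is caught by the length check below.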
        let bytes_read = io::copy(&mut buffer_to_read, &mut destination)
            .await
            .with_context(|| {
                format!(
                    "Failed to upload file (write temp) to the local storage at '{}'",
                    temp_file_path.display()
                )
            })?;

        if bytes_read < from_size_bytes {
            bail!("Provided stream was shorter than expected: {bytes_read} vs {from_size_bytes} bytes");
        }
        // Check if there is any extra data after the given size.
        let mut from = buffer_to_read.into_inner();
        let extra_read = from.read(&mut [1]).await?;
        ensure!(
            extra_read == 0,
            "Provided stream was larger than expected: expected {from_size_bytes} bytes",
        );

        destination.flush().await.with_context(|| {
            format!(
                "Failed to upload (flush temp) file to the local storage at '{}'",
                temp_file_path.display()
            )
        })?;

        fs::rename(temp_file_path, &target_file_path)
            .await
            .with_context(|| {
                format!(
                    "Failed to upload (rename) file to the local storage at '{}'",
                    target_file_path.display()
                )
            })?;

        if let Some(storage_metadata) = metadata {
            let storage_metadata_path = storage_metadata_path(&target_file_path);
            fs::write(
                &storage_metadata_path,
                serde_json::to_string(&storage_metadata.0)
                    .context("Failed to serialize storage metadata as json")?,
            )
            .await
            .with_context(|| {
                format!(
                    "Failed to write metadata to the local storage at '{}'",
                    storage_metadata_path.display()
                )
            })?;
        }

        Ok(())
    }

    async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
        let target_path = from.with_base(&self.storage_root);
        if file_exists(&target_path).map_err(DownloadError::BadInput)? {
            let source = io::BufReader::new(
                fs::OpenOptions::new()
                    .read(true)
                    .open(&target_path)
                    .await
                    .with_context(|| {
                        format!("Failed to open source file {target_path:?} to use in the download")
                    })
                    .map_err(DownloadError::Other)?,
            );

            let metadata = self
                .read_storage_metadata(&target_path)
                .await
                .map_err(DownloadError::Other)?;
            Ok(Download {
                metadata,
                download_stream: Box::pin(source),
            })
        } else {
            Err(DownloadError::NotFound)
        }
    }

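    // Streams the half-open byte range `start_inclusive..end_exclusive`;
    // `end_exclusive: None` means "read to the end of the file".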
    async fn download_byte_range(
        &self,
        from: &RemotePath,
        start_inclusive: u64,
        end_exclusive: Option<u64>,
    ) -> Result<Download, DownloadError> {
        if let Some(end_exclusive) = end_exclusive {
            if end_exclusive <= start_inclusive {
                return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
            };
            if start_inclusive == end_exclusive.saturating_sub(1) {
                return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
            }
        }
        let target_path = from.with_base(&self.storage_root);
        if file_exists(&target_path).map_err(DownloadError::BadInput)? {
            let mut source = io::BufReader::new(
                fs::OpenOptions::new()
                    .read(true)
                    .open(&target_path)
                    .await
                    .with_context(|| {
                        format!("Failed to open source file {target_path:?} to use in the download")
                    })
                    .map_err(DownloadError::Other)?,
            );
            source
                .seek(io::SeekFrom::Start(start_inclusive))
                .await
                .context("Failed to seek to the range start in a local storage file")
                .map_err(DownloadError::Other)?;
            let metadata = self
                .read_storage_metadata(&target_path)
                .await
                .map_err(DownloadError::Other)?;

            Ok(match end_exclusive {
                Some(end_exclusive) => Download {
                    metadata,
                    download_stream: Box::pin(source.take(end_exclusive - start_inclusive)),
                },
                None => Download {
                    metadata,
                    download_stream: Box::pin(source),
                },
            })
        } else {
            Err(DownloadError::NotFound)
        }
    }

    async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
        let file_path = path.with_base(&self.storage_root);
        match fs::remove_file(&file_path).await {
            Ok(()) => Ok(()),
            // The file doesn't exist. This shouldn't yield an error to mirror S3's behaviour.
            // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
            // > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful.
            Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
            Err(e) => Err(anyhow::anyhow!(e)),
        }
    }

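    // Unlike S3's batch `DeleteObjects` call, this deletes the paths one by one;
    // on a local filesystem there are no per-request round trips to amortize.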
    async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
        for path in paths {
            self.delete(path).await?
        }
        Ok(())
    }
}

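/// Sidecar path holding the serialized `StorageMetadata` for a stored file.
/// Assuming `path_with_suffix_extension` appends the suffix as an extra extension,
/// the metadata for `foo/bar.layer` would live at `foo/bar.layer.metadata`.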
fn storage_metadata_path(original_path: &Path) -> PathBuf {
    path_with_suffix_extension(original_path, "metadata")
}

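// Returns a boxed future because an `async fn` cannot recurse directly:
// the compiler cannot size a future type that contains itself, so the
// recursive listing call has to go through a heap allocation.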
fn get_all_files<'a, P>(
    directory_path: P,
    recursive: bool,
) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<PathBuf>>> + Send + Sync + 'a>>
where
    P: AsRef<Path> + Send + Sync + 'a,
{
    Box::pin(async move {
        let directory_path = directory_path.as_ref();
        if directory_path.exists() {
            if directory_path.is_dir() {
                let mut paths = Vec::new();
                let mut dir_contents = fs::read_dir(directory_path).await?;
                while let Some(dir_entry) = dir_contents.next_entry().await? {
                    let file_type = dir_entry.file_type().await?;
                    let entry_path = dir_entry.path();
                    if file_type.is_symlink() {
                        debug!("{entry_path:?} is a symlink, skipping")
                    } else if file_type.is_dir() {
                        if recursive {
                            paths.extend(get_all_files(&entry_path, true).await?.into_iter())
                        } else {
                            paths.push(entry_path)
                        }
                    } else {
                        paths.push(entry_path);
                    }
                }
                Ok(paths)
            } else {
                bail!("Path {directory_path:?} is not a directory")
            }
        } else {
            Ok(Vec::new())
        }
    })
}

async fn create_target_directory(target_file_path: &Path) -> anyhow::Result<()> {
    let target_dir = match target_file_path.parent() {
        Some(parent_dir) => parent_dir,
        None => bail!(
            "File path '{}' has no parent directory",
            target_file_path.display()
        ),
    };
    if !target_dir.exists() {
        fs::create_dir_all(target_dir).await?;
    }
    Ok(())
}

fn file_exists(file_path: &Path) -> anyhow::Result<bool> {
    if file_path.exists() {
        ensure!(
            file_path.is_file(),
            "file path '{}' is not a file",
            file_path.display()
        );
        Ok(true)
    } else {
        Ok(false)
    }
}

#[cfg(test)]
mod fs_tests {
    use super::*;

    use std::{collections::HashMap, io::Write};
    use tempfile::tempdir;

    async fn read_and_assert_remote_file_contents(
        storage: &LocalFs,
        remote_storage_path: &RemotePath,
        expected_metadata: Option<&StorageMetadata>,
    ) -> anyhow::Result<String> {
        let mut download = storage
            .download(remote_storage_path)
            .await
            .map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
        ensure!(
            download.metadata.as_ref() == expected_metadata,
            "Unexpected metadata returned for the downloaded file"
        );

        let mut contents = String::new();
        download
            .download_stream
            .read_to_string(&mut contents)
            .await
            .context("Failed to read remote file contents into string")?;
        Ok(contents)
    }

    #[tokio::test]
    async fn upload_file() -> anyhow::Result<()> {
        let storage = create_storage()?;

        let target_path_1 = upload_dummy_file(&storage, "upload_1", None).await?;
        assert_eq!(
            storage.list().await?,
            vec![target_path_1.clone()],
            "Should list a single file after first upload"
        );

        let target_path_2 = upload_dummy_file(&storage, "upload_2", None).await?;
        assert_eq!(
            list_files_sorted(&storage).await?,
            vec![target_path_1.clone(), target_path_2.clone()],
            "Should list two different files after second upload"
        );

        Ok(())
    }

    #[tokio::test]
    async fn upload_file_negatives() -> anyhow::Result<()> {
        let storage = create_storage()?;

        let id = RemotePath::new(Path::new("dummy"))?;
        let content = std::io::Cursor::new(b"12345");

        // Check that you get an error if the size parameter doesn't match the actual
        // size of the stream.
        storage
            .upload(Box::new(content.clone()), 0, &id, None)
            .await
            .expect_err("upload with zero size succeeded");
        storage
            .upload(Box::new(content.clone()), 4, &id, None)
            .await
            .expect_err("upload with too short size succeeded");
        storage
            .upload(Box::new(content.clone()), 6, &id, None)
            .await
            .expect_err("upload with too large size succeeded");

        // Correct size is 5, this should succeed.
        storage.upload(Box::new(content), 5, &id, None).await?;

        Ok(())
    }

    fn create_storage() -> anyhow::Result<LocalFs> {
        LocalFs::new(tempdir()?.path().to_owned())
    }

    #[tokio::test]
    async fn download_file() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None).await?;

        let contents = read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
        assert_eq!(
            dummy_contents(upload_name),
            contents,
            "We should upload and download the same contents"
        );

        let non_existing_path = "somewhere/else";
        match storage.download(&RemotePath::new(Path::new(non_existing_path))?).await {
            Err(DownloadError::NotFound) => {} // Should get NotFound for non-existing keys
            other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
        }
        Ok(())
    }

    #[tokio::test]
    async fn download_file_range_positive() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None).await?;

        let full_range_download_contents =
            read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
        assert_eq!(
            dummy_contents(upload_name),
            full_range_download_contents,
            "Download full range should return the whole upload"
        );

        let uploaded_bytes = dummy_contents(upload_name).into_bytes();
        let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);

        let mut first_part_download = storage
            .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
            .await?;
        assert!(
            first_part_download.metadata.is_none(),
            "No metadata should be returned for an upload without metadata"
        );

        let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
        io::copy(
            &mut first_part_download.download_stream,
            &mut first_part_remote,
        )
        .await?;
        first_part_remote.flush().await?;
        let first_part_remote = first_part_remote.into_inner().into_inner();
        assert_eq!(
            first_part_local,
            first_part_remote.as_slice(),
            "First part bytes should be returned when requested"
        );

        let mut second_part_download = storage
            .download_byte_range(
                &upload_target,
                first_part_local.len() as u64,
                Some((first_part_local.len() + second_part_local.len()) as u64),
            )
            .await?;
        assert!(
            second_part_download.metadata.is_none(),
            "No metadata should be returned for an upload without metadata"
        );

        let mut second_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
        io::copy(
            &mut second_part_download.download_stream,
            &mut second_part_remote,
        )
        .await?;
        second_part_remote.flush().await?;
        let second_part_remote = second_part_remote.into_inner().into_inner();
        assert_eq!(
            second_part_local,
            second_part_remote.as_slice(),
            "Second part bytes should be returned when requested"
        );

        Ok(())
    }

    #[tokio::test]
    async fn download_file_range_negative() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None).await?;

        let start = 1_000_000_000;
        let end = start + 1;
        match storage
            .download_byte_range(
                &upload_target,
                start,
                Some(end), // exclusive end
            )
            .await
        {
            Ok(_) => panic!("Should not allow downloading wrong ranges"),
            Err(e) => {
                let error_string = e.to_string();
                assert!(error_string.contains("zero bytes"));
                assert!(error_string.contains(&start.to_string()));
                assert!(error_string.contains(&end.to_string()));
            }
        }

        let start = 10000;
        let end = 234;
        assert!(start > end, "Should test an incorrect range");
        match storage
            .download_byte_range(&upload_target, start, Some(end))
            .await
        {
            Ok(_) => panic!("Should not allow downloading wrong ranges"),
            Err(e) => {
                let error_string = e.to_string();
                assert!(error_string.contains("Invalid range"));
                assert!(error_string.contains(&start.to_string()));
                assert!(error_string.contains(&end.to_string()));
            }
        }

        Ok(())
    }

    #[tokio::test]
    async fn delete_file() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None).await?;

        storage.delete(&upload_target).await?;
        assert!(storage.list().await?.is_empty());

        storage
            .delete(&upload_target)
            .await
            .expect("Should allow deleting non-existing storage files");

        Ok(())
    }

    #[tokio::test]
    async fn file_with_metadata() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let metadata = StorageMetadata(HashMap::from([
            ("one".to_string(), "1".to_string()),
            ("two".to_string(), "2".to_string()),
        ]));
        let upload_target =
            upload_dummy_file(&storage, upload_name, Some(metadata.clone())).await?;

        let full_range_download_contents =
            read_and_assert_remote_file_contents(&storage, &upload_target, Some(&metadata)).await?;
        assert_eq!(
            dummy_contents(upload_name),
            full_range_download_contents,
            "We should upload and download the same contents"
        );

        let uploaded_bytes = dummy_contents(upload_name).into_bytes();
        let (first_part_local, _) = uploaded_bytes.split_at(3);

        let mut partial_download_with_metadata = storage
            .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
            .await?;
        let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
        io::copy(
            &mut partial_download_with_metadata.download_stream,
            &mut first_part_remote,
        )
        .await?;
        first_part_remote.flush().await?;
        let first_part_remote = first_part_remote.into_inner().into_inner();
        assert_eq!(
            first_part_local,
            first_part_remote.as_slice(),
            "First part bytes should be returned when requested"
        );

        assert_eq!(
            partial_download_with_metadata.metadata,
            Some(metadata),
            "We should get the same metadata back for a partial download"
        );

        Ok(())
    }

    async fn upload_dummy_file(
        storage: &LocalFs,
        name: &str,
        metadata: Option<StorageMetadata>,
    ) -> anyhow::Result<RemotePath> {
        let from_path = storage
            .storage_root
            .join("timelines")
            .join("some_timeline")
            .join(name);
        let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?;

        let relative_path = from_path
            .strip_prefix(&storage.storage_root)
            .context("Failed to strip storage root prefix")
            .and_then(RemotePath::new)
            .with_context(|| {
                format!(
                    "Failed to resolve remote part of path {:?} for base {:?}",
                    from_path, storage.storage_root
                )
            })?;

        storage
            .upload(Box::new(file), size, &relative_path, metadata)
            .await?;
        Ok(relative_path)
    }

    async fn create_file_for_upload(
        path: &Path,
        contents: &str,
    ) -> anyhow::Result<(io::BufReader<fs::File>, usize)> {
        std::fs::create_dir_all(path.parent().unwrap())?;
        let mut file_for_writing = std::fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(path)?;
        write!(file_for_writing, "{}", contents)?;
        drop(file_for_writing);
        let file_size = path.metadata()?.len() as usize;
        Ok((
            io::BufReader::new(fs::OpenOptions::new().read(true).open(path).await?),
            file_size,
        ))
    }

    fn dummy_contents(name: &str) -> String {
        format!("contents for {name}")
    }

    async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
        let mut files = storage.list().await?;
        files.sort_by(|a, b| a.0.cmp(&b.0));
        Ok(files)
    }
}