LCOV - code coverage report
Current view: top level - libs/remote_storage/src - local_fs.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 89.0 % 572 509
Test Date: 2023-09-06 10:18:01 Functions: 53.4 % 118 63

            Line data    Source code
       1              : //! Local filesystem acting as a remote storage.
       2              : //! Multiple API users can use the same "storage" of this kind by using different storage roots.
       3              : //!
       4              : //! This storage is used in tests, but can also be used in cases when a certain persistent
       5              : //! volume is mounted to the local FS.
       6              : 
       7              : use std::{
       8              :     borrow::Cow,
       9              :     future::Future,
      10              :     io::ErrorKind,
      11              :     path::{Path, PathBuf},
      12              :     pin::Pin,
      13              : };
      14              : 
      15              : use anyhow::{bail, ensure, Context};
      16              : use tokio::{
      17              :     fs,
      18              :     io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
      19              : };
      20              : use tracing::*;
      21              : use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};
      22              : 
      23              : use crate::{Download, DownloadError, RemotePath};
      24              : 
      25              : use super::{RemoteStorage, StorageMetadata};
      26              : 
      27              : const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";
      28              : 
      29          815 : #[derive(Debug, Clone)]
      30              : pub struct LocalFs {
      31              :     storage_root: PathBuf,
      32              : }
      33              : 
      34              : impl LocalFs {
      35              :     /// Attempts to create local FS storage, along with its root directory.
      36              :     /// Storage root will be created (if it does not exist) and transformed into an absolute path (if passed as relative).
      37          146 :     pub fn new(mut storage_root: PathBuf) -> anyhow::Result<Self> {
      38          146 :         if !storage_root.exists() {
      39           68 :             std::fs::create_dir_all(&storage_root).with_context(|| {
      40            0 :                 format!("Failed to create all directories in the given root path {storage_root:?}")
      41           68 :             })?;
      42           78 :         }
      43          146 :         if !storage_root.is_absolute() {
      44           36 :             storage_root = storage_root.canonicalize().with_context(|| {
      45            0 :                 format!("Failed to represent path {storage_root:?} as an absolute path")
      46           36 :             })?;
      47          110 :         }
      48              : 
      49          146 :         Ok(Self { storage_root })
      50          146 :     }
      51              : 
      52              :     // mirrors S3Bucket::s3_object_to_relative_path
      53           18 :     fn local_file_to_relative_path(&self, key: PathBuf) -> RemotePath {
      54           18 :         let relative_path = key
      55           18 :             .strip_prefix(&self.storage_root)
      56           18 :             .expect("relative path must contain storage_root as prefix");
      57           18 :         RemotePath(relative_path.into())
      58           18 :     }
      59              : 
      60          739 :     async fn read_storage_metadata(
      61          739 :         &self,
      62          739 :         file_path: &Path,
      63          739 :     ) -> anyhow::Result<Option<StorageMetadata>> {
      64          739 :         let metadata_path = storage_metadata_path(file_path);
      65          739 :         if metadata_path.exists() && metadata_path.is_file() {
      66            2 :             let metadata_string = fs::read_to_string(&metadata_path).await.with_context(|| {
      67            0 :                 format!(
      68            0 :                     "Failed to read metadata from the local storage at '{}'",
      69            0 :                     metadata_path.display()
      70            0 :                 )
      71            2 :             })?;
      72              : 
      73            2 :             serde_json::from_str(&metadata_string)
      74            2 :                 .with_context(|| {
      75            0 :                     format!(
      76            0 :                         "Failed to deserialize metadata from the local storage at '{}'",
      77            0 :                         metadata_path.display()
      78            0 :                     )
      79            2 :                 })
      80            2 :                 .map(|metadata| Some(StorageMetadata(metadata)))
      81              :         } else {
      82          737 :             Ok(None)
      83              :         }
      84          739 :     }
      85              : 
      86              :     #[cfg(test)]
      87            3 :     async fn list(&self) -> anyhow::Result<Vec<RemotePath>> {
      88            3 :         Ok(get_all_files(&self.storage_root, true)
      89           18 :             .await?
      90            3 :             .into_iter()
      91            3 :             .map(|path| {
      92            3 :                 path.strip_prefix(&self.storage_root)
      93            3 :                     .context("Failed to strip storage root prefix")
      94            3 :                     .and_then(RemotePath::new)
      95            3 :                     .expect(
      96            3 :                         "We list files for storage root, hence should be able to remote the prefix",
      97            3 :                     )
      98            3 :             })
      99            3 :             .collect())
     100            3 :     }
     101              : }
     102              : 
     103              : #[async_trait::async_trait]
     104              : impl RemoteStorage for LocalFs {
     105           19 :     async fn list_prefixes(
     106           19 :         &self,
     107           19 :         prefix: Option<&RemotePath>,
     108           19 :     ) -> Result<Vec<RemotePath>, DownloadError> {
     109           19 :         let path = match prefix {
     110           19 :             Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
     111            0 :             None => Cow::Borrowed(&self.storage_root),
     112              :         };
     113              : 
     114           19 :         let prefixes_to_filter = get_all_files(path.as_ref(), false)
     115           38 :             .await
     116           19 :             .map_err(DownloadError::Other)?;
     117              : 
     118           19 :         let mut prefixes = Vec::with_capacity(prefixes_to_filter.len());
     119              : 
     120              :         // filter out empty directories to mirror s3 behavior.
     121           46 :         for prefix in prefixes_to_filter {
     122           27 :             if prefix.is_dir()
     123           27 :                 && is_directory_empty(&prefix)
     124           29 :                     .await
     125           27 :                     .map_err(DownloadError::Other)?
     126              :             {
     127            2 :                 continue;
     128           25 :             }
     129           25 : 
     130           25 :             prefixes.push(
     131           25 :                 prefix
     132           25 :                     .strip_prefix(&self.storage_root)
     133           25 :                     .context("Failed to strip prefix")
     134           25 :                     .and_then(RemotePath::new)
     135           25 :                     .expect(
     136           25 :                         "We list files for storage root, hence should be able to remote the prefix",
     137           25 :                     ),
     138           25 :             )
     139              :         }
     140              : 
     141           19 :         Ok(prefixes)
     142           38 :     }
     143              : 
     144              :     // recursively lists all files in a directory,
     145              :     // mirroring the `list_files` for `s3_bucket`
     146           15 :     async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
     147           15 :         let full_path = match folder {
     148           15 :             Some(folder) => folder.with_base(&self.storage_root),
     149            0 :             None => self.storage_root.clone(),
     150              :         };
     151           15 :         let mut files = vec![];
     152           15 :         let mut directory_queue = vec![full_path.clone()];
     153              : 
     154           30 :         while let Some(cur_folder) = directory_queue.pop() {
     155           15 :             let mut entries = fs::read_dir(cur_folder.clone()).await?;
     156           33 :             while let Some(entry) = entries.next_entry().await? {
     157           18 :                 let file_name: PathBuf = entry.file_name().into();
     158           18 :                 let full_file_name = cur_folder.clone().join(&file_name);
     159           18 :                 let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
     160           18 :                 files.push(file_remote_path.clone());
     161           18 :                 if full_file_name.is_dir() {
     162            0 :                     directory_queue.push(full_file_name);
     163           18 :                 }
     164              :             }
     165              :         }
     166           15 :         Ok(files)
     167           30 :     }
     168              : 
     169         6303 :     async fn upload(
     170         6303 :         &self,
     171         6303 :         data: impl io::AsyncRead + Unpin + Send + Sync + 'static,
     172         6303 :         data_size_bytes: usize,
     173         6303 :         to: &RemotePath,
     174         6303 :         metadata: Option<StorageMetadata>,
     175         6303 :     ) -> anyhow::Result<()> {
     176         6303 :         let target_file_path = to.with_base(&self.storage_root);
     177         6303 :         create_target_directory(&target_file_path).await?;
     178              :         // We need this dance with sort of durable rename (without fsyncs)
     179              :         // to prevent partial uploads. This was really hit when pageserver shutdown
     180              :         // cancelled the upload and partial file was left on the fs
     181              :         // NOTE: Because the temp file suffix is always the same, this operation is racy.
     182              :         // Two concurrent operations can lead to the following sequence:
     183              :         // T1: write(temp)
     184              :         // T2: write(temp) -> overwrites the content
     185              :         // T1: rename(temp, dst) -> succeeds
     186              :         // T2: rename(temp, dst) -> fails, temp no longer exists
     187              :         // This can be solved by supplying unique temp suffix every time, but this situation
     188              :         // is not normal in the first place, the error can help (and helped at least once)
     189              :         // to discover bugs in upper level synchronization.
     190         6303 :         let temp_file_path =
     191         6303 :             path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX);
     192         6300 :         let mut destination = io::BufWriter::new(
     193         6303 :             fs::OpenOptions::new()
     194         6303 :                 .write(true)
     195         6303 :                 .create(true)
     196         6303 :                 .open(&temp_file_path)
     197         6257 :                 .await
     198         6300 :                 .with_context(|| {
     199            0 :                     format!(
     200            0 :                         "Failed to open target fs destination at '{}'",
     201            0 :                         target_file_path.display()
     202            0 :                     )
     203         6300 :                 })?,
     204              :         );
     205              : 
     206         6300 :         let from_size_bytes = data_size_bytes as u64;
     207         6300 :         let mut buffer_to_read = data.take(from_size_bytes);
     208              : 
     209         6300 :         let bytes_read = io::copy(&mut buffer_to_read, &mut destination)
     210      1300002 :             .await
     211         6299 :             .with_context(|| {
     212            0 :                 format!(
     213            0 :                     "Failed to upload file (write temp) to the local storage at '{}'",
     214            0 :                     temp_file_path.display()
     215            0 :                 )
     216         6299 :             })?;
     217              : 
     218         6299 :         if bytes_read < from_size_bytes {
     219            1 :             bail!("Provided stream was shorter than expected: {bytes_read} vs {from_size_bytes} bytes");
     220         6298 :         }
     221         6298 :         // Check if there is any extra data after the given size.
     222         6298 :         let mut from = buffer_to_read.into_inner();
     223         6298 :         let extra_read = from.read(&mut [1]).await?;
     224         6298 :         ensure!(
     225         6298 :             extra_read == 0,
     226            2 :             "Provided stream was larger than expected: expected {from_size_bytes} bytes",
     227              :         );
     228              : 
     229         6296 :         destination.flush().await.with_context(|| {
     230            0 :             format!(
     231            0 :                 "Failed to upload (flush temp) file to the local storage at '{}'",
     232            0 :                 temp_file_path.display()
     233            0 :             )
     234         6296 :         })?;
     235              : 
     236         6296 :         fs::rename(temp_file_path, &target_file_path)
     237         6254 :             .await
     238         6295 :             .with_context(|| {
     239            1 :                 format!(
     240            1 :                     "Failed to upload (rename) file to the local storage at '{}'",
     241            1 :                     target_file_path.display()
     242            1 :                 )
     243         6295 :             })?;
     244              : 
     245         6294 :         if let Some(storage_metadata) = metadata {
     246            1 :             let storage_metadata_path = storage_metadata_path(&target_file_path);
     247            1 :             fs::write(
     248            1 :                 &storage_metadata_path,
     249            1 :                 serde_json::to_string(&storage_metadata.0)
     250            1 :                     .context("Failed to serialize storage metadata as json")?,
     251              :             )
     252            1 :             .await
     253            1 :             .with_context(|| {
     254            0 :                 format!(
     255            0 :                     "Failed to write metadata to the local storage at '{}'",
     256            0 :                     storage_metadata_path.display()
     257            0 :                 )
     258            1 :             })?;
     259         6293 :         }
     260              : 
     261         6294 :         Ok(())
     262        12601 :     }
     263              : 
     264          879 :     async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
     265          879 :         let target_path = from.with_base(&self.storage_root);
     266          879 :         if file_exists(&target_path).map_err(DownloadError::BadInput)? {
     267          734 :             let source = io::BufReader::new(
     268          734 :                 fs::OpenOptions::new()
     269          734 :                     .read(true)
     270          734 :                     .open(&target_path)
     271          704 :                     .await
     272          734 :                     .with_context(|| {
     273            0 :                         format!("Failed to open source file {target_path:?} to use in the download")
     274          734 :                     })
     275          734 :                     .map_err(DownloadError::Other)?,
     276              :             );
     277              : 
     278          734 :             let metadata = self
     279          734 :                 .read_storage_metadata(&target_path)
     280            1 :                 .await
     281          734 :                 .map_err(DownloadError::Other)?;
     282          734 :             Ok(Download {
     283          734 :                 metadata,
     284          734 :                 download_stream: Box::pin(source),
     285          734 :             })
     286              :         } else {
     287          145 :             Err(DownloadError::NotFound)
     288              :         }
     289         1758 :     }
     290              : 
     291            7 :     async fn download_byte_range(
     292            7 :         &self,
     293            7 :         from: &RemotePath,
     294            7 :         start_inclusive: u64,
     295            7 :         end_exclusive: Option<u64>,
     296            7 :     ) -> Result<Download, DownloadError> {
     297            7 :         if let Some(end_exclusive) = end_exclusive {
     298            5 :             if end_exclusive <= start_inclusive {
     299            1 :                 return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
     300            4 :             };
     301            4 :             if start_inclusive == end_exclusive.saturating_sub(1) {
     302            1 :                 return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
     303            3 :             }
     304            2 :         }
     305            5 :         let target_path = from.with_base(&self.storage_root);
     306            5 :         if file_exists(&target_path).map_err(DownloadError::BadInput)? {
     307            5 :             let mut source = io::BufReader::new(
     308            5 :                 fs::OpenOptions::new()
     309            5 :                     .read(true)
     310            5 :                     .open(&target_path)
     311            5 :                     .await
     312            5 :                     .with_context(|| {
     313            0 :                         format!("Failed to open source file {target_path:?} to use in the download")
     314            5 :                     })
     315            5 :                     .map_err(DownloadError::Other)?,
     316              :             );
     317            5 :             source
     318            5 :                 .seek(io::SeekFrom::Start(start_inclusive))
     319            5 :                 .await
     320            5 :                 .context("Failed to seek to the range start in a local storage file")
     321            5 :                 .map_err(DownloadError::Other)?;
     322            5 :             let metadata = self
     323            5 :                 .read_storage_metadata(&target_path)
     324            1 :                 .await
     325            5 :                 .map_err(DownloadError::Other)?;
     326              : 
     327            5 :             Ok(match end_exclusive {
     328            3 :                 Some(end_exclusive) => Download {
     329            3 :                     metadata,
     330            3 :                     download_stream: Box::pin(source.take(end_exclusive - start_inclusive)),
     331            3 :                 },
     332            2 :                 None => Download {
     333            2 :                     metadata,
     334            2 :                     download_stream: Box::pin(source),
     335            2 :                 },
     336              :             })
     337              :         } else {
     338            0 :             Err(DownloadError::NotFound)
     339              :         }
     340           14 :     }
     341              : 
     342         1772 :     async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
     343         1772 :         let file_path = path.with_base(&self.storage_root);
     344         1772 :         match fs::remove_file(&file_path).await {
     345         1739 :             Ok(()) => Ok(()),
     346              :             // The file doesn't exist. This shouldn't yield an error to mirror S3's behaviour.
     347              :             // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
     348              :             // > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful.
     349           33 :             Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
     350            0 :             Err(e) => Err(anyhow::anyhow!(e)),
     351              :         }
     352         3544 :     }
     353              : 
     354            1 :     async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
     355            4 :         for path in paths {
     356            3 :             self.delete(path).await?
     357              :         }
     358            1 :         Ok(())
     359            2 :     }
     360              : }
     361              : 
     362          740 : fn storage_metadata_path(original_path: &Path) -> PathBuf {
     363          740 :     path_with_suffix_extension(original_path, "metadata")
     364          740 : }
     365              : 
     366           28 : fn get_all_files<'a, P>(
     367           28 :     directory_path: P,
     368           28 :     recursive: bool,
     369           28 : ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<PathBuf>>> + Send + Sync + 'a>>
     370           28 : where
     371           28 :     P: AsRef<Path> + Send + Sync + 'a,
     372           28 : {
     373           28 :     Box::pin(async move {
     374           28 :         let directory_path = directory_path.as_ref();
     375           28 :         if directory_path.exists() {
     376           28 :             if directory_path.is_dir() {
     377           28 :                 let mut paths = Vec::new();
     378           28 :                 let mut dir_contents = fs::read_dir(directory_path).await?;
     379           64 :                 while let Some(dir_entry) = dir_contents.next_entry().await? {
     380           36 :                     let file_type = dir_entry.file_type().await?;
     381           36 :                     let entry_path = dir_entry.path();
     382           36 :                     if file_type.is_symlink() {
     383            0 :                         debug!("{entry_path:?} is a symlink, skipping")
     384           36 :                     } else if file_type.is_dir() {
     385           33 :                         if recursive {
     386           18 :                             paths.extend(get_all_files(&entry_path, true).await?.into_iter())
     387              :                         } else {
     388           27 :                             paths.push(entry_path)
     389              :                         }
     390            3 :                     } else {
     391            3 :                         paths.push(entry_path);
     392            3 :                     }
     393              :                 }
     394           28 :                 Ok(paths)
     395              :             } else {
     396            0 :                 bail!("Path {directory_path:?} is not a directory")
     397              :             }
     398              :         } else {
     399            0 :             Ok(Vec::new())
     400              :         }
     401           28 :     })
     402           28 : }
     403              : 
     404         6303 : async fn create_target_directory(target_file_path: &Path) -> anyhow::Result<()> {
     405         6303 :     let target_dir = match target_file_path.parent() {
     406         6303 :         Some(parent_dir) => parent_dir,
     407            0 :         None => bail!(
     408            0 :             "File path '{}' has no parent directory",
     409            0 :             target_file_path.display()
     410            0 :         ),
     411              :     };
     412         6303 :     if !target_dir.exists() {
     413          286 :         fs::create_dir_all(target_dir).await?;
     414         6017 :     }
     415         6303 :     Ok(())
     416         6303 : }
     417              : 
     418          884 : fn file_exists(file_path: &Path) -> anyhow::Result<bool> {
     419          884 :     if file_path.exists() {
     420          739 :         ensure!(
     421          739 :             file_path.is_file(),
     422            0 :             "file path '{}' is not a file",
     423            0 :             file_path.display()
     424              :         );
     425          739 :         Ok(true)
     426              :     } else {
     427          145 :         Ok(false)
     428              :     }
     429          884 : }
     430              : 
     431              : #[cfg(test)]
     432              : mod fs_tests {
     433              :     use super::*;
     434              : 
     435              :     use std::{collections::HashMap, io::Write};
     436              :     use tempfile::tempdir;
     437              : 
     438            3 :     async fn read_and_assert_remote_file_contents(
     439            3 :         storage: &LocalFs,
     440            3 :         #[allow(clippy::ptr_arg)]
     441            3 :         // have to use &PathBuf due to `storage.local_path` parameter requirements
     442            3 :         remote_storage_path: &RemotePath,
     443            3 :         expected_metadata: Option<&StorageMetadata>,
     444            3 :     ) -> anyhow::Result<String> {
     445            3 :         let mut download = storage
     446            3 :             .download(remote_storage_path)
     447            4 :             .await
     448            3 :             .map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
     449            3 :         ensure!(
     450            3 :             download.metadata.as_ref() == expected_metadata,
     451            0 :             "Unexpected metadata returned for the downloaded file"
     452              :         );
     453              : 
     454            3 :         let mut contents = String::new();
     455            3 :         download
     456            3 :             .download_stream
     457            3 :             .read_to_string(&mut contents)
     458            6 :             .await
     459            3 :             .context("Failed to read remote file contents into string")?;
     460            3 :         Ok(contents)
     461            3 :     }
     462              : 
     463            1 :     #[tokio::test]
     464            1 :     async fn upload_file() -> anyhow::Result<()> {
     465            1 :         let storage = create_storage()?;
     466              : 
     467            6 :         let target_path_1 = upload_dummy_file(&storage, "upload_1", None).await?;
     468            1 :         assert_eq!(
     469            6 :             storage.list().await?,
     470            1 :             vec![target_path_1.clone()],
     471            0 :             "Should list a single file after first upload"
     472              :         );
     473              : 
     474            6 :         let target_path_2 = upload_dummy_file(&storage, "upload_2", None).await?;
     475            1 :         assert_eq!(
     476            6 :             list_files_sorted(&storage).await?,
     477            1 :             vec![target_path_1.clone(), target_path_2.clone()],
     478            0 :             "Should list a two different files after second upload"
     479              :         );
     480              : 
     481            1 :         Ok(())
     482              :     }
     483              : 
     484            1 :     #[tokio::test]
     485            1 :     async fn upload_file_negatives() -> anyhow::Result<()> {
     486            1 :         let storage = create_storage()?;
     487              : 
     488            1 :         let id = RemotePath::new(Path::new("dummy"))?;
     489            1 :         let content = std::io::Cursor::new(b"12345");
     490            1 : 
     491            1 :         // Check that you get an error if the size parameter doesn't match the actual
     492            1 :         // size of the stream.
     493            1 :         storage
     494            1 :             .upload(Box::new(content.clone()), 0, &id, None)
     495            2 :             .await
     496            1 :             .expect_err("upload with zero size succeeded");
     497            1 :         storage
     498            1 :             .upload(Box::new(content.clone()), 4, &id, None)
     499            2 :             .await
     500            1 :             .expect_err("upload with too short size succeeded");
     501            1 :         storage
     502            1 :             .upload(Box::new(content.clone()), 6, &id, None)
     503            2 :             .await
     504            1 :             .expect_err("upload with too large size succeeded");
     505            1 : 
     506            1 :         // Correct size is 5, this should succeed.
     507            3 :         storage.upload(Box::new(content), 5, &id, None).await?;
     508              : 
     509            1 :         Ok(())
     510              :     }
     511              : 
     512            7 :     fn create_storage() -> anyhow::Result<LocalFs> {
     513            7 :         LocalFs::new(tempdir()?.path().to_owned())
     514            7 :     }
     515              : 
     516            1 :     #[tokio::test]
     517            1 :     async fn download_file() -> anyhow::Result<()> {
     518            1 :         let storage = create_storage()?;
     519            1 :         let upload_name = "upload_1";
     520            6 :         let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
     521              : 
     522            3 :         let contents = read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
     523            1 :         assert_eq!(
     524            1 :             dummy_contents(upload_name),
     525              :             contents,
     526            0 :             "We should upload and download the same contents"
     527              :         );
     528              : 
     529            1 :         let non_existing_path = "somewhere/else";
     530            1 :         match storage.download(&RemotePath::new(Path::new(non_existing_path))?).await {
     531            1 :             Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys
     532            0 :             other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
     533              :         }
     534            1 :         Ok(())
     535              :     }
     536              : 
     537            1 :     #[tokio::test]
     538            1 :     async fn download_file_range_positive() -> anyhow::Result<()> {
     539            1 :         let storage = create_storage()?;
     540            1 :         let upload_name = "upload_1";
     541            6 :         let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
     542              : 
     543            1 :         let full_range_download_contents =
     544            3 :             read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
     545            1 :         assert_eq!(
     546            1 :             dummy_contents(upload_name),
     547              :             full_range_download_contents,
     548            0 :             "Download full range should return the whole upload"
     549              :         );
     550              : 
     551            1 :         let uploaded_bytes = dummy_contents(upload_name).into_bytes();
     552            1 :         let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);
     553              : 
     554            1 :         let mut first_part_download = storage
     555            1 :             .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
     556            2 :             .await?;
     557            1 :         assert!(
     558            1 :             first_part_download.metadata.is_none(),
     559            0 :             "No metadata should be returned for no metadata upload"
     560              :         );
     561              : 
     562            1 :         let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
     563            1 :         io::copy(
     564            1 :             &mut first_part_download.download_stream,
     565            1 :             &mut first_part_remote,
     566            1 :         )
     567            1 :         .await?;
     568            1 :         first_part_remote.flush().await?;
     569            1 :         let first_part_remote = first_part_remote.into_inner().into_inner();
     570            1 :         assert_eq!(
     571            1 :             first_part_local,
     572            1 :             first_part_remote.as_slice(),
     573            0 :             "First part bytes should be returned when requested"
     574              :         );
     575              : 
     576            1 :         let mut second_part_download = storage
     577            1 :             .download_byte_range(
     578            1 :                 &upload_target,
     579            1 :                 first_part_local.len() as u64,
     580            1 :                 Some((first_part_local.len() + second_part_local.len()) as u64),
     581            1 :             )
     582            2 :             .await?;
     583            1 :         assert!(
     584            1 :             second_part_download.metadata.is_none(),
     585            0 :             "No metadata should be returned for no metadata upload"
     586              :         );
     587              : 
     588            1 :         let mut second_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
     589            1 :         io::copy(
     590            1 :             &mut second_part_download.download_stream,
     591            1 :             &mut second_part_remote,
     592            1 :         )
     593            1 :         .await?;
     594            1 :         second_part_remote.flush().await?;
     595            1 :         let second_part_remote = second_part_remote.into_inner().into_inner();
     596            1 :         assert_eq!(
     597            1 :             second_part_local,
     598            1 :             second_part_remote.as_slice(),
     599            0 :             "Second part bytes should be returned when requested"
     600              :         );
     601              : 
     602            1 :         Ok(())
     603              :     }
     604              : 
     605            1 :     #[tokio::test]
     606            1 :     async fn download_file_range_negative() -> anyhow::Result<()> {
     607            1 :         let storage = create_storage()?;
     608            1 :         let upload_name = "upload_1";
     609            6 :         let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
     610              : 
     611            1 :         let start = 1_000_000_000;
     612            1 :         let end = start + 1;
     613            1 :         match storage
     614            1 :             .download_byte_range(
     615            1 :                 &upload_target,
     616            1 :                 start,
     617            1 :                 Some(end), // exclusive end
     618            1 :             )
     619            0 :             .await
     620              :         {
     621            0 :             Ok(_) => panic!("Should not allow downloading wrong ranges"),
     622            1 :             Err(e) => {
     623            1 :                 let error_string = e.to_string();
     624            1 :                 assert!(error_string.contains("zero bytes"));
     625            1 :                 assert!(error_string.contains(&start.to_string()));
     626            1 :                 assert!(error_string.contains(&end.to_string()));
     627              :             }
     628              :         }
     629              : 
     630            1 :         let start = 10000;
     631            1 :         let end = 234;
     632            1 :         assert!(start > end, "Should test an incorrect range");
     633            1 :         match storage
     634            1 :             .download_byte_range(&upload_target, start, Some(end))
     635            0 :             .await
     636              :         {
     637            0 :             Ok(_) => panic!("Should not allow downloading wrong ranges"),
     638            1 :             Err(e) => {
     639            1 :                 let error_string = e.to_string();
     640            1 :                 assert!(error_string.contains("Invalid range"));
     641            1 :                 assert!(error_string.contains(&start.to_string()));
     642            1 :                 assert!(error_string.contains(&end.to_string()));
     643              :             }
     644              :         }
     645              : 
     646            1 :         Ok(())
     647              :     }
     648              : 
     649            1 :     #[tokio::test]
     650            1 :     async fn delete_file() -> anyhow::Result<()> {
     651            1 :         let storage = create_storage()?;
     652            1 :         let upload_name = "upload_1";
     653            6 :         let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
     654              : 
     655            1 :         storage.delete(&upload_target).await?;
     656            6 :         assert!(storage.list().await?.is_empty());
     657              : 
     658            1 :         storage
     659            1 :             .delete(&upload_target)
     660            1 :             .await
     661            1 :             .expect("Should allow deleting non-existing storage files");
     662            1 : 
     663            1 :         Ok(())
     664              :     }
     665              : 
     666            1 :     #[tokio::test]
     667            1 :     async fn file_with_metadata() -> anyhow::Result<()> {
     668            1 :         let storage = create_storage()?;
     669            1 :         let upload_name = "upload_1";
     670            1 :         let metadata = StorageMetadata(HashMap::from([
     671            1 :             ("one".to_string(), "1".to_string()),
     672            1 :             ("two".to_string(), "2".to_string()),
     673            1 :         ]));
     674            1 :         let upload_target =
     675            7 :             upload_dummy_file(&storage, upload_name, Some(metadata.clone())).await?;
     676              : 
     677            1 :         let full_range_download_contents =
     678            4 :             read_and_assert_remote_file_contents(&storage, &upload_target, Some(&metadata)).await?;
     679            1 :         assert_eq!(
     680            1 :             dummy_contents(upload_name),
     681              :             full_range_download_contents,
     682            0 :             "We should upload and download the same contents"
     683              :         );
     684              : 
     685            1 :         let uploaded_bytes = dummy_contents(upload_name).into_bytes();
     686            1 :         let (first_part_local, _) = uploaded_bytes.split_at(3);
     687              : 
     688            1 :         let mut partial_download_with_metadata = storage
     689            1 :             .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
     690            3 :             .await?;
     691            1 :         let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
     692            1 :         io::copy(
     693            1 :             &mut partial_download_with_metadata.download_stream,
     694            1 :             &mut first_part_remote,
     695            1 :         )
     696            1 :         .await?;
     697            1 :         first_part_remote.flush().await?;
     698            1 :         let first_part_remote = first_part_remote.into_inner().into_inner();
     699            1 :         assert_eq!(
     700            1 :             first_part_local,
     701            1 :             first_part_remote.as_slice(),
     702            0 :             "First part bytes should be returned when requested"
     703              :         );
     704              : 
     705            1 :         assert_eq!(
     706            1 :             partial_download_with_metadata.metadata,
     707            1 :             Some(metadata),
     708            0 :             "We should get the same metadata back for partial download"
     709              :         );
     710              : 
     711            1 :         Ok(())
     712              :     }
     713              : 
     714            7 :     async fn upload_dummy_file(
     715            7 :         storage: &LocalFs,
     716            7 :         name: &str,
     717            7 :         metadata: Option<StorageMetadata>,
     718            7 :     ) -> anyhow::Result<RemotePath> {
     719            7 :         let from_path = storage
     720            7 :             .storage_root
     721            7 :             .join("timelines")
     722            7 :             .join("some_timeline")
     723            7 :             .join(name);
     724            7 :         let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?;
     725              : 
     726            7 :         let relative_path = from_path
     727            7 :             .strip_prefix(&storage.storage_root)
     728            7 :             .context("Failed to strip storage root prefix")
     729            7 :             .and_then(RemotePath::new)
     730            7 :             .with_context(|| {
     731            0 :                 format!(
     732            0 :                     "Failed to resolve remote part of path {:?} for base {:?}",
     733            0 :                     from_path, storage.storage_root
     734            0 :                 )
     735            7 :             })?;
     736              : 
     737            7 :         storage
     738            7 :             .upload(Box::new(file), size, &relative_path, metadata)
     739           36 :             .await?;
     740            7 :         Ok(relative_path)
     741            7 :     }
     742              : 
     743            7 :     async fn create_file_for_upload(
     744            7 :         path: &Path,
     745            7 :         contents: &str,
     746            7 :     ) -> anyhow::Result<(io::BufReader<fs::File>, usize)> {
     747            7 :         std::fs::create_dir_all(path.parent().unwrap())?;
     748            7 :         let mut file_for_writing = std::fs::OpenOptions::new()
     749            7 :             .write(true)
     750            7 :             .create_new(true)
     751            7 :             .open(path)?;
     752            7 :         write!(file_for_writing, "{}", contents)?;
     753            7 :         drop(file_for_writing);
     754            7 :         let file_size = path.metadata()?.len() as usize;
     755            7 :         Ok((
     756            7 :             io::BufReader::new(fs::OpenOptions::new().read(true).open(&path).await?),
     757            7 :             file_size,
     758              :         ))
     759            7 :     }
     760              : 
     761           12 :     fn dummy_contents(name: &str) -> String {
     762           12 :         format!("contents for {name}")
     763           12 :     }
     764              : 
     765            1 :     async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
     766            6 :         let mut files = storage.list().await?;
     767            1 :         files.sort_by(|a, b| a.0.cmp(&b.0));
     768            1 :         Ok(files)
     769            1 :     }
     770              : }
        

Generated by: LCOV version 2.1-beta