LCOV - differential code coverage report
Current view: top level - libs/remote_storage/src - local_fs.rs (source / functions) Coverage Total Hit LBC UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 90.0 % 580 522 3 55 522
Current Date: 2023-10-19 02:04:12 Functions: 55.4 % 112 62 1 49 62
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! Local filesystem acting as a remote storage.
       2                 : //! Multiple API users can use the same "storage" of this kind by using different storage roots.
       3                 : //!
       4                 : //! This storage is used in tests, but can also be used in cases when a certain persistent
       5                 : //! volume is mounted to the local FS.
       6                 : 
       7                 : use std::{borrow::Cow, future::Future, io::ErrorKind, pin::Pin};
       8                 : 
       9                 : use anyhow::{bail, ensure, Context};
      10                 : use camino::{Utf8Path, Utf8PathBuf};
      11                 : use tokio::{
      12                 :     fs,
      13                 :     io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
      14                 : };
      15                 : use tracing::*;
      16                 : use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};
      17                 : 
      18                 : use crate::{Download, DownloadError, RemotePath};
      19                 : 
      20                 : use super::{RemoteStorage, StorageMetadata};
      21                 : 
      22                 : const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";
      23                 : 
      24 CBC        3221 : #[derive(Debug, Clone)]
      25                 : pub struct LocalFs {
      26                 :     storage_root: Utf8PathBuf,
      27                 : }
      28                 : 
      29                 : impl LocalFs {
      30                 :     /// Attempts to create local FS storage, along with its root directory.
      31                 :     /// Storage root will be created (if it does not exist) and transformed into an absolute path (if passed as relative).
      32             382 :     pub fn new(mut storage_root: Utf8PathBuf) -> anyhow::Result<Self> {
      33             382 :         if !storage_root.exists() {
      34             218 :             std::fs::create_dir_all(&storage_root).with_context(|| {
      35 UBC           0 :                 format!("Failed to create all directories in the given root path {storage_root:?}")
      36 CBC         218 :             })?;
      37             164 :         }
      38             382 :         if !storage_root.is_absolute() {
      39              41 :             storage_root = storage_root.canonicalize_utf8().with_context(|| {
      40 UBC           0 :                 format!("Failed to represent path {storage_root:?} as an absolute path")
      41 CBC          41 :             })?;
      42             341 :         }
      43                 : 
      44             382 :         Ok(Self { storage_root })
      45             382 :     }
      46                 : 
      47                 :     // mirrors S3Bucket::s3_object_to_relative_path
      48              39 :     fn local_file_to_relative_path(&self, key: Utf8PathBuf) -> RemotePath {
      49              39 :         let relative_path = key
      50              39 :             .strip_prefix(&self.storage_root)
      51              39 :             .expect("relative path must contain storage_root as prefix");
      52              39 :         RemotePath(relative_path.into())
      53              39 :     }
      54                 : 
      55             826 :     async fn read_storage_metadata(
      56             826 :         &self,
      57             826 :         file_path: &Utf8Path,
      58             826 :     ) -> anyhow::Result<Option<StorageMetadata>> {
      59             826 :         let metadata_path = storage_metadata_path(file_path);
      60             826 :         if metadata_path.exists() && metadata_path.is_file() {
      61               2 :             let metadata_string = fs::read_to_string(&metadata_path).await.with_context(|| {
      62 UBC           0 :                 format!("Failed to read metadata from the local storage at '{metadata_path}'")
      63 CBC           2 :             })?;
      64                 : 
      65               2 :             serde_json::from_str(&metadata_string)
      66               2 :                 .with_context(|| {
      67 UBC           0 :                     format!(
      68               0 :                         "Failed to deserialize metadata from the local storage at '{metadata_path}'",
      69               0 :                     )
      70 CBC           2 :                 })
      71               2 :                 .map(|metadata| Some(StorageMetadata(metadata)))
      72                 :         } else {
      73             824 :             Ok(None)
      74                 :         }
      75             826 :     }
      76                 : 
      77                 :     #[cfg(test)]
      78               3 :     async fn list(&self) -> anyhow::Result<Vec<RemotePath>> {
      79               3 :         Ok(get_all_files(&self.storage_root, true)
      80              18 :             .await?
      81               3 :             .into_iter()
      82               3 :             .map(|path| {
      83               3 :                 path.strip_prefix(&self.storage_root)
      84               3 :                     .context("Failed to strip storage root prefix")
      85               3 :                     .and_then(RemotePath::new)
      86               3 :                     .expect(
      87               3 :                         "We list files for storage root, hence should be able to remote the prefix",
      88               3 :                     )
      89               3 :             })
      90               3 :             .collect())
      91               3 :     }
      92                 : }
      93                 : 
      94                 : #[async_trait::async_trait]
      95                 : impl RemoteStorage for LocalFs {
      96              20 :     async fn list_prefixes(
      97              20 :         &self,
      98              20 :         prefix: Option<&RemotePath>,
      99              20 :     ) -> Result<Vec<RemotePath>, DownloadError> {
     100              20 :         let path = match prefix {
     101              20 :             Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
     102 UBC           0 :             None => Cow::Borrowed(&self.storage_root),
     103                 :         };
     104                 : 
     105 CBC          20 :         let prefixes_to_filter = get_all_files(path.as_ref(), false)
     106              40 :             .await
     107              20 :             .map_err(DownloadError::Other)?;
     108                 : 
     109              20 :         let mut prefixes = Vec::with_capacity(prefixes_to_filter.len());
     110                 : 
     111                 :         // filter out empty directories to mirror s3 behavior.
     112              48 :         for prefix in prefixes_to_filter {
     113              28 :             if prefix.is_dir()
     114              28 :                 && is_directory_empty(&prefix)
     115              30 :                     .await
     116              28 :                     .map_err(DownloadError::Other)?
     117                 :             {
     118               2 :                 continue;
     119              26 :             }
     120              26 : 
     121              26 :             prefixes.push(
     122              26 :                 prefix
     123              26 :                     .strip_prefix(&self.storage_root)
     124              26 :                     .context("Failed to strip prefix")
     125              26 :                     .and_then(RemotePath::new)
     126              26 :                     .expect(
     127              26 :                         "We list files for storage root, hence should be able to remote the prefix",
     128              26 :                     ),
     129              26 :             )
     130                 :         }
     131                 : 
     132              20 :         Ok(prefixes)
     133              40 :     }
     134                 : 
     135                 :     // recursively lists all files in a directory,
     136                 :     // mirroring the `list_files` for `s3_bucket`
     137              30 :     async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
     138              30 :         let full_path = match folder {
     139              30 :             Some(folder) => folder.with_base(&self.storage_root),
     140 UBC           0 :             None => self.storage_root.clone(),
     141                 :         };
     142                 : 
     143                 :         // If we were given a directory, we may use it as our starting point.
     144                 :         // Otherwise, we must go up to the parent directory.  This is because
     145                 :         // S3 object list prefixes can be arbitrary strings, but when reading
     146                 :         // the local filesystem we need a directory to start calling read_dir on.
     147 CBC          30 :         let mut initial_dir = full_path.clone();
     148              30 :         match fs::metadata(full_path.clone()).await {
     149              29 :             Ok(meta) => {
     150              29 :                 if !meta.is_dir() {
     151               3 :                     // It's not a directory: strip back to the parent
     152               3 :                     initial_dir.pop();
     153              26 :                 }
     154                 :             }
     155               1 :             Err(e) if e.kind() == ErrorKind::NotFound => {
     156               1 :                 // It's not a file that exists: strip the prefix back to the parent directory
     157               1 :                 initial_dir.pop();
     158               1 :             }
     159 UBC           0 :             Err(e) => {
     160               0 :                 // Unexpected I/O error
     161               0 :                 anyhow::bail!(e)
     162                 :             }
     163                 :         }
     164                 : 
     165                 :         // Note that Utf8PathBuf starts_with only considers full path segments, but
     166                 :         // object prefixes are arbitrary strings, so we need the strings for doing
     167                 :         // starts_with later.
     168 CBC          30 :         let prefix = full_path.as_str();
     169              30 : 
     170              30 :         let mut files = vec![];
     171              30 :         let mut directory_queue = vec![initial_dir];
     172              60 :         while let Some(cur_folder) = directory_queue.pop() {
     173              30 :             let mut entries = cur_folder.read_dir_utf8()?;
     174              75 :             while let Some(Ok(entry)) = entries.next() {
     175              45 :                 let file_name = entry.file_name();
     176              45 :                 let full_file_name = cur_folder.join(file_name);
     177              45 :                 if full_file_name.as_str().starts_with(prefix) {
     178              39 :                     let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
     179              39 :                     files.push(file_remote_path);
     180              39 :                     if full_file_name.is_dir() {
     181 UBC           0 :                         directory_queue.push(full_file_name);
     182 CBC          39 :                     }
     183               6 :                 }
     184                 :             }
     185                 :         }
     186                 : 
     187              30 :         Ok(files)
     188              60 :     }
     189                 : 
     190           10270 :     async fn upload(
     191           10270 :         &self,
     192           10270 :         data: impl io::AsyncRead + Unpin + Send + Sync + 'static,
     193           10270 :         data_size_bytes: usize,
     194           10270 :         to: &RemotePath,
     195           10270 :         metadata: Option<StorageMetadata>,
     196           10270 :     ) -> anyhow::Result<()> {
     197           10270 :         let target_file_path = to.with_base(&self.storage_root);
     198           10270 :         create_target_directory(&target_file_path).await?;
     199                 :         // We need this dance with sort of durable rename (without fsyncs)
     200                 :         // to prevent partial uploads. This was really hit when pageserver shutdown
     201                 :         // cancelled the upload and partial file was left on the fs
     202                 :         // NOTE: Because the temp file suffix is always the same, this operation is racy.
     203                 :         // Two concurrent operations can lead to the following sequence:
     204                 :         // T1: write(temp)
     205                 :         // T2: write(temp) -> overwrites the content
     206                 :         // T1: rename(temp, dst) -> succeeds
     207                 :         // T2: rename(temp, dst) -> fails, temp no longer exists
     208                 :         // This can be solved by supplying a unique temp suffix every time, but this situation
     209                 :         // is not normal in the first place; the error can help (and helped at least once)
     210                 :         // to discover bugs in upper level synchronization.
     211           10270 :         let temp_file_path =
     212           10270 :             path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX);
     213           10268 :         let mut destination = io::BufWriter::new(
     214           10270 :             fs::OpenOptions::new()
     215           10270 :                 .write(true)
     216           10270 :                 .create(true)
     217           10270 :                 .open(&temp_file_path)
     218           10183 :                 .await
     219           10268 :                 .with_context(|| {
     220 UBC           0 :                     format!("Failed to open target fs destination at '{target_file_path}'")
     221 CBC       10268 :                 })?,
     222                 :         );
     223                 : 
     224           10268 :         let from_size_bytes = data_size_bytes as u64;
     225           10268 :         let mut buffer_to_read = data.take(from_size_bytes);
     226                 : 
     227           10268 :         let bytes_read = io::copy(&mut buffer_to_read, &mut destination)
     228         3164262 :             .await
     229           10265 :             .with_context(|| {
     230 UBC           0 :                 format!(
     231               0 :                     "Failed to upload file (write temp) to the local storage at '{temp_file_path}'",
     232               0 :                 )
     233 CBC       10265 :             })?;
     234                 : 
     235           10265 :         if bytes_read < from_size_bytes {
     236               1 :             bail!("Provided stream was shorter than expected: {bytes_read} vs {from_size_bytes} bytes");
     237           10264 :         }
     238           10264 :         // Check if there is any extra data after the given size.
     239           10264 :         let mut from = buffer_to_read.into_inner();
     240           10264 :         let extra_read = from.read(&mut [1]).await?;
     241           10264 :         ensure!(
     242           10264 :             extra_read == 0,
     243               2 :             "Provided stream was larger than expected: expected {from_size_bytes} bytes",
     244                 :         );
     245                 : 
     246           10262 :         destination.flush().await.with_context(|| {
     247 UBC           0 :             format!(
     248               0 :                 "Failed to upload (flush temp) file to the local storage at '{temp_file_path}'",
     249               0 :             )
     250 CBC       10262 :         })?;
     251                 : 
     252           10262 :         fs::rename(temp_file_path, &target_file_path)
     253           10224 :             .await
     254           10262 :             .with_context(|| {
     255 LBC         (1) :                 format!(
     256             (1) :                     "Failed to upload (rename) file to the local storage at '{target_file_path}'",
     257             (1) :                 )
     258 CBC       10262 :             })?;
     259                 : 
     260           10262 :         if let Some(storage_metadata) = metadata {
     261               1 :             let storage_metadata_path = storage_metadata_path(&target_file_path);
     262               1 :             fs::write(
     263               1 :                 &storage_metadata_path,
     264               1 :                 serde_json::to_string(&storage_metadata.0)
     265               1 :                     .context("Failed to serialize storage metadata as json")?,
     266                 :             )
     267               1 :             .await
     268               1 :             .with_context(|| {
     269 UBC           0 :                 format!(
     270               0 :                     "Failed to write metadata to the local storage at '{storage_metadata_path}'",
     271               0 :                 )
     272 CBC           1 :             })?;
     273           10261 :         }
     274                 : 
     275           10262 :         Ok(())
     276           20535 :     }
     277                 : 
     278            1255 :     async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
     279            1255 :         let target_path = from.with_base(&self.storage_root);
     280            1255 :         if file_exists(&target_path).map_err(DownloadError::BadInput)? {
     281             821 :             let source = io::BufReader::new(
     282             821 :                 fs::OpenOptions::new()
     283             821 :                     .read(true)
     284             821 :                     .open(&target_path)
     285             788 :                     .await
     286             821 :                     .with_context(|| {
     287 UBC           0 :                         format!("Failed to open source file {target_path:?} to use in the download")
     288 CBC         821 :                     })
     289             821 :                     .map_err(DownloadError::Other)?,
     290                 :             );
     291                 : 
     292             821 :             let metadata = self
     293             821 :                 .read_storage_metadata(&target_path)
     294               1 :                 .await
     295             821 :                 .map_err(DownloadError::Other)?;
     296             821 :             Ok(Download {
     297             821 :                 metadata,
     298             821 :                 download_stream: Box::pin(source),
     299             821 :             })
     300                 :         } else {
     301             434 :             Err(DownloadError::NotFound)
     302                 :         }
     303            2510 :     }
     304                 : 
     305               7 :     async fn download_byte_range(
     306               7 :         &self,
     307               7 :         from: &RemotePath,
     308               7 :         start_inclusive: u64,
     309               7 :         end_exclusive: Option<u64>,
     310               7 :     ) -> Result<Download, DownloadError> {
     311               7 :         if let Some(end_exclusive) = end_exclusive {
     312               5 :             if end_exclusive <= start_inclusive {
     313               1 :                 return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
     314               4 :             };
     315               4 :             if start_inclusive == end_exclusive.saturating_sub(1) {
     316               1 :                 return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
     317               3 :             }
     318               2 :         }
     319               5 :         let target_path = from.with_base(&self.storage_root);
     320               5 :         if file_exists(&target_path).map_err(DownloadError::BadInput)? {
     321               5 :             let mut source = io::BufReader::new(
     322               5 :                 fs::OpenOptions::new()
     323               5 :                     .read(true)
     324               5 :                     .open(&target_path)
     325               5 :                     .await
     326               5 :                     .with_context(|| {
     327 UBC           0 :                         format!("Failed to open source file {target_path:?} to use in the download")
     328 CBC           5 :                     })
     329               5 :                     .map_err(DownloadError::Other)?,
     330                 :             );
     331               5 :             source
     332               5 :                 .seek(io::SeekFrom::Start(start_inclusive))
     333               5 :                 .await
     334               5 :                 .context("Failed to seek to the range start in a local storage file")
     335               5 :                 .map_err(DownloadError::Other)?;
     336               5 :             let metadata = self
     337               5 :                 .read_storage_metadata(&target_path)
     338               1 :                 .await
     339               5 :                 .map_err(DownloadError::Other)?;
     340                 : 
     341               5 :             Ok(match end_exclusive {
     342               3 :                 Some(end_exclusive) => Download {
     343               3 :                     metadata,
     344               3 :                     download_stream: Box::pin(source.take(end_exclusive - start_inclusive)),
     345               3 :                 },
     346               2 :                 None => Download {
     347               2 :                     metadata,
     348               2 :                     download_stream: Box::pin(source),
     349               2 :                 },
     350                 :             })
     351                 :         } else {
     352 UBC           0 :             Err(DownloadError::NotFound)
     353                 :         }
     354 CBC          14 :     }
     355                 : 
     356            2738 :     async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
     357            2738 :         let file_path = path.with_base(&self.storage_root);
     358            2738 :         match fs::remove_file(&file_path).await {
     359            2706 :             Ok(()) => Ok(()),
     360                 :             // The file doesn't exist. This shouldn't yield an error to mirror S3's behaviour.
     361                 :             // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
     362                 :             // > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful.
     363              32 :             Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
     364 UBC           0 :             Err(e) => Err(anyhow::anyhow!(e)),
     365                 :         }
     366 CBC        5476 :     }
     367                 : 
     368             168 :     async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
     369            2861 :         for path in paths {
     370            2693 :             self.delete(path).await?
     371                 :         }
     372             168 :         Ok(())
     373             336 :     }
     374                 : }
     375                 : 
     376             827 : fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
     377             827 :     path_with_suffix_extension(original_path, "metadata")
     378             827 : }
     379                 : 
     380              29 : fn get_all_files<'a, P>(
     381              29 :     directory_path: P,
     382              29 :     recursive: bool,
     383              29 : ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Utf8PathBuf>>> + Send + Sync + 'a>>
     384              29 : where
     385              29 :     P: AsRef<Utf8Path> + Send + Sync + 'a,
     386              29 : {
     387              29 :     Box::pin(async move {
     388              29 :         let directory_path = directory_path.as_ref();
     389              29 :         if directory_path.exists() {
     390              29 :             if directory_path.is_dir() {
     391              29 :                 let mut paths = Vec::new();
     392              29 :                 let mut dir_contents = fs::read_dir(directory_path).await?;
     393              66 :                 while let Some(dir_entry) = dir_contents.next_entry().await? {
     394              37 :                     let file_type = dir_entry.file_type().await?;
     395              37 :                     let entry_path =
     396              37 :                         Utf8PathBuf::from_path_buf(dir_entry.path()).map_err(|pb| {
     397 UBC           0 :                             anyhow::Error::msg(format!(
     398               0 :                                 "non-Unicode path: {}",
     399               0 :                                 pb.to_string_lossy()
     400               0 :                             ))
     401 CBC          37 :                         })?;
     402              37 :                     if file_type.is_symlink() {
     403 UBC           0 :                         debug!("{entry_path:?} is a symlink, skipping")
     404 CBC          37 :                     } else if file_type.is_dir() {
     405              34 :                         if recursive {
     406              18 :                             paths.extend(get_all_files(&entry_path, true).await?.into_iter())
     407                 :                         } else {
     408              28 :                             paths.push(entry_path)
     409                 :                         }
     410               3 :                     } else {
     411               3 :                         paths.push(entry_path);
     412               3 :                     }
     413                 :                 }
     414              29 :                 Ok(paths)
     415                 :             } else {
     416 UBC           0 :                 bail!("Path {directory_path:?} is not a directory")
     417                 :             }
     418                 :         } else {
     419               0 :             Ok(Vec::new())
     420                 :         }
     421 CBC          29 :     })
     422              29 : }
     423                 : 
     424           10270 : async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> {
     425           10270 :     let target_dir = match target_file_path.parent() {
     426           10270 :         Some(parent_dir) => parent_dir,
     427 UBC           0 :         None => bail!("File path '{target_file_path}' has no parent directory"),
     428                 :     };
     429 CBC       10270 :     if !target_dir.exists() {
     430             672 :         fs::create_dir_all(target_dir).await?;
     431            9598 :     }
     432           10270 :     Ok(())
     433           10270 : }
     434                 : 
     435            1260 : fn file_exists(file_path: &Utf8Path) -> anyhow::Result<bool> {
     436            1260 :     if file_path.exists() {
     437             826 :         ensure!(file_path.is_file(), "file path '{file_path}' is not a file");
     438             826 :         Ok(true)
     439                 :     } else {
     440             434 :         Ok(false)
     441                 :     }
     442            1260 : }
     443                 : 
     444                 : #[cfg(test)]
     445                 : mod fs_tests {
     446                 :     use super::*;
     447                 : 
     448                 :     use camino_tempfile::tempdir;
     449                 :     use std::{collections::HashMap, io::Write};
     450                 : 
     451               3 :     async fn read_and_assert_remote_file_contents(
     452               3 :         storage: &LocalFs,
     453               3 :         #[allow(clippy::ptr_arg)]
     454               3 :         // have to use &Utf8PathBuf due to `storage.local_path` parameter requirements
     455               3 :         remote_storage_path: &RemotePath,
     456               3 :         expected_metadata: Option<&StorageMetadata>,
     457               3 :     ) -> anyhow::Result<String> {
     458               3 :         let mut download = storage
     459               3 :             .download(remote_storage_path)
     460               4 :             .await
     461               3 :             .map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
     462               3 :         ensure!(
     463               3 :             download.metadata.as_ref() == expected_metadata,
     464 UBC           0 :             "Unexpected metadata returned for the downloaded file"
     465                 :         );
     466                 : 
     467 CBC           3 :         let mut contents = String::new();
     468               3 :         download
     469               3 :             .download_stream
     470               3 :             .read_to_string(&mut contents)
     471               6 :             .await
     472               3 :             .context("Failed to read remote file contents into string")?;
     473               3 :         Ok(contents)
     474               3 :     }
     475                 : 
     476               1 :     #[tokio::test]
     477               1 :     async fn upload_file() -> anyhow::Result<()> {
     478               1 :         let storage = create_storage()?;
     479                 : 
     480               6 :         let target_path_1 = upload_dummy_file(&storage, "upload_1", None).await?;
     481               1 :         assert_eq!(
     482               6 :             storage.list().await?,
     483               1 :             vec![target_path_1.clone()],
     484 UBC           0 :             "Should list a single file after first upload"
     485                 :         );
     486                 : 
     487 CBC           6 :         let target_path_2 = upload_dummy_file(&storage, "upload_2", None).await?;
     488               1 :         assert_eq!(
     489               6 :             list_files_sorted(&storage).await?,
     490               1 :             vec![target_path_1.clone(), target_path_2.clone()],
     491 UBC           0 :             "Should list a two different files after second upload"
     492                 :         );
     493                 : 
     494 CBC           1 :         Ok(())
     495                 :     }
     496                 : 
     497               1 :     #[tokio::test]
     498               1 :     async fn upload_file_negatives() -> anyhow::Result<()> {
     499               1 :         let storage = create_storage()?;
     500                 : 
     501               1 :         let id = RemotePath::new(Utf8Path::new("dummy"))?;
     502               1 :         let content = std::io::Cursor::new(b"12345");
     503               1 : 
     504               1 :         // Check that you get an error if the size parameter doesn't match the actual
     505               1 :         // size of the stream.
     506               1 :         storage
     507               1 :             .upload(Box::new(content.clone()), 0, &id, None)
     508               1 :             .await
     509               1 :             .expect_err("upload with zero size succeeded");
     510               1 :         storage
     511               1 :             .upload(Box::new(content.clone()), 4, &id, None)
     512               2 :             .await
     513               1 :             .expect_err("upload with too short size succeeded");
     514               1 :         storage
     515               1 :             .upload(Box::new(content.clone()), 6, &id, None)
     516               2 :             .await
     517               1 :             .expect_err("upload with too large size succeeded");
     518               1 : 
     519               1 :         // Correct size is 5, this should succeed.
     520               3 :         storage.upload(Box::new(content), 5, &id, None).await?;
     521                 : 
     522               1 :         Ok(())
     523                 :     }
     524                 : 
     525               7 :     fn create_storage() -> anyhow::Result<LocalFs> {
     526               7 :         let storage_root = tempdir()?.path().to_path_buf();
     527               7 :         LocalFs::new(storage_root)
     528               7 :     }
     529                 : 
     530               1 :     #[tokio::test]
     531               1 :     async fn download_file() -> anyhow::Result<()> {
     532               1 :         let storage = create_storage()?;
     533               1 :         let upload_name = "upload_1";
     534               6 :         let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
     535                 : 
     536               3 :         let contents = read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
     537               1 :         assert_eq!(
     538               1 :             dummy_contents(upload_name),
     539                 :             contents,
     540 UBC           0 :             "We should upload and download the same contents"
     541                 :         );
     542                 : 
     543 CBC           1 :         let non_existing_path = "somewhere/else";
     544               1 :         match storage.download(&RemotePath::new(Utf8Path::new(non_existing_path))?).await {
     545               1 :             Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys
     546 UBC           0 :             other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
     547                 :         }
     548 CBC           1 :         Ok(())
     549                 :     }
     550                 : 
     551               1 :     #[tokio::test]
     552               1 :     async fn download_file_range_positive() -> anyhow::Result<()> {
     553               1 :         let storage = create_storage()?;
     554               1 :         let upload_name = "upload_1";
     555               6 :         let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
     556                 : 
     557               1 :         let full_range_download_contents =
     558               3 :             read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
     559               1 :         assert_eq!(
     560               1 :             dummy_contents(upload_name),
     561                 :             full_range_download_contents,
     562 UBC           0 :             "Download full range should return the whole upload"
     563                 :         );
     564                 : 
     565 CBC           1 :         let uploaded_bytes = dummy_contents(upload_name).into_bytes();
     566               1 :         let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);
     567                 : 
     568               1 :         let mut first_part_download = storage
     569               1 :             .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
     570               2 :             .await?;
     571               1 :         assert!(
     572               1 :             first_part_download.metadata.is_none(),
     573 UBC           0 :             "No metadata should be returned for no metadata upload"
     574                 :         );
     575                 : 
     576 CBC           1 :         let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
     577               1 :         io::copy(
     578               1 :             &mut first_part_download.download_stream,
     579               1 :             &mut first_part_remote,
     580               1 :         )
     581               1 :         .await?;
     582               1 :         first_part_remote.flush().await?;
     583               1 :         let first_part_remote = first_part_remote.into_inner().into_inner();
     584               1 :         assert_eq!(
     585               1 :             first_part_local,
     586               1 :             first_part_remote.as_slice(),
     587 UBC           0 :             "First part bytes should be returned when requested"
     588                 :         );
     589                 : 
     590 CBC           1 :         let mut second_part_download = storage
     591               1 :             .download_byte_range(
     592               1 :                 &upload_target,
     593               1 :                 first_part_local.len() as u64,
     594               1 :                 Some((first_part_local.len() + second_part_local.len()) as u64),
     595               1 :             )
     596               2 :             .await?;
     597               1 :         assert!(
     598               1 :             second_part_download.metadata.is_none(),
     599 UBC           0 :             "No metadata should be returned for no metadata upload"
     600                 :         );
     601                 : 
     602 CBC           1 :         let mut second_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
     603               1 :         io::copy(
     604               1 :             &mut second_part_download.download_stream,
     605               1 :             &mut second_part_remote,
     606               1 :         )
     607               1 :         .await?;
     608               1 :         second_part_remote.flush().await?;
     609               1 :         let second_part_remote = second_part_remote.into_inner().into_inner();
     610               1 :         assert_eq!(
     611               1 :             second_part_local,
     612               1 :             second_part_remote.as_slice(),
     613 UBC           0 :             "Second part bytes should be returned when requested"
     614                 :         );
     615                 : 
     616 CBC           1 :         Ok(())
     617                 :     }
     618                 : 
     619               1 :     #[tokio::test]
     620               1 :     async fn download_file_range_negative() -> anyhow::Result<()> {
     621               1 :         let storage = create_storage()?;
     622               1 :         let upload_name = "upload_1";
     623               6 :         let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
     624                 : 
     625               1 :         let start = 1_000_000_000;
     626               1 :         let end = start + 1;
     627               1 :         match storage
     628               1 :             .download_byte_range(
     629               1 :                 &upload_target,
     630               1 :                 start,
     631               1 :                 Some(end), // exclusive end
     632               1 :             )
     633 UBC           0 :             .await
     634                 :         {
     635               0 :             Ok(_) => panic!("Should not allow downloading wrong ranges"),
     636 CBC           1 :             Err(e) => {
     637               1 :                 let error_string = e.to_string();
     638               1 :                 assert!(error_string.contains("zero bytes"));
     639               1 :                 assert!(error_string.contains(&start.to_string()));
     640               1 :                 assert!(error_string.contains(&end.to_string()));
     641                 :             }
     642                 :         }
     643                 : 
     644               1 :         let start = 10000;
     645               1 :         let end = 234;
     646               1 :         assert!(start > end, "Should test an incorrect range");
     647               1 :         match storage
     648               1 :             .download_byte_range(&upload_target, start, Some(end))
     649 UBC           0 :             .await
     650                 :         {
     651               0 :             Ok(_) => panic!("Should not allow downloading wrong ranges"),
     652 CBC           1 :             Err(e) => {
     653               1 :                 let error_string = e.to_string();
     654               1 :                 assert!(error_string.contains("Invalid range"));
     655               1 :                 assert!(error_string.contains(&start.to_string()));
     656               1 :                 assert!(error_string.contains(&end.to_string()));
     657                 :             }
     658                 :         }
     659                 : 
     660               1 :         Ok(())
     661                 :     }
     662                 : 
     663               1 :     #[tokio::test]
     664               1 :     async fn delete_file() -> anyhow::Result<()> {
     665               1 :         let storage = create_storage()?;
     666               1 :         let upload_name = "upload_1";
     667               6 :         let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
     668                 : 
     669               1 :         storage.delete(&upload_target).await?;
     670               6 :         assert!(storage.list().await?.is_empty());
     671                 : 
     672               1 :         storage
     673               1 :             .delete(&upload_target)
     674               1 :             .await
     675               1 :             .expect("Should allow deleting non-existing storage files");
     676               1 : 
     677               1 :         Ok(())
     678                 :     }
     679                 : 
     680               1 :     #[tokio::test]
     681               1 :     async fn file_with_metadata() -> anyhow::Result<()> {
     682               1 :         let storage = create_storage()?;
     683               1 :         let upload_name = "upload_1";
     684               1 :         let metadata = StorageMetadata(HashMap::from([
     685               1 :             ("one".to_string(), "1".to_string()),
     686               1 :             ("two".to_string(), "2".to_string()),
     687               1 :         ]));
     688               1 :         let upload_target =
     689               7 :             upload_dummy_file(&storage, upload_name, Some(metadata.clone())).await?;
     690                 : 
     691               1 :         let full_range_download_contents =
     692               4 :             read_and_assert_remote_file_contents(&storage, &upload_target, Some(&metadata)).await?;
     693               1 :         assert_eq!(
     694               1 :             dummy_contents(upload_name),
     695                 :             full_range_download_contents,
     696 UBC           0 :             "We should upload and download the same contents"
     697                 :         );
     698                 : 
     699 CBC           1 :         let uploaded_bytes = dummy_contents(upload_name).into_bytes();
     700               1 :         let (first_part_local, _) = uploaded_bytes.split_at(3);
     701                 : 
     702               1 :         let mut partial_download_with_metadata = storage
     703               1 :             .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
     704               3 :             .await?;
     705               1 :         let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
     706               1 :         io::copy(
     707               1 :             &mut partial_download_with_metadata.download_stream,
     708               1 :             &mut first_part_remote,
     709               1 :         )
     710               1 :         .await?;
     711               1 :         first_part_remote.flush().await?;
     712               1 :         let first_part_remote = first_part_remote.into_inner().into_inner();
     713               1 :         assert_eq!(
     714               1 :             first_part_local,
     715               1 :             first_part_remote.as_slice(),
     716 UBC           0 :             "First part bytes should be returned when requested"
     717                 :         );
     718                 : 
     719 CBC           1 :         assert_eq!(
     720               1 :             partial_download_with_metadata.metadata,
     721               1 :             Some(metadata),
     722 UBC           0 :             "We should get the same metadata back for partial download"
     723                 :         );
     724                 : 
     725 CBC           1 :         Ok(())
     726                 :     }
     727                 : 
     728               7 :     async fn upload_dummy_file(
     729               7 :         storage: &LocalFs,
     730               7 :         name: &str,
     731               7 :         metadata: Option<StorageMetadata>,
     732               7 :     ) -> anyhow::Result<RemotePath> {
     733               7 :         let from_path = storage
     734               7 :             .storage_root
     735               7 :             .join("timelines")
     736               7 :             .join("some_timeline")
     737               7 :             .join(name);
     738               7 :         let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?;
     739                 : 
     740               7 :         let relative_path = from_path
     741               7 :             .strip_prefix(&storage.storage_root)
     742               7 :             .context("Failed to strip storage root prefix")
     743               7 :             .and_then(RemotePath::new)
     744               7 :             .with_context(|| {
     745 UBC           0 :                 format!(
     746               0 :                     "Failed to resolve remote part of path {:?} for base {:?}",
     747               0 :                     from_path, storage.storage_root
     748               0 :                 )
     749 CBC           7 :             })?;
     750                 : 
     751               7 :         storage
     752               7 :             .upload(Box::new(file), size, &relative_path, metadata)
     753              36 :             .await?;
     754               7 :         Ok(relative_path)
     755               7 :     }
     756                 : 
     757               7 :     async fn create_file_for_upload(
     758               7 :         path: &Utf8Path,
     759               7 :         contents: &str,
     760               7 :     ) -> anyhow::Result<(io::BufReader<fs::File>, usize)> {
     761               7 :         std::fs::create_dir_all(path.parent().unwrap())?;
     762               7 :         let mut file_for_writing = std::fs::OpenOptions::new()
     763               7 :             .write(true)
     764               7 :             .create_new(true)
     765               7 :             .open(path)?;
     766               7 :         write!(file_for_writing, "{}", contents)?;
     767               7 :         drop(file_for_writing);
     768               7 :         let file_size = path.metadata()?.len() as usize;
     769               7 :         Ok((
     770               7 :             io::BufReader::new(fs::OpenOptions::new().read(true).open(&path).await?),
     771               7 :             file_size,
     772                 :         ))
     773               7 :     }
     774                 : 
     775              12 :     fn dummy_contents(name: &str) -> String {
     776              12 :         format!("contents for {name}")
     777              12 :     }
     778                 : 
     779               1 :     async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
     780               6 :         let mut files = storage.list().await?;
     781               1 :         files.sort_by(|a, b| a.0.cmp(&b.0));
     782               1 :         Ok(files)
     783               1 :     }
     784                 : }
        

Generated by: LCOV version 2.1-beta