LCOV - differential code coverage report
Current view: top level - libs/remote_storage/src - local_fs.rs (source / functions) Coverage Total Hit LBC UBC GBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 89.0 % 630 561 3 66 1 560
Current Date: 2024-01-09 02:06:09 Functions: 66.7 % 93 62 1 30 62
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! Local filesystem acting as a remote storage.
       2                 : //! Multiple API users can use the same "storage" of this kind by using different storage roots.
       3                 : //!
       4                 : //! This storage is used in tests, but can also be used in cases when a certain persistent
       5                 : //! volume is mounted to the local FS.
       6                 : 
       7                 : use std::{borrow::Cow, future::Future, io::ErrorKind, pin::Pin};
       8                 : 
       9                 : use anyhow::{bail, ensure, Context};
      10                 : use bytes::Bytes;
      11                 : use camino::{Utf8Path, Utf8PathBuf};
      12                 : use futures::stream::Stream;
      13                 : use tokio::{
      14                 :     fs,
      15                 :     io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
      16                 : };
      17                 : use tokio_util::io::ReaderStream;
      18                 : use tracing::*;
      19                 : use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};
      20                 : 
      21                 : use crate::{Download, DownloadError, DownloadStream, Listing, ListingMode, RemotePath};
      22                 : 
      23                 : use super::{RemoteStorage, StorageMetadata};
      24                 : 
      25                 : const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";
      26                 : 
      27 CBC        4034 : #[derive(Debug, Clone)]
      28                 : pub struct LocalFs {
      29                 :     storage_root: Utf8PathBuf,
      30                 : }
      31                 : 
      32                 : impl LocalFs {
      33                 :     /// Attempts to create local FS storage, along with its root directory.
      34                 :     /// Storage root will be created (if it does not exist) and transformed into an absolute path (if passed as relative).
      35             414 :     pub fn new(mut storage_root: Utf8PathBuf) -> anyhow::Result<Self> {
      36             414 :         if !storage_root.exists() {
      37              10 :             std::fs::create_dir_all(&storage_root).with_context(|| {
      38 UBC           0 :                 format!("Failed to create all directories in the given root path {storage_root:?}")
      39 CBC          10 :             })?;
      40             404 :         }
      41             414 :         if !storage_root.is_absolute() {
      42              45 :             storage_root = storage_root.canonicalize_utf8().with_context(|| {
      43 UBC           0 :                 format!("Failed to represent path {storage_root:?} as an absolute path")
      44 CBC          45 :             })?;
      45             369 :         }
      46                 : 
      47             414 :         Ok(Self { storage_root })
      48             414 :     }
      49                 : 
      50                 :     // mirrors S3Bucket::s3_object_to_relative_path
      51             170 :     fn local_file_to_relative_path(&self, key: Utf8PathBuf) -> RemotePath {
      52             170 :         let relative_path = key
      53             170 :             .strip_prefix(&self.storage_root)
      54             170 :             .expect("relative path must contain storage_root as prefix");
      55             170 :         RemotePath(relative_path.into())
      56             170 :     }
      57                 : 
      58             874 :     async fn read_storage_metadata(
      59             874 :         &self,
      60             874 :         file_path: &Utf8Path,
      61             874 :     ) -> anyhow::Result<Option<StorageMetadata>> {
      62             874 :         let metadata_path = storage_metadata_path(file_path);
      63             874 :         if metadata_path.exists() && metadata_path.is_file() {
      64               2 :             let metadata_string = fs::read_to_string(&metadata_path).await.with_context(|| {
      65 UBC           0 :                 format!("Failed to read metadata from the local storage at '{metadata_path}'")
      66 CBC           2 :             })?;
      67                 : 
      68               2 :             serde_json::from_str(&metadata_string)
      69               2 :                 .with_context(|| {
      70 UBC           0 :                     format!(
      71               0 :                         "Failed to deserialize metadata from the local storage at '{metadata_path}'",
      72               0 :                     )
      73 CBC           2 :                 })
      74               2 :                 .map(|metadata| Some(StorageMetadata(metadata)))
      75                 :         } else {
      76             872 :             Ok(None)
      77                 :         }
      78             874 :     }
      79                 : 
      80                 :     #[cfg(test)]
      81               3 :     async fn list_all(&self) -> anyhow::Result<Vec<RemotePath>> {
      82               3 :         Ok(get_all_files(&self.storage_root, true)
      83               9 :             .await?
      84               3 :             .into_iter()
      85               3 :             .map(|path| {
      86               3 :                 path.strip_prefix(&self.storage_root)
      87               3 :                     .context("Failed to strip storage root prefix")
      88               3 :                     .and_then(RemotePath::new)
      89               3 :                     .expect(
      90               3 :                         "We list files for storage root, hence should be able to remote the prefix",
      91               3 :                     )
      92               3 :             })
      93               3 :             .collect())
      94               3 :     }
      95                 : 
      96                 :     // recursively lists all files in a directory,
      97                 :     // mirroring the `list_files` for `s3_bucket`
      98             122 :     async fn list_recursive(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
      99             122 :         let full_path = match folder {
     100             121 :             Some(folder) => folder.with_base(&self.storage_root),
     101               1 :             None => self.storage_root.clone(),
     102                 :         };
     103                 : 
     104                 :         // If we were given a directory, we may use it as our starting point.
     105                 :         // Otherwise, we must go up to the first ancestor dir that exists.  This is because
     106                 :         // S3 object list prefixes can be arbitrary strings, but when reading
     107                 :         // the local filesystem we need a directory to start calling read_dir on.
     108             122 :         let mut initial_dir = full_path.clone();
     109             224 :         loop {
     110             224 :             // Did we make it to the root?
     111             224 :             if initial_dir.parent().is_none() {
     112 UBC           0 :                 anyhow::bail!("list_files: failed to find valid ancestor dir for {full_path}");
     113 CBC         224 :             }
     114             224 : 
     115             224 :             match fs::metadata(initial_dir.clone()).await {
     116             125 :                 Ok(meta) if meta.is_dir() => {
     117             122 :                     // We found a directory, break
     118             122 :                     break;
     119                 :                 }
     120               3 :                 Ok(_meta) => {
     121               3 :                     // It's not a directory: strip back to the parent
     122               3 :                     initial_dir.pop();
     123               3 :                 }
     124              99 :                 Err(e) if e.kind() == ErrorKind::NotFound => {
     125              99 :                     // It's not a file that exists: strip the prefix back to the parent directory
     126              99 :                     initial_dir.pop();
     127              99 :                 }
     128 UBC           0 :                 Err(e) => {
     129               0 :                     // Unexpected I/O error
     130               0 :                     anyhow::bail!(e)
     131                 :                 }
     132                 :             }
     133                 :         }
     134                 :         // Note that Utf8PathBuf starts_with only considers full path segments, but
     135                 :         // object prefixes are arbitrary strings, so we need the strings for doing
     136                 :         // starts_with later.
     137 CBC         122 :         let prefix = full_path.as_str();
     138             122 : 
     139             122 :         let mut files = vec![];
     140             122 :         let mut directory_queue = vec![initial_dir];
     141             248 :         while let Some(cur_folder) = directory_queue.pop() {
     142             126 :             let mut entries = cur_folder.read_dir_utf8()?;
     143            2808 :             while let Some(Ok(entry)) = entries.next() {
     144            2682 :                 let file_name = entry.file_name();
     145            2682 :                 let full_file_name = cur_folder.join(file_name);
     146            2682 :                 if full_file_name.as_str().starts_with(prefix) {
     147             170 :                     let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
     148             170 :                     files.push(file_remote_path);
     149             170 :                     if full_file_name.is_dir() {
     150               4 :                         directory_queue.push(full_file_name);
     151             166 :                     }
     152            2512 :                 }
     153                 :             }
     154                 :         }
     155                 : 
     156             122 :         Ok(files)
     157             122 :     }
     158                 : }
     159                 : 
     160                 : #[async_trait::async_trait]
     161                 : impl RemoteStorage for LocalFs {
     162             287 :     async fn list(
     163             287 :         &self,
     164             287 :         prefix: Option<&RemotePath>,
     165             287 :         mode: ListingMode,
     166             287 :     ) -> Result<Listing, DownloadError> {
     167             287 :         let mut result = Listing::default();
     168             287 : 
     169             287 :         if let ListingMode::NoDelimiter = mode {
     170             122 :             let keys = self
     171             122 :                 .list_recursive(prefix)
     172             219 :                 .await
     173             122 :                 .map_err(DownloadError::Other)?;
     174                 : 
     175             122 :             result.keys = keys
     176             122 :                 .into_iter()
     177             170 :                 .filter(|k| {
     178             170 :                     let path = k.with_base(&self.storage_root);
     179             170 :                     !path.is_dir()
     180             170 :                 })
     181             122 :                 .collect();
     182             122 : 
     183             122 :             return Ok(result);
     184             165 :         }
     185                 : 
     186             165 :         let path = match prefix {
     187             164 :             Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
     188               1 :             None => Cow::Borrowed(&self.storage_root),
     189                 :         };
     190                 : 
     191             165 :         let prefixes_to_filter = get_all_files(path.as_ref(), false)
     192             164 :             .await
     193             165 :             .map_err(DownloadError::Other)?;
     194                 : 
     195                 :         // filter out empty directories to mirror s3 behavior.
     196             386 :         for prefix in prefixes_to_filter {
     197             221 :             if prefix.is_dir()
     198             220 :                 && is_directory_empty(&prefix)
     199             214 :                     .await
     200             220 :                     .map_err(DownloadError::Other)?
     201                 :             {
     202               4 :                 continue;
     203             217 :             }
     204             217 : 
     205             217 :             let stripped = prefix
     206             217 :                 .strip_prefix(&self.storage_root)
     207             217 :                 .context("Failed to strip prefix")
     208             217 :                 .and_then(RemotePath::new)
     209             217 :                 .expect(
     210             217 :                     "We list files for storage root, hence should be able to remote the prefix",
     211             217 :                 );
     212             217 : 
     213             217 :             if prefix.is_dir() {
     214             216 :                 result.prefixes.push(stripped);
     215             216 :             } else {
     216               1 :                 result.keys.push(stripped);
     217               1 :             }
     218                 :         }
     219                 : 
     220             165 :         Ok(result)
     221             574 :     }
     222                 : 
     223           10273 :     async fn upload(
     224           10273 :         &self,
     225           10273 :         data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
     226           10273 :         data_size_bytes: usize,
     227           10273 :         to: &RemotePath,
     228           10273 :         metadata: Option<StorageMetadata>,
     229           10273 :     ) -> anyhow::Result<()> {
     230           10273 :         let target_file_path = to.with_base(&self.storage_root);
     231           10273 :         create_target_directory(&target_file_path).await?;
     232                 :         // We need this dance with a sort of durable rename (without fsyncs)
     233                 :         // to prevent partial uploads. This was really hit when pageserver shutdown
     234                 :         // cancelled the upload and a partial file was left on the fs.
     235                 :         // NOTE: Because the temp file suffix is always the same, this operation is racy.
     236                 :         // Two concurrent operations can lead to the following sequence:
     237                 :         // T1: write(temp)
     238                 :         // T2: write(temp) -> overwrites the content
     239                 :         // T1: rename(temp, dst) -> succeeds
     240                 :         // T2: rename(temp, dst) -> fails, temp no longer exists
     241                 :         // This can be solved by supplying a unique temp suffix every time, but this situation
     242                 :         // is not normal in the first place; the error can help (and helped at least once)
     243                 :         // to discover bugs in upper level synchronization.
     244           10273 :         let temp_file_path =
     245           10273 :             path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX);
     246           10270 :         let mut destination = io::BufWriter::new(
     247           10273 :             fs::OpenOptions::new()
     248           10273 :                 .write(true)
     249           10273 :                 .create(true)
     250           10273 :                 .open(&temp_file_path)
     251           10105 :                 .await
     252           10270 :                 .with_context(|| {
     253 UBC           0 :                     format!("Failed to open target fs destination at '{target_file_path}'")
     254 CBC       10270 :                 })?,
     255                 :         );
     256                 : 
     257           10270 :         let from_size_bytes = data_size_bytes as u64;
     258           10270 :         let data = tokio_util::io::StreamReader::new(data);
     259           10270 :         let data = std::pin::pin!(data);
     260           10270 :         let mut buffer_to_read = data.take(from_size_bytes);
     261                 : 
     262                 :         // alternatively we could just write the bytes to a file, but local_fs is a testing utility
     263           10270 :         let bytes_read = io::copy_buf(&mut buffer_to_read, &mut destination)
     264          744589 :             .await
     265           10270 :             .with_context(|| {
     266 UBC           0 :                 format!(
     267               0 :                     "Failed to upload file (write temp) to the local storage at '{temp_file_path}'",
     268               0 :                 )
     269 CBC       10270 :             })?;
     270                 : 
     271           10270 :         if bytes_read < from_size_bytes {
     272               1 :             bail!("Provided stream was shorter than expected: {bytes_read} vs {from_size_bytes} bytes");
     273           10269 :         }
     274           10269 :         // Check if there is any extra data after the given size.
     275           10269 :         let mut from = buffer_to_read.into_inner();
     276           10269 :         let extra_read = from.read(&mut [1]).await?;
     277                 :         ensure!(
     278           10269 :             extra_read == 0,
     279               2 :             "Provided stream was larger than expected: expected {from_size_bytes} bytes",
     280                 :         );
     281                 : 
     282           10267 :         destination.flush().await.with_context(|| {
     283 UBC           0 :             format!(
     284               0 :                 "Failed to upload (flush temp) file to the local storage at '{temp_file_path}'",
     285               0 :             )
     286 CBC       10267 :         })?;
     287                 : 
     288           10267 :         fs::rename(temp_file_path, &target_file_path)
     289            9994 :             .await
     290           10267 :             .with_context(|| {
     291 LBC         (1) :                 format!(
     292             (1) :                     "Failed to upload (rename) file to the local storage at '{target_file_path}'",
     293             (1) :                 )
     294 CBC       10267 :             })?;
     295                 : 
     296           10267 :         if let Some(storage_metadata) = metadata {
     297               1 :             let storage_metadata_path = storage_metadata_path(&target_file_path);
     298               1 :             fs::write(
     299               1 :                 &storage_metadata_path,
     300               1 :                 serde_json::to_string(&storage_metadata.0)
     301               1 :                     .context("Failed to serialize storage metadata as json")?,
     302                 :             )
     303               1 :             .await
     304               1 :             .with_context(|| {
     305 UBC           0 :                 format!(
     306               0 :                     "Failed to write metadata to the local storage at '{storage_metadata_path}'",
     307               0 :                 )
     308 CBC           1 :             })?;
     309           10266 :         }
     310                 : 
     311           10267 :         Ok(())
     312           20543 :     }
     313                 : 
     314            1189 :     async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
     315            1189 :         let target_path = from.with_base(&self.storage_root);
     316            1189 :         if file_exists(&target_path).map_err(DownloadError::BadInput)? {
     317             869 :             let source = ReaderStream::new(
     318             869 :                 fs::OpenOptions::new()
     319             869 :                     .read(true)
     320             869 :                     .open(&target_path)
     321             853 :                     .await
     322             869 :                     .with_context(|| {
     323 UBC           0 :                         format!("Failed to open source file {target_path:?} to use in the download")
     324 CBC         869 :                     })
     325             869 :                     .map_err(DownloadError::Other)?,
     326                 :             );
     327                 : 
     328             869 :             let metadata = self
     329             869 :                 .read_storage_metadata(&target_path)
     330               1 :                 .await
     331             869 :                 .map_err(DownloadError::Other)?;
     332             869 :             Ok(Download {
     333             869 :                 metadata,
     334             869 :                 last_modified: None,
     335             869 :                 etag: None,
     336             869 :                 download_stream: Box::pin(source),
     337             869 :             })
     338                 :         } else {
     339             320 :             Err(DownloadError::NotFound)
     340                 :         }
     341            2378 :     }
     342                 : 
     343               7 :     async fn download_byte_range(
     344               7 :         &self,
     345               7 :         from: &RemotePath,
     346               7 :         start_inclusive: u64,
     347               7 :         end_exclusive: Option<u64>,
     348               7 :     ) -> Result<Download, DownloadError> {
     349               7 :         if let Some(end_exclusive) = end_exclusive {
     350               5 :             if end_exclusive <= start_inclusive {
     351               1 :                 return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
     352               4 :             };
     353               4 :             if start_inclusive == end_exclusive.saturating_sub(1) {
     354               1 :                 return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
     355               3 :             }
     356               2 :         }
     357               5 :         let target_path = from.with_base(&self.storage_root);
     358               5 :         if file_exists(&target_path).map_err(DownloadError::BadInput)? {
     359               5 :             let mut source = tokio::fs::OpenOptions::new()
     360               5 :                 .read(true)
     361               5 :                 .open(&target_path)
     362               5 :                 .await
     363               5 :                 .with_context(|| {
     364 UBC           0 :                     format!("Failed to open source file {target_path:?} to use in the download")
     365 CBC           5 :                 })
     366               5 :                 .map_err(DownloadError::Other)?;
     367               5 :             source
     368               5 :                 .seek(io::SeekFrom::Start(start_inclusive))
     369               5 :                 .await
     370               5 :                 .context("Failed to seek to the range start in a local storage file")
     371               5 :                 .map_err(DownloadError::Other)?;
     372               5 :             let metadata = self
     373               5 :                 .read_storage_metadata(&target_path)
     374 GBC           1 :                 .await
     375 CBC           5 :                 .map_err(DownloadError::Other)?;
     376                 : 
     377               5 :             let download_stream: DownloadStream = match end_exclusive {
     378               3 :                 Some(end_exclusive) => Box::pin(ReaderStream::new(
     379               3 :                     source.take(end_exclusive - start_inclusive),
     380               3 :                 )),
     381               2 :                 None => Box::pin(ReaderStream::new(source)),
     382                 :             };
     383               5 :             Ok(Download {
     384               5 :                 metadata,
     385               5 :                 last_modified: None,
     386               5 :                 etag: None,
     387               5 :                 download_stream,
     388               5 :             })
     389                 :         } else {
     390 UBC           0 :             Err(DownloadError::NotFound)
     391                 :         }
     392 CBC          14 :     }
     393                 : 
     394            1208 :     async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
     395            1208 :         let file_path = path.with_base(&self.storage_root);
     396            1208 :         match fs::remove_file(&file_path).await {
     397             648 :             Ok(()) => Ok(()),
     398                 :             // The file doesn't exist. This shouldn't yield an error to mirror S3's behaviour.
     399                 :             // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
     400                 :             // > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful.
     401             560 :             Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
     402 UBC           0 :             Err(e) => Err(anyhow::anyhow!(e)),
     403                 :         }
     404 CBC        2416 :     }
     405                 : 
     406              50 :     async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
     407            1253 :         for path in paths {
     408            1203 :             self.delete(path).await?
     409                 :         }
     410              50 :         Ok(())
     411             100 :     }
     412                 : 
     413 UBC           0 :     async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
     414               0 :         let from_path = from.with_base(&self.storage_root);
     415               0 :         let to_path = to.with_base(&self.storage_root);
     416               0 :         create_target_directory(&to_path).await?;
     417               0 :         fs::copy(&from_path, &to_path).await.with_context(|| {
     418               0 :             format!(
     419               0 :                 "Failed to copy file from '{from_path}' to '{to_path}'",
     420               0 :                 from_path = from_path,
     421               0 :                 to_path = to_path
     422               0 :             )
     423               0 :         })?;
     424               0 :         Ok(())
     425               0 :     }
     426                 : }
     427                 : 
     428 CBC         875 : fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
     429             875 :     path_with_suffix_extension(original_path, "metadata")
     430             875 : }
     431                 : 
     432             174 : fn get_all_files<'a, P>(
     433             174 :     directory_path: P,
     434             174 :     recursive: bool,
     435             174 : ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Utf8PathBuf>>> + Send + Sync + 'a>>
     436             174 : where
     437             174 :     P: AsRef<Utf8Path> + Send + Sync + 'a,
     438             174 : {
     439             174 :     Box::pin(async move {
     440             174 :         let directory_path = directory_path.as_ref();
     441             174 :         if directory_path.exists() {
     442             174 :             if directory_path.is_dir() {
     443             174 :                 let mut paths = Vec::new();
     444             174 :                 let mut dir_contents = fs::read_dir(directory_path).await?;
     445             404 :                 while let Some(dir_entry) = dir_contents.next_entry().await? {
     446             230 :                     let file_type = dir_entry.file_type().await?;
     447             230 :                     let entry_path =
     448             230 :                         Utf8PathBuf::from_path_buf(dir_entry.path()).map_err(|pb| {
     449 UBC           0 :                             anyhow::Error::msg(format!(
     450               0 :                                 "non-Unicode path: {}",
     451               0 :                                 pb.to_string_lossy()
     452               0 :                             ))
     453 CBC         230 :                         })?;
     454             230 :                     if file_type.is_symlink() {
     455 UBC           0 :                         debug!("{entry_path:?} is a symlink, skipping")
     456 CBC         230 :                     } else if file_type.is_dir() {
     457             226 :                         if recursive {
     458               9 :                             paths.extend(get_all_files(&entry_path, true).await?.into_iter())
     459                 :                         } else {
     460             220 :                             paths.push(entry_path)
     461                 :                         }
     462               4 :                     } else {
     463               4 :                         paths.push(entry_path);
     464               4 :                     }
     465                 :                 }
     466             174 :                 Ok(paths)
     467                 :             } else {
     468 UBC           0 :                 bail!("Path {directory_path:?} is not a directory")
     469                 :             }
     470                 :         } else {
     471               0 :             Ok(Vec::new())
     472                 :         }
     473 CBC         174 :     })
     474             174 : }
     475                 : 
     476           10273 : async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> {
     477           10273 :     let target_dir = match target_file_path.parent() {
     478           10273 :         Some(parent_dir) => parent_dir,
     479 UBC           0 :         None => bail!("File path '{target_file_path}' has no parent directory"),
     480                 :     };
     481 CBC       10273 :     if !target_dir.exists() {
     482             712 :         fs::create_dir_all(target_dir).await?;
     483            9561 :     }
     484           10273 :     Ok(())
     485           10273 : }
     486                 : 
     487            1194 : fn file_exists(file_path: &Utf8Path) -> anyhow::Result<bool> {
     488            1194 :     if file_path.exists() {
     489             874 :         ensure!(file_path.is_file(), "file path '{file_path}' is not a file");
     490             874 :         Ok(true)
     491                 :     } else {
     492             320 :         Ok(false)
     493                 :     }
     494            1194 : }
     495                 : 
     496                 : #[cfg(test)]
     497                 : mod fs_tests {
     498                 :     use super::*;
     499                 : 
     500                 :     use bytes::Bytes;
     501                 :     use camino_tempfile::tempdir;
     502                 :     use futures_util::Stream;
     503                 :     use std::{collections::HashMap, io::Write};
     504                 : 
     505               3 :     async fn read_and_assert_remote_file_contents(
     506               3 :         storage: &LocalFs,
     507               3 :         #[allow(clippy::ptr_arg)]
     508               3 :         // have to use &Utf8PathBuf due to `storage.local_path` parameter requirements
     509               3 :         remote_storage_path: &RemotePath,
     510               3 :         expected_metadata: Option<&StorageMetadata>,
     511               3 :     ) -> anyhow::Result<String> {
     512               3 :         let download = storage
     513               3 :             .download(remote_storage_path)
     514               4 :             .await
     515               3 :             .map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
     516                 :         ensure!(
     517               3 :             download.metadata.as_ref() == expected_metadata,
     518 UBC           0 :             "Unexpected metadata returned for the downloaded file"
     519                 :         );
     520                 : 
     521 CBC           6 :         let contents = aggregate(download.download_stream).await?;
     522                 : 
     523               3 :         String::from_utf8(contents).map_err(anyhow::Error::new)
     524               3 :     }
     525                 : 
     526               1 :     #[tokio::test]
     527               1 :     async fn upload_file() -> anyhow::Result<()> {
     528               1 :         let storage = create_storage()?;
     529                 : 
     530               6 :         let target_path_1 = upload_dummy_file(&storage, "upload_1", None).await?;
     531               1 :         assert_eq!(
     532               3 :             storage.list_all().await?,
     533               1 :             vec![target_path_1.clone()],
     534 UBC           0 :             "Should list a single file after first upload"
     535                 :         );
     536                 : 
     537 CBC           6 :         let target_path_2 = upload_dummy_file(&storage, "upload_2", None).await?;
     538               1 :         assert_eq!(
     539               3 :             list_files_sorted(&storage).await?,
     540               1 :             vec![target_path_1.clone(), target_path_2.clone()],
     541 UBC           0 :             "Should list a two different files after second upload"
     542                 :         );
     543                 : 
     544 CBC           1 :         Ok(())
     545                 :     }
     546                 : 
     547               1 :     #[tokio::test]
     548               1 :     async fn upload_file_negatives() -> anyhow::Result<()> {
     549               1 :         let storage = create_storage()?;
     550                 : 
     551               1 :         let id = RemotePath::new(Utf8Path::new("dummy"))?;
     552               1 :         let content = Bytes::from_static(b"12345");
     553               4 :         let content = move || futures::stream::once(futures::future::ready(Ok(content.clone())));
     554                 : 
     555                 :         // Check that you get an error if the size parameter doesn't match the actual
     556                 :         // size of the stream.
     557               1 :         storage
     558               1 :             .upload(content(), 0, &id, None)
     559               1 :             .await
     560               1 :             .expect_err("upload with zero size succeeded");
     561               1 :         storage
     562               1 :             .upload(content(), 4, &id, None)
     563               2 :             .await
     564               1 :             .expect_err("upload with too short size succeeded");
     565               1 :         storage
     566               1 :             .upload(content(), 6, &id, None)
     567               2 :             .await
     568               1 :             .expect_err("upload with too large size succeeded");
     569               1 : 
     570               1 :         // Correct size is 5, this should succeed.
     571               3 :         storage.upload(content(), 5, &id, None).await?;
     572                 : 
     573               1 :         Ok(())
     574                 :     }
     575                 : 
     576               8 :     fn create_storage() -> anyhow::Result<LocalFs> {
     577               8 :         let storage_root = tempdir()?.path().to_path_buf();
     578               8 :         LocalFs::new(storage_root)
     579               8 :     }
     580                 : 
     581               1 :     #[tokio::test]
     582               1 :     async fn download_file() -> anyhow::Result<()> {
     583               1 :         let storage = create_storage()?;
     584               1 :         let upload_name = "upload_1";
     585               5 :         let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
     586                 : 
     587               3 :         let contents = read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
     588               1 :         assert_eq!(
     589               1 :             dummy_contents(upload_name),
     590                 :             contents,
     591 UBC           0 :             "We should upload and download the same contents"
     592                 :         );
     593                 : 
     594 CBC           1 :         let non_existing_path = "somewhere/else";
     595               1 :         match storage.download(&RemotePath::new(Utf8Path::new(non_existing_path))?).await {
     596               1 :             Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys
     597 UBC           0 :             other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
     598                 :         }
     599 CBC           1 :         Ok(())
     600                 :     }
     601                 : 
     602               1 :     #[tokio::test]
     603               1 :     async fn download_file_range_positive() -> anyhow::Result<()> {
     604               1 :         let storage = create_storage()?;
     605               1 :         let upload_name = "upload_1";
     606               6 :         let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
     607                 : 
     608               1 :         let full_range_download_contents =
     609               3 :             read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
     610               1 :         assert_eq!(
     611               1 :             dummy_contents(upload_name),
     612                 :             full_range_download_contents,
     613 UBC           0 :             "Download full range should return the whole upload"
     614                 :         );
     615                 : 
     616 CBC           1 :         let uploaded_bytes = dummy_contents(upload_name).into_bytes();
     617               1 :         let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);
     618                 : 
     619               1 :         let first_part_download = storage
     620               1 :             .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
     621               2 :             .await?;
     622               1 :         assert!(
     623               1 :             first_part_download.metadata.is_none(),
     624 UBC           0 :             "No metadata should be returned for no metadata upload"
     625                 :         );
     626                 : 
     627 CBC           1 :         let first_part_remote = aggregate(first_part_download.download_stream).await?;
     628               1 :         assert_eq!(
     629                 :             first_part_local, first_part_remote,
     630 UBC           0 :             "First part bytes should be returned when requested"
     631                 :         );
     632                 : 
     633 CBC           1 :         let second_part_download = storage
     634               1 :             .download_byte_range(
     635               1 :                 &upload_target,
     636               1 :                 first_part_local.len() as u64,
     637               1 :                 Some((first_part_local.len() + second_part_local.len()) as u64),
     638               1 :             )
     639               2 :             .await?;
     640               1 :         assert!(
     641               1 :             second_part_download.metadata.is_none(),
     642 UBC           0 :             "No metadata should be returned for no metadata upload"
     643                 :         );
     644                 : 
     645 CBC           1 :         let second_part_remote = aggregate(second_part_download.download_stream).await?;
     646               1 :         assert_eq!(
     647                 :             second_part_local, second_part_remote,
     648 UBC           0 :             "Second part bytes should be returned when requested"
     649                 :         );
     650                 : 
     651 CBC           1 :         Ok(())
     652                 :     }
     653                 : 
     654               1 :     #[tokio::test]
     655               1 :     async fn download_file_range_negative() -> anyhow::Result<()> {
     656               1 :         let storage = create_storage()?;
     657               1 :         let upload_name = "upload_1";
     658               6 :         let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
     659                 : 
     660               1 :         let start = 1_000_000_000;
     661               1 :         let end = start + 1;
     662               1 :         match storage
     663               1 :             .download_byte_range(
     664               1 :                 &upload_target,
     665               1 :                 start,
     666               1 :                 Some(end), // exclusive end
     667               1 :             )
     668 UBC           0 :             .await
     669                 :         {
     670               0 :             Ok(_) => panic!("Should not allow downloading wrong ranges"),
     671 CBC           1 :             Err(e) => {
     672               1 :                 let error_string = e.to_string();
     673               1 :                 assert!(error_string.contains("zero bytes"));
     674               1 :                 assert!(error_string.contains(&start.to_string()));
     675               1 :                 assert!(error_string.contains(&end.to_string()));
     676                 :             }
     677                 :         }
     678                 : 
     679               1 :         let start = 10000;
     680               1 :         let end = 234;
     681               1 :         assert!(start > end, "Should test an incorrect range");
     682               1 :         match storage
     683               1 :             .download_byte_range(&upload_target, start, Some(end))
     684 UBC           0 :             .await
     685                 :         {
     686               0 :             Ok(_) => panic!("Should not allow downloading wrong ranges"),
     687 CBC           1 :             Err(e) => {
     688               1 :                 let error_string = e.to_string();
     689               1 :                 assert!(error_string.contains("Invalid range"));
     690               1 :                 assert!(error_string.contains(&start.to_string()));
     691               1 :                 assert!(error_string.contains(&end.to_string()));
     692                 :             }
     693                 :         }
     694                 : 
     695               1 :         Ok(())
     696                 :     }
     697                 : 
     698               1 :     #[tokio::test]
     699               1 :     async fn delete_file() -> anyhow::Result<()> {
     700               1 :         let storage = create_storage()?;
     701               1 :         let upload_name = "upload_1";
     702               6 :         let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
     703                 : 
     704               1 :         storage.delete(&upload_target).await?;
     705               3 :         assert!(storage.list_all().await?.is_empty());
     706                 : 
     707               1 :         storage
     708               1 :             .delete(&upload_target)
     709               1 :             .await
     710               1 :             .expect("Should allow deleting non-existing storage files");
     711               1 : 
     712               1 :         Ok(())
     713                 :     }
     714                 : 
     715               1 :     #[tokio::test]
     716               1 :     async fn file_with_metadata() -> anyhow::Result<()> {
     717               1 :         let storage = create_storage()?;
     718               1 :         let upload_name = "upload_1";
     719               1 :         let metadata = StorageMetadata(HashMap::from([
     720               1 :             ("one".to_string(), "1".to_string()),
     721               1 :             ("two".to_string(), "2".to_string()),
     722               1 :         ]));
     723               1 :         let upload_target =
     724               7 :             upload_dummy_file(&storage, upload_name, Some(metadata.clone())).await?;
     725                 : 
     726               1 :         let full_range_download_contents =
     727               4 :             read_and_assert_remote_file_contents(&storage, &upload_target, Some(&metadata)).await?;
     728               1 :         assert_eq!(
     729               1 :             dummy_contents(upload_name),
     730                 :             full_range_download_contents,
     731 UBC           0 :             "We should upload and download the same contents"
     732                 :         );
     733                 : 
     734 CBC           1 :         let uploaded_bytes = dummy_contents(upload_name).into_bytes();
     735               1 :         let (first_part_local, _) = uploaded_bytes.split_at(3);
     736                 : 
     737               1 :         let partial_download_with_metadata = storage
     738               1 :             .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
     739               3 :             .await?;
     740               1 :         let first_part_remote = aggregate(partial_download_with_metadata.download_stream).await?;
     741               1 :         assert_eq!(
     742               1 :             first_part_local,
     743               1 :             first_part_remote.as_slice(),
     744 UBC           0 :             "First part bytes should be returned when requested"
     745                 :         );
     746                 : 
     747 CBC           1 :         assert_eq!(
     748               1 :             partial_download_with_metadata.metadata,
     749               1 :             Some(metadata),
     750 UBC           0 :             "We should get the same metadata back for partial download"
     751                 :         );
     752                 : 
     753 CBC           1 :         Ok(())
     754                 :     }
     755                 : 
     756               1 :     #[tokio::test]
     757               1 :     async fn list() -> anyhow::Result<()> {
     758                 :         // No delimiter: should recursively list everything
     759               1 :         let storage = create_storage()?;
     760               6 :         let child = upload_dummy_file(&storage, "grandparent/parent/child", None).await?;
     761               6 :         let uncle = upload_dummy_file(&storage, "grandparent/uncle", None).await?;
     762                 : 
     763               1 :         let listing = storage.list(None, ListingMode::NoDelimiter).await?;
     764               1 :         assert!(listing.prefixes.is_empty());
     765               1 :         assert_eq!(listing.keys, [uncle.clone(), child.clone()].to_vec());
     766                 : 
     767                 :         // Delimiter: should only go one deep
     768               2 :         let listing = storage.list(None, ListingMode::WithDelimiter).await?;
     769                 : 
     770               1 :         assert_eq!(
     771               1 :             listing.prefixes,
     772               1 :             [RemotePath::from_string("timelines").unwrap()].to_vec()
     773               1 :         );
     774               1 :         assert!(listing.keys.is_empty());
     775                 : 
     776                 :         // Delimiter & prefix
     777               1 :         let listing = storage
     778               1 :             .list(
     779               1 :                 Some(&RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()),
     780               1 :                 ListingMode::WithDelimiter,
     781               1 :             )
     782               2 :             .await?;
     783               1 :         assert_eq!(
     784               1 :             listing.prefixes,
     785               1 :             [RemotePath::from_string("timelines/some_timeline/grandparent/parent").unwrap()]
     786               1 :                 .to_vec()
     787               1 :         );
     788               1 :         assert_eq!(listing.keys, [uncle.clone()].to_vec());
     789                 : 
     790               1 :         Ok(())
     791                 :     }
     792                 : 
     793               9 :     async fn upload_dummy_file(
     794               9 :         storage: &LocalFs,
     795               9 :         name: &str,
     796               9 :         metadata: Option<StorageMetadata>,
     797               9 :     ) -> anyhow::Result<RemotePath> {
     798               9 :         let from_path = storage
     799               9 :             .storage_root
     800               9 :             .join("timelines")
     801               9 :             .join("some_timeline")
     802               9 :             .join(name);
     803               9 :         let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?;
     804                 : 
     805               9 :         let relative_path = from_path
     806               9 :             .strip_prefix(&storage.storage_root)
     807               9 :             .context("Failed to strip storage root prefix")
     808               9 :             .and_then(RemotePath::new)
     809               9 :             .with_context(|| {
     810 UBC           0 :                 format!(
     811               0 :                     "Failed to resolve remote part of path {:?} for base {:?}",
     812               0 :                     from_path, storage.storage_root
     813               0 :                 )
     814 CBC           9 :             })?;
     815                 : 
     816               9 :         let file = tokio_util::io::ReaderStream::new(file);
     817               9 : 
     818              45 :         storage.upload(file, size, &relative_path, metadata).await?;
     819               9 :         Ok(relative_path)
     820               9 :     }
     821                 : 
     822               9 :     async fn create_file_for_upload(
     823               9 :         path: &Utf8Path,
     824               9 :         contents: &str,
     825               9 :     ) -> anyhow::Result<(fs::File, usize)> {
     826               9 :         std::fs::create_dir_all(path.parent().unwrap())?;
     827               9 :         let mut file_for_writing = std::fs::OpenOptions::new()
     828               9 :             .write(true)
     829               9 :             .create_new(true)
     830               9 :             .open(path)?;
     831               9 :         write!(file_for_writing, "{}", contents)?;
     832               9 :         drop(file_for_writing);
     833               9 :         let file_size = path.metadata()?.len() as usize;
     834               9 :         Ok((
     835               9 :             fs::OpenOptions::new().read(true).open(&path).await?,
     836               9 :             file_size,
     837                 :         ))
     838               9 :     }
     839                 : 
     840              14 :     fn dummy_contents(name: &str) -> String {
     841              14 :         format!("contents for {name}")
     842              14 :     }
     843                 : 
     844               1 :     async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
     845               3 :         let mut files = storage.list_all().await?;
     846               1 :         files.sort_by(|a, b| a.0.cmp(&b.0));
     847               1 :         Ok(files)
     848               1 :     }
     849                 : 
     850               6 :     async fn aggregate(
     851               6 :         stream: impl Stream<Item = std::io::Result<Bytes>>,
     852               6 :     ) -> anyhow::Result<Vec<u8>> {
     853               6 :         use futures::stream::StreamExt;
     854               6 :         let mut out = Vec::new();
     855               6 :         let mut stream = std::pin::pin!(stream);
     856              12 :         while let Some(res) = stream.next().await {
     857               6 :             out.extend_from_slice(&res?[..]);
     858                 :         }
     859               6 :         Ok(out)
     860               6 :     }
     861                 : }
        

Generated by: LCOV version 2.1-beta