LCOV - code coverage report
Current view: top level - libs/remote_storage/src - local_fs.rs (source / functions) Coverage Total Hit
Test: aca8877be6ceba750c1be359ed71bc1799d52b30.info Lines: 92.6 % 730 676
Test Date: 2024-02-14 18:05:35 Functions: 64.6 % 99 64

            Line data    Source code
       1              : //! Local filesystem acting as a remote storage.
       2              : //! Multiple API users can use the same "storage" of this kind by using different storage roots.
       3              : //!
       4              : //! This storage is used in tests, but can also be used in cases when a certain persistent
       5              : //! volume is mounted to the local FS.
       6              : 
       7              : use std::{
       8              :     borrow::Cow, future::Future, io::ErrorKind, num::NonZeroU32, pin::Pin, time::SystemTime,
       9              : };
      10              : 
      11              : use anyhow::{bail, ensure, Context};
      12              : use bytes::Bytes;
      13              : use camino::{Utf8Path, Utf8PathBuf};
      14              : use futures::stream::Stream;
      15              : use tokio::{
      16              :     fs,
      17              :     io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
      18              : };
      19              : use tokio_util::{io::ReaderStream, sync::CancellationToken};
      20              : use tracing::*;
      21              : use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};
      22              : 
      23              : use crate::{Download, DownloadError, Listing, ListingMode, RemotePath, TimeTravelError};
      24              : 
      25              : use super::{RemoteStorage, StorageMetadata};
      26              : 
      27              : const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";
      28              : 
      29         4778 : #[derive(Debug, Clone)]
      30              : pub struct LocalFs {
      31              :     storage_root: Utf8PathBuf,
      32              : }
      33              : 
      34              : impl LocalFs {
      35              :     /// Attempts to create local FS storage, along with its root directory.
      36              :     /// Storage root will be created (if it does not exist) and transformed into an absolute path (if passed as relative).
      37          496 :     pub fn new(mut storage_root: Utf8PathBuf) -> anyhow::Result<Self> {
      38          496 :         if !storage_root.exists() {
      39           17 :             std::fs::create_dir_all(&storage_root).with_context(|| {
      40            0 :                 format!("Failed to create all directories in the given root path {storage_root:?}")
      41           17 :             })?;
      42          479 :         }
      43          496 :         if !storage_root.is_absolute() {
      44           88 :             storage_root = storage_root.canonicalize_utf8().with_context(|| {
      45            0 :                 format!("Failed to represent path {storage_root:?} as an absolute path")
      46           88 :             })?;
      47          408 :         }
      48              : 
      49          496 :         Ok(Self { storage_root })
      50          496 :     }
      51              : 
      52              :     // mirrors S3Bucket::s3_object_to_relative_path
      53          239 :     fn local_file_to_relative_path(&self, key: Utf8PathBuf) -> RemotePath {
      54          239 :         let relative_path = key
      55          239 :             .strip_prefix(&self.storage_root)
      56          239 :             .expect("relative path must contain storage_root as prefix");
      57          239 :         RemotePath(relative_path.into())
      58          239 :     }
      59              : 
      60         1003 :     async fn read_storage_metadata(
      61         1003 :         &self,
      62         1003 :         file_path: &Utf8Path,
      63         1003 :     ) -> anyhow::Result<Option<StorageMetadata>> {
      64         1003 :         let metadata_path = storage_metadata_path(file_path);
      65         1003 :         if metadata_path.exists() && metadata_path.is_file() {
      66            4 :             let metadata_string = fs::read_to_string(&metadata_path).await.with_context(|| {
      67            0 :                 format!("Failed to read metadata from the local storage at '{metadata_path}'")
      68            4 :             })?;
      69              : 
      70            4 :             serde_json::from_str(&metadata_string)
      71            4 :                 .with_context(|| {
      72            0 :                     format!(
      73            0 :                         "Failed to deserialize metadata from the local storage at '{metadata_path}'",
      74            0 :                     )
      75            4 :                 })
      76            4 :                 .map(|metadata| Some(StorageMetadata(metadata)))
      77              :         } else {
      78          999 :             Ok(None)
      79              :         }
      80         1003 :     }
      81              : 
      82              :     #[cfg(test)]
      83            6 :     async fn list_all(&self) -> anyhow::Result<Vec<RemotePath>> {
      84            6 :         Ok(get_all_files(&self.storage_root, true)
      85           18 :             .await?
      86            6 :             .into_iter()
      87            6 :             .map(|path| {
      88            6 :                 path.strip_prefix(&self.storage_root)
      89            6 :                     .context("Failed to strip storage root prefix")
      90            6 :                     .and_then(RemotePath::new)
      91            6 :                     .expect(
      92            6 :                         "We list files for storage root, hence should be able to remote the prefix",
      93            6 :                     )
      94            6 :             })
      95            6 :             .collect())
      96            6 :     }
      97              : 
      98              :     // recursively lists all files in a directory,
      99              :     // mirroring the `list_files` for `s3_bucket`
     100          184 :     async fn list_recursive(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
     101          184 :         let full_path = match folder {
     102          182 :             Some(folder) => folder.with_base(&self.storage_root),
     103            2 :             None => self.storage_root.clone(),
     104              :         };
     105              : 
     106              :         // If we were given a directory, we may use it as our starting point.
     107              :         // Otherwise, we must go up to the first ancestor dir that exists.  This is because
     108              :         // S3 object list prefixes can be arbitrary strings, but when reading
     109              :         // the local filesystem we need a directory to start calling read_dir on.
     110          184 :         let mut initial_dir = full_path.clone();
     111          334 :         loop {
     112          334 :             // Did we make it to the root?
     113          334 :             if initial_dir.parent().is_none() {
     114            0 :                 anyhow::bail!("list_files: failed to find valid ancestor dir for {full_path}");
     115          334 :             }
     116          334 : 
     117          334 :             match fs::metadata(initial_dir.clone()).await {
     118          190 :                 Ok(meta) if meta.is_dir() => {
     119          184 :                     // We found a directory, break
     120          184 :                     break;
     121              :                 }
     122            6 :                 Ok(_meta) => {
     123            6 :                     // It's not a directory: strip back to the parent
     124            6 :                     initial_dir.pop();
     125            6 :                 }
     126          144 :                 Err(e) if e.kind() == ErrorKind::NotFound => {
     127          144 :                     // It's not a file that exists: strip the prefix back to the parent directory
     128          144 :                     initial_dir.pop();
     129          144 :                 }
     130            0 :                 Err(e) => {
     131            0 :                     // Unexpected I/O error
     132            0 :                     anyhow::bail!(e)
     133              :                 }
     134              :             }
     135              :         }
     136              :         // Note that Utf8PathBuf starts_with only considers full path segments, but
     137              :         // object prefixes are arbitrary strings, so we need the strings for doing
     138              :         // starts_with later.
     139          184 :         let prefix = full_path.as_str();
     140          184 : 
     141          184 :         let mut files = vec![];
     142          184 :         let mut directory_queue = vec![initial_dir];
     143          376 :         while let Some(cur_folder) = directory_queue.pop() {
     144          192 :             let mut entries = cur_folder.read_dir_utf8()?;
     145         3192 :             while let Some(Ok(entry)) = entries.next() {
     146         3000 :                 let file_name = entry.file_name();
     147         3000 :                 let full_file_name = cur_folder.join(file_name);
     148         3000 :                 if full_file_name.as_str().starts_with(prefix) {
     149          239 :                     let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
     150          239 :                     files.push(file_remote_path);
     151          239 :                     if full_file_name.is_dir() {
     152            8 :                         directory_queue.push(full_file_name);
     153          231 :                     }
     154         2761 :                 }
     155              :             }
     156              :         }
     157              : 
     158          184 :         Ok(files)
     159          184 :     }
     160              : }
     161              : 
     162              : impl RemoteStorage for LocalFs {
     163          761 :     async fn list(
     164          761 :         &self,
     165          761 :         prefix: Option<&RemotePath>,
     166          761 :         mode: ListingMode,
     167          761 :         max_keys: Option<NonZeroU32>,
     168          761 :     ) -> Result<Listing, DownloadError> {
     169          761 :         let mut result = Listing::default();
     170          761 : 
     171          761 :         if let ListingMode::NoDelimiter = mode {
     172          184 :             let keys = self
     173          184 :                 .list_recursive(prefix)
     174          332 :                 .await
     175          184 :                 .map_err(DownloadError::Other)?;
     176              : 
     177          184 :             result.keys = keys
     178          184 :                 .into_iter()
     179          239 :                 .filter(|k| {
     180          239 :                     let path = k.with_base(&self.storage_root);
     181          239 :                     !path.is_dir()
     182          239 :                 })
     183          184 :                 .collect();
     184          184 :             if let Some(max_keys) = max_keys {
     185            0 :                 result.keys.truncate(max_keys.get() as usize);
     186          184 :             }
     187              : 
     188          184 :             return Ok(result);
     189          577 :         }
     190              : 
     191          577 :         let path = match prefix {
     192          575 :             Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
     193            2 :             None => Cow::Borrowed(&self.storage_root),
     194              :         };
     195              : 
     196          577 :         let prefixes_to_filter = get_all_files(path.as_ref(), false)
     197          216 :             .await
     198          577 :             .map_err(DownloadError::Other)?;
     199              : 
     200              :         // filter out empty directories to mirror s3 behavior.
     201          854 :         for prefix in prefixes_to_filter {
     202          277 :             if prefix.is_dir()
     203          275 :                 && is_directory_empty(&prefix)
     204          267 :                     .await
     205          275 :                     .map_err(DownloadError::Other)?
     206              :             {
     207            6 :                 continue;
     208          271 :             }
     209          271 : 
     210          271 :             let stripped = prefix
     211          271 :                 .strip_prefix(&self.storage_root)
     212          271 :                 .context("Failed to strip prefix")
     213          271 :                 .and_then(RemotePath::new)
     214          271 :                 .expect(
     215          271 :                     "We list files for storage root, hence should be able to remote the prefix",
     216          271 :                 );
     217          271 : 
     218          271 :             if prefix.is_dir() {
     219          269 :                 result.prefixes.push(stripped);
     220          269 :             } else {
     221            2 :                 result.keys.push(stripped);
     222            2 :             }
     223              :         }
     224              : 
     225          577 :         Ok(result)
     226          761 :     }
     227              : 
     228        12468 :     async fn upload(
     229        12468 :         &self,
     230        12468 :         data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
     231        12468 :         data_size_bytes: usize,
     232        12468 :         to: &RemotePath,
     233        12468 :         metadata: Option<StorageMetadata>,
     234        12468 :     ) -> anyhow::Result<()> {
     235        12468 :         let target_file_path = to.with_base(&self.storage_root);
     236        12468 :         create_target_directory(&target_file_path).await?;
     237              :         // We need this dance with a sort-of-durable rename (without fsyncs)
     238              :         // to prevent partial uploads. This was actually hit when a pageserver shutdown
     239              :         // cancelled the upload and a partial file was left on the fs.
     240              :         // NOTE: Because the temp file suffix is always the same, this operation is racy.
     241              :         // Two concurrent operations can lead to the following sequence:
     242              :         // T1: write(temp)
     243              :         // T2: write(temp) -> overwrites the content
     244              :         // T1: rename(temp, dst) -> succeeds
     245              :         // T2: rename(temp, dst) -> fails, temp no longer exists
     246              :         // This can be solved by supplying a unique temp suffix every time, but this situation
     247              :         // is not normal in the first place; the error can help (and has helped at least once)
     248              :         // to discover bugs in upper-level synchronization.
     249        12467 :         let temp_file_path =
     250        12467 :             path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX);
     251        12464 :         let mut destination = io::BufWriter::new(
     252        12467 :             fs::OpenOptions::new()
     253        12467 :                 .write(true)
     254        12467 :                 .create(true)
     255        12467 :                 .open(&temp_file_path)
     256        12297 :                 .await
     257        12464 :                 .with_context(|| {
     258            0 :                     format!("Failed to open target fs destination at '{target_file_path}'")
     259        12464 :                 })?,
     260              :         );
     261              : 
     262        12464 :         let from_size_bytes = data_size_bytes as u64;
     263        12464 :         let data = tokio_util::io::StreamReader::new(data);
     264        12464 :         let data = std::pin::pin!(data);
     265        12464 :         let mut buffer_to_read = data.take(from_size_bytes);
     266              : 
     267              :         // alternatively we could just write the bytes to a file, but local_fs is a testing utility
     268        12464 :         let bytes_read = io::copy_buf(&mut buffer_to_read, &mut destination)
     269       910790 :             .await
     270        12462 :             .with_context(|| {
     271            0 :                 format!(
     272            0 :                     "Failed to upload file (write temp) to the local storage at '{temp_file_path}'",
     273            0 :                 )
     274        12462 :             })?;
     275              : 
     276        12462 :         if bytes_read < from_size_bytes {
     277            2 :             bail!("Provided stream was shorter than expected: {bytes_read} vs {from_size_bytes} bytes");
     278        12460 :         }
     279        12460 :         // Check if there is any extra data after the given size.
     280        12460 :         let mut from = buffer_to_read.into_inner();
     281        12460 :         let extra_read = from.read(&mut [1]).await?;
     282        12460 :         ensure!(
     283        12460 :             extra_read == 0,
     284            4 :             "Provided stream was larger than expected: expected {from_size_bytes} bytes",
     285              :         );
     286              : 
     287        12456 :         destination.flush().await.with_context(|| {
     288            0 :             format!(
     289            0 :                 "Failed to upload (flush temp) file to the local storage at '{temp_file_path}'",
     290            0 :             )
     291        12456 :         })?;
     292              : 
     293        12456 :         fs::rename(temp_file_path, &target_file_path)
     294        12175 :             .await
     295        12454 :             .with_context(|| {
     296            0 :                 format!(
     297            0 :                     "Failed to upload (rename) file to the local storage at '{target_file_path}'",
     298            0 :                 )
     299        12454 :             })?;
     300              : 
     301        12454 :         if let Some(storage_metadata) = metadata {
     302            2 :             let storage_metadata_path = storage_metadata_path(&target_file_path);
     303            2 :             fs::write(
     304            2 :                 &storage_metadata_path,
     305            2 :                 serde_json::to_string(&storage_metadata.0)
     306            2 :                     .context("Failed to serialize storage metadata as json")?,
     307              :             )
     308            2 :             .await
     309            2 :             .with_context(|| {
     310            0 :                 format!(
     311            0 :                     "Failed to write metadata to the local storage at '{storage_metadata_path}'",
     312            0 :                 )
     313            2 :             })?;
     314        12452 :         }
     315              : 
     316        12454 :         Ok(())
     317        12460 :     }
     318              : 
     319         1411 :     async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
     320         1411 :         let target_path = from.with_base(&self.storage_root);
     321         1411 :         if file_exists(&target_path).map_err(DownloadError::BadInput)? {
     322          991 :             let source = ReaderStream::new(
     323          991 :                 fs::OpenOptions::new()
     324          991 :                     .read(true)
     325          991 :                     .open(&target_path)
     326          973 :                     .await
     327          991 :                     .with_context(|| {
     328            0 :                         format!("Failed to open source file {target_path:?} to use in the download")
     329          991 :                     })
     330          991 :                     .map_err(DownloadError::Other)?,
     331              :             );
     332              : 
     333          991 :             let metadata = self
     334          991 :                 .read_storage_metadata(&target_path)
     335            2 :                 .await
     336          991 :                 .map_err(DownloadError::Other)?;
     337          991 :             Ok(Download {
     338          991 :                 metadata,
     339          991 :                 last_modified: None,
     340          991 :                 etag: None,
     341          991 :                 download_stream: Box::pin(source),
     342          991 :             })
     343              :         } else {
     344          420 :             Err(DownloadError::NotFound)
     345              :         }
     346         1411 :     }
     347              : 
     348           16 :     async fn download_byte_range(
     349           16 :         &self,
     350           16 :         from: &RemotePath,
     351           16 :         start_inclusive: u64,
     352           16 :         end_exclusive: Option<u64>,
     353           16 :     ) -> Result<Download, DownloadError> {
     354           16 :         if let Some(end_exclusive) = end_exclusive {
     355           10 :             if end_exclusive <= start_inclusive {
     356            2 :                 return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
     357            8 :             };
     358            8 :             if start_inclusive == end_exclusive.saturating_sub(1) {
     359            2 :                 return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
     360            6 :             }
     361            6 :         }
     362           12 :         let target_path = from.with_base(&self.storage_root);
     363           12 :         if file_exists(&target_path).map_err(DownloadError::BadInput)? {
     364           12 :             let mut source = tokio::fs::OpenOptions::new()
     365           12 :                 .read(true)
     366           12 :                 .open(&target_path)
     367           12 :                 .await
     368           12 :                 .with_context(|| {
     369            0 :                     format!("Failed to open source file {target_path:?} to use in the download")
     370           12 :                 })
     371           12 :                 .map_err(DownloadError::Other)?;
     372              : 
     373           12 :             let len = source
     374           12 :                 .metadata()
     375           12 :                 .await
     376           12 :                 .context("query file length")
     377           12 :                 .map_err(DownloadError::Other)?
     378           12 :                 .len();
     379           12 : 
     380           12 :             source
     381           12 :                 .seek(io::SeekFrom::Start(start_inclusive))
     382           12 :                 .await
     383           12 :                 .context("Failed to seek to the range start in a local storage file")
     384           12 :                 .map_err(DownloadError::Other)?;
     385              : 
     386           12 :             let metadata = self
     387           12 :                 .read_storage_metadata(&target_path)
     388            2 :                 .await
     389           12 :                 .map_err(DownloadError::Other)?;
     390              : 
     391           12 :             let source = source.take(end_exclusive.unwrap_or(len) - start_inclusive);
     392           12 :             let source = ReaderStream::new(source);
     393           12 : 
     394           12 :             Ok(Download {
     395           12 :                 metadata,
     396           12 :                 last_modified: None,
     397           12 :                 etag: None,
     398           12 :                 download_stream: Box::pin(source),
     399           12 :             })
     400              :         } else {
     401            0 :             Err(DownloadError::NotFound)
     402              :         }
     403           16 :     }
     404              : 
     405         1164 :     async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
     406         1164 :         let file_path = path.with_base(&self.storage_root);
     407         1164 :         match fs::remove_file(&file_path).await {
     408          587 :             Ok(()) => Ok(()),
     409              :             // The file doesn't exist. This shouldn't yield an error to mirror S3's behaviour.
     410              :             // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
     411              :             // > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful.
     412          577 :             Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
     413            0 :             Err(e) => Err(anyhow::anyhow!(e)),
     414              :         }
     415         1164 :     }
     416              : 
     417           79 :     async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
     418         1223 :         for path in paths {
     419         1144 :             self.delete(path).await?
     420              :         }
     421           79 :         Ok(())
     422           79 :     }
     423              : 
     424            2 :     async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
     425            2 :         let from_path = from.with_base(&self.storage_root);
     426            2 :         let to_path = to.with_base(&self.storage_root);
     427            2 :         create_target_directory(&to_path).await?;
     428            2 :         fs::copy(&from_path, &to_path).await.with_context(|| {
     429            0 :             format!(
     430            0 :                 "Failed to copy file from '{from_path}' to '{to_path}'",
     431            0 :                 from_path = from_path,
     432            0 :                 to_path = to_path
     433            0 :             )
     434            2 :         })?;
     435            2 :         Ok(())
     436            2 :     }
     437              : 
     438              :     #[allow(clippy::diverging_sub_expression)]
     439            0 :     async fn time_travel_recover(
     440            0 :         &self,
     441            0 :         _prefix: Option<&RemotePath>,
     442            0 :         _timestamp: SystemTime,
     443            0 :         _done_if_after: SystemTime,
     444            0 :         _cancel: &CancellationToken,
     445            0 :     ) -> Result<(), TimeTravelError> {
     446            0 :         Err(TimeTravelError::Unimplemented)
     447            0 :     }
     448              : }
     449              : 
     450         1005 : fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
     451         1005 :     path_with_suffix_extension(original_path, "metadata")
     452         1005 : }
     453              : 
     454          595 : fn get_all_files<'a, P>(
     455          595 :     directory_path: P,
     456          595 :     recursive: bool,
     457          595 : ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Utf8PathBuf>>> + Send + Sync + 'a>>
     458          595 : where
     459          595 :     P: AsRef<Utf8Path> + Send + Sync + 'a,
     460          595 : {
     461          595 :     Box::pin(async move {
     462          595 :         let directory_path = directory_path.as_ref();
     463          595 :         if directory_path.exists() {
     464          235 :             if directory_path.is_dir() {
     465          235 :                 let mut paths = Vec::new();
     466          235 :                 let mut dir_contents = fs::read_dir(directory_path).await?;
     467          530 :                 while let Some(dir_entry) = dir_contents.next_entry().await? {
     468          295 :                     let file_type = dir_entry.file_type().await?;
     469          295 :                     let entry_path =
     470          295 :                         Utf8PathBuf::from_path_buf(dir_entry.path()).map_err(|pb| {
     471            0 :                             anyhow::Error::msg(format!(
     472            0 :                                 "non-Unicode path: {}",
     473            0 :                                 pb.to_string_lossy()
     474            0 :                             ))
     475          295 :                         })?;
     476          295 :                     if file_type.is_symlink() {
     477            0 :                         debug!("{entry_path:?} is a symlink, skipping")
     478          295 :                     } else if file_type.is_dir() {
     479          287 :                         if recursive {
     480           18 :                             paths.extend(get_all_files(&entry_path, true).await?.into_iter())
     481              :                         } else {
     482          275 :                             paths.push(entry_path)
     483              :                         }
     484            8 :                     } else {
     485            8 :                         paths.push(entry_path);
     486            8 :                     }
     487              :                 }
     488          235 :                 Ok(paths)
     489              :             } else {
     490            0 :                 bail!("Path {directory_path:?} is not a directory")
     491              :             }
     492              :         } else {
     493          360 :             Ok(Vec::new())
     494              :         }
     495          595 :     })
     496          595 : }
     497              : 
     498        12470 : async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> {
     499        12470 :     let target_dir = match target_file_path.parent() {
     500        12470 :         Some(parent_dir) => parent_dir,
     501            0 :         None => bail!("File path '{target_file_path}' has no parent directory"),
     502              :     };
     503        12470 :     if !target_dir.exists() {
     504          926 :         fs::create_dir_all(target_dir).await?;
     505        11544 :     }
     506        12469 :     Ok(())
     507        12469 : }
     508              : 
     509         1423 : fn file_exists(file_path: &Utf8Path) -> anyhow::Result<bool> {
     510         1423 :     if file_path.exists() {
     511         1003 :         ensure!(file_path.is_file(), "file path '{file_path}' is not a file");
     512         1003 :         Ok(true)
     513              :     } else {
     514          420 :         Ok(false)
     515              :     }
     516         1423 : }
     517              : 
     518              : #[cfg(test)]
     519              : mod fs_tests {
     520              :     use super::*;
     521              : 
     522              :     use bytes::Bytes;
     523              :     use camino_tempfile::tempdir;
     524              :     use futures_util::Stream;
     525              :     use std::{collections::HashMap, io::Write};
     526              : 
     527            6 :     async fn read_and_check_metadata(
     528            6 :         storage: &LocalFs,
     529            6 :         remote_storage_path: &RemotePath,
     530            6 :         expected_metadata: Option<&StorageMetadata>,
     531            6 :     ) -> anyhow::Result<String> {
     532            6 :         let download = storage
     533            6 :             .download(remote_storage_path)
     534            8 :             .await
     535            6 :             .map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
     536            6 :         ensure!(
     537            6 :             download.metadata.as_ref() == expected_metadata,
     538            0 :             "Unexpected metadata returned for the downloaded file"
     539              :         );
     540              : 
     541           12 :         let contents = aggregate(download.download_stream).await?;
     542              : 
     543            6 :         String::from_utf8(contents).map_err(anyhow::Error::new)
     544            6 :     }
     545              : 
     546            2 :     #[tokio::test]
     547            2 :     async fn upload_file() -> anyhow::Result<()> {
     548            2 :         let storage = create_storage()?;
     549            2 : 
     550           12 :         let target_path_1 = upload_dummy_file(&storage, "upload_1", None).await?;
     551            2 :         assert_eq!(
     552            6 :             storage.list_all().await?,
     553            2 :             vec![target_path_1.clone()],
     554            2 :             "Should list a single file after first upload"
     555            2 :         );
     556            2 : 
     557           12 :         let target_path_2 = upload_dummy_file(&storage, "upload_2", None).await?;
     558            2 :         assert_eq!(
     559            6 :             list_files_sorted(&storage).await?,
     560            2 :             vec![target_path_1.clone(), target_path_2.clone()],
     561            2 :             "Should list a two different files after second upload"
     562            2 :         );
     563            2 : 
     564            2 :         Ok(())
     565            2 :     }
     566              : 
     567            2 :     #[tokio::test]
     568            2 :     async fn upload_file_negatives() -> anyhow::Result<()> {
     569            2 :         let storage = create_storage()?;
     570            2 : 
     571            2 :         let id = RemotePath::new(Utf8Path::new("dummy"))?;
     572            2 :         let content = Bytes::from_static(b"12345");
     573            8 :         let content = move || futures::stream::once(futures::future::ready(Ok(content.clone())));
     574            2 : 
     575            2 :         // Check that you get an error if the size parameter doesn't match the actual
     576            2 :         // size of the stream.
     577            2 :         storage
     578            2 :             .upload(content(), 0, &id, None)
     579            2 :             .await
     580            2 :             .expect_err("upload with zero size succeeded");
     581            2 :         storage
     582            2 :             .upload(content(), 4, &id, None)
     583            4 :             .await
     584            2 :             .expect_err("upload with too short size succeeded");
     585            2 :         storage
     586            2 :             .upload(content(), 6, &id, None)
     587            4 :             .await
     588            2 :             .expect_err("upload with too large size succeeded");
     589            2 : 
     590            2 :         // Correct size is 5, this should succeed.
     591            6 :         storage.upload(content(), 5, &id, None).await?;
     592            2 : 
     593            2 :         Ok(())
     594            2 :     }
     595              : 
     596           16 :     fn create_storage() -> anyhow::Result<LocalFs> {
     597           16 :         let storage_root = tempdir()?.path().to_path_buf();
     598           16 :         LocalFs::new(storage_root)
     599           16 :     }
     600              : 
     601            2 :     #[tokio::test]
     602            2 :     async fn download_file() -> anyhow::Result<()> {
     603            2 :         let storage = create_storage()?;
     604            2 :         let upload_name = "upload_1";
     605           12 :         let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
     606            2 : 
     607            6 :         let contents = read_and_check_metadata(&storage, &upload_target, None).await?;
     608            2 :         assert_eq!(
     609            2 :             dummy_contents(upload_name),
     610            2 :             contents,
     611            2 :             "We should upload and download the same contents"
     612            2 :         );
     613            2 : 
     614            2 :         let non_existing_path = "somewhere/else";
     615            2 :         match storage.download(&RemotePath::new(Utf8Path::new(non_existing_path))?).await {
     616            2 :             Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys
     617            2 :             other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
     618            2 :         }
     619            2 :         Ok(())
     620            2 :     }
     621              : 
     622            2 :     #[tokio::test]
     623            2 :     async fn download_file_range_positive() -> anyhow::Result<()> {
     624            2 :         let storage = create_storage()?;
     625            2 :         let upload_name = "upload_1";
     626           12 :         let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
     627            2 : 
     628            2 :         let full_range_download_contents =
     629            6 :             read_and_check_metadata(&storage, &upload_target, None).await?;
     630            2 :         assert_eq!(
     631            2 :             dummy_contents(upload_name),
     632            2 :             full_range_download_contents,
     633            2 :             "Download full range should return the whole upload"
     634            2 :         );
     635            2 : 
     636            2 :         let uploaded_bytes = dummy_contents(upload_name).into_bytes();
     637            2 :         let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);
     638            2 : 
     639            2 :         let first_part_download = storage
     640            2 :             .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
     641            6 :             .await?;
     642            2 :         assert!(
     643            2 :             first_part_download.metadata.is_none(),
     644            2 :             "No metadata should be returned for no metadata upload"
     645            2 :         );
     646            2 : 
     647            2 :         let first_part_remote = aggregate(first_part_download.download_stream).await?;
     648            2 :         assert_eq!(
     649            2 :             first_part_local, first_part_remote,
     650            2 :             "First part bytes should be returned when requested"
     651            2 :         );
     652            2 : 
     653            2 :         let second_part_download = storage
     654            2 :             .download_byte_range(
     655            2 :                 &upload_target,
     656            2 :                 first_part_local.len() as u64,
     657            2 :                 Some((first_part_local.len() + second_part_local.len()) as u64),
     658            2 :             )
     659            6 :             .await?;
     660            2 :         assert!(
     661            2 :             second_part_download.metadata.is_none(),
     662            2 :             "No metadata should be returned for no metadata upload"
     663            2 :         );
     664            2 : 
     665            2 :         let second_part_remote = aggregate(second_part_download.download_stream).await?;
     666            2 :         assert_eq!(
     667            2 :             second_part_local, second_part_remote,
     668            2 :             "Second part bytes should be returned when requested"
     669            2 :         );
     670            2 : 
     671            2 :         let suffix_bytes = storage
     672            2 :             .download_byte_range(&upload_target, 13, None)
     673            6 :             .await?
     674            2 :             .download_stream;
     675            2 :         let suffix_bytes = aggregate(suffix_bytes).await?;
     676            2 :         let suffix = std::str::from_utf8(&suffix_bytes)?;
     677            2 :         assert_eq!(upload_name, suffix);
     678            2 : 
     679            2 :         let all_bytes = storage
     680            2 :             .download_byte_range(&upload_target, 0, None)
     681            6 :             .await?
     682            2 :             .download_stream;
     683            2 :         let all_bytes = aggregate(all_bytes).await?;
     684            2 :         let all_bytes = std::str::from_utf8(&all_bytes)?;
     685            2 :         assert_eq!(dummy_contents("upload_1"), all_bytes);
     686            2 : 
     687            2 :         Ok(())
     688            2 :     }
     689              : 
     690            2 :     #[tokio::test]
     691            2 :     async fn download_file_range_negative() -> anyhow::Result<()> {
     692            2 :         let storage = create_storage()?;
     693            2 :         let upload_name = "upload_1";
     694           12 :         let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
     695            2 : 
     696            2 :         let start = 1_000_000_000;
     697            2 :         let end = start + 1;
     698            2 :         match storage
     699            2 :             .download_byte_range(
     700            2 :                 &upload_target,
     701            2 :                 start,
     702            2 :                 Some(end), // exclusive end
     703            2 :             )
     704            2 :             .await
     705            2 :         {
     706            2 :             Ok(_) => panic!("Should not allow downloading wrong ranges"),
     707            2 :             Err(e) => {
     708            2 :                 let error_string = e.to_string();
     709            2 :                 assert!(error_string.contains("zero bytes"));
     710            2 :                 assert!(error_string.contains(&start.to_string()));
     711            2 :                 assert!(error_string.contains(&end.to_string()));
     712            2 :             }
     713            2 :         }
     714            2 : 
     715            2 :         let start = 10000;
     716            2 :         let end = 234;
     717            2 :         assert!(start > end, "Should test an incorrect range");
     718            2 :         match storage
     719            2 :             .download_byte_range(&upload_target, start, Some(end))
     720            2 :             .await
     721            2 :         {
     722            2 :             Ok(_) => panic!("Should not allow downloading wrong ranges"),
     723            2 :             Err(e) => {
     724            2 :                 let error_string = e.to_string();
     725            2 :                 assert!(error_string.contains("Invalid range"));
     726            2 :                 assert!(error_string.contains(&start.to_string()));
     727            2 :                 assert!(error_string.contains(&end.to_string()));
     728            2 :             }
     729            2 :         }
     730            2 : 
     731            2 :         Ok(())
     732            2 :     }
     733              : 
     734            2 :     #[tokio::test]
     735            2 :     async fn delete_file() -> anyhow::Result<()> {
     736            2 :         let storage = create_storage()?;
     737            2 :         let upload_name = "upload_1";
     738           12 :         let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
     739            2 : 
     740            2 :         storage.delete(&upload_target).await?;
     741            6 :         assert!(storage.list_all().await?.is_empty());
     742            2 : 
     743            2 :         storage
     744            2 :             .delete(&upload_target)
     745            2 :             .await
     746            2 :             .expect("Should allow deleting non-existing storage files");
     747            2 : 
     748            2 :         Ok(())
     749            2 :     }
     750              : 
     751            2 :     #[tokio::test]
     752            2 :     async fn file_with_metadata() -> anyhow::Result<()> {
     753            2 :         let storage = create_storage()?;
     754            2 :         let upload_name = "upload_1";
     755            2 :         let metadata = StorageMetadata(HashMap::from([
     756            2 :             ("one".to_string(), "1".to_string()),
     757            2 :             ("two".to_string(), "2".to_string()),
     758            2 :         ]));
     759            2 :         let upload_target =
     760           14 :             upload_dummy_file(&storage, upload_name, Some(metadata.clone())).await?;
     761            2 : 
     762            2 :         let full_range_download_contents =
     763            8 :             read_and_check_metadata(&storage, &upload_target, Some(&metadata)).await?;
     764            2 :         assert_eq!(
     765            2 :             dummy_contents(upload_name),
     766            2 :             full_range_download_contents,
     767            2 :             "We should upload and download the same contents"
     768            2 :         );
     769            2 : 
     770            2 :         let uploaded_bytes = dummy_contents(upload_name).into_bytes();
     771            2 :         let (first_part_local, _) = uploaded_bytes.split_at(3);
     772            2 : 
     773            2 :         let partial_download_with_metadata = storage
     774            2 :             .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
     775            8 :             .await?;
     776            2 :         let first_part_remote = aggregate(partial_download_with_metadata.download_stream).await?;
     777            2 :         assert_eq!(
     778            2 :             first_part_local,
     779            2 :             first_part_remote.as_slice(),
     780            2 :             "First part bytes should be returned when requested"
     781            2 :         );
     782            2 : 
     783            2 :         assert_eq!(
     784            2 :             partial_download_with_metadata.metadata,
     785            2 :             Some(metadata),
     786            2 :             "We should get the same metadata back for partial download"
     787            2 :         );
     788            2 : 
     789            2 :         Ok(())
     790            2 :     }
     791              : 
     792            2 :     #[tokio::test]
     793            2 :     async fn list() -> anyhow::Result<()> {
     794            2 :         // No delimiter: should recursively list everything
     795            2 :         let storage = create_storage()?;
     796            7 :         let child = upload_dummy_file(&storage, "grandparent/parent/child", None).await?;
     797           12 :         let uncle = upload_dummy_file(&storage, "grandparent/uncle", None).await?;
     798            2 : 
     799            2 :         let listing = storage.list(None, ListingMode::NoDelimiter, None).await?;
     800            2 :         assert!(listing.prefixes.is_empty());
     801            2 :         assert_eq!(listing.keys, [uncle.clone(), child.clone()].to_vec());
     802            2 : 
     803            2 :         // Delimiter: should only go one deep
     804            4 :         let listing = storage.list(None, ListingMode::WithDelimiter, None).await?;
     805            2 : 
     806            2 :         assert_eq!(
     807            2 :             listing.prefixes,
     808            2 :             [RemotePath::from_string("timelines").unwrap()].to_vec()
     809            2 :         );
     810            2 :         assert!(listing.keys.is_empty());
     811            2 : 
     812            2 :         // Delimiter & prefix
     813            2 :         let listing = storage
     814            2 :             .list(
     815            2 :                 Some(&RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()),
     816            2 :                 ListingMode::WithDelimiter,
     817            2 :                 None,
     818            2 :             )
     819            4 :             .await?;
     820            2 :         assert_eq!(
     821            2 :             listing.prefixes,
     822            2 :             [RemotePath::from_string("timelines/some_timeline/grandparent/parent").unwrap()]
     823            2 :                 .to_vec()
     824            2 :         );
     825            2 :         assert_eq!(listing.keys, [uncle.clone()].to_vec());
     826            2 : 
     827            2 :         Ok(())
     828            2 :     }
     829              : 
     830           18 :     async fn upload_dummy_file(
     831           18 :         storage: &LocalFs,
     832           18 :         name: &str,
     833           18 :         metadata: Option<StorageMetadata>,
     834           18 :     ) -> anyhow::Result<RemotePath> {
     835           18 :         let from_path = storage
     836           18 :             .storage_root
     837           18 :             .join("timelines")
     838           18 :             .join("some_timeline")
     839           18 :             .join(name);
     840           18 :         let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?;
     841              : 
     842           18 :         let relative_path = from_path
     843           18 :             .strip_prefix(&storage.storage_root)
     844           18 :             .context("Failed to strip storage root prefix")
     845           18 :             .and_then(RemotePath::new)
     846           18 :             .with_context(|| {
     847            0 :                 format!(
     848            0 :                     "Failed to resolve remote part of path {:?} for base {:?}",
     849            0 :                     from_path, storage.storage_root
     850            0 :                 )
     851           18 :             })?;
     852              : 
     853           18 :         let file = tokio_util::io::ReaderStream::new(file);
     854           18 : 
     855           87 :         storage.upload(file, size, &relative_path, metadata).await?;
     856           18 :         Ok(relative_path)
     857           18 :     }
     858              : 
     859           18 :     async fn create_file_for_upload(
     860           18 :         path: &Utf8Path,
     861           18 :         contents: &str,
     862           18 :     ) -> anyhow::Result<(fs::File, usize)> {
     863           18 :         std::fs::create_dir_all(path.parent().unwrap())?;
     864           18 :         let mut file_for_writing = std::fs::OpenOptions::new()
     865           18 :             .write(true)
     866           18 :             .create_new(true)
     867           18 :             .open(path)?;
     868           18 :         write!(file_for_writing, "{}", contents)?;
     869           18 :         drop(file_for_writing);
     870           18 :         let file_size = path.metadata()?.len() as usize;
     871           18 :         Ok((
     872           18 :             fs::OpenOptions::new().read(true).open(&path).await?,
     873           18 :             file_size,
     874              :         ))
     875           18 :     }
     876              : 
     877           30 :     fn dummy_contents(name: &str) -> String {
     878           30 :         format!("contents for {name}")
     879           30 :     }
     880              : 
     881            2 :     async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
     882            6 :         let mut files = storage.list_all().await?;
     883            2 :         files.sort_by(|a, b| a.0.cmp(&b.0));
     884            2 :         Ok(files)
     885            2 :     }
     886              : 
     887           16 :     async fn aggregate(
     888           16 :         stream: impl Stream<Item = std::io::Result<Bytes>>,
     889           16 :     ) -> anyhow::Result<Vec<u8>> {
     890           16 :         use futures::stream::StreamExt;
     891           16 :         let mut out = Vec::new();
     892           16 :         let mut stream = std::pin::pin!(stream);
     893           32 :         while let Some(res) = stream.next().await {
     894           16 :             out.extend_from_slice(&res?[..]);
     895              :         }
     896           16 :         Ok(out)
     897           16 :     }
     898              : }
        

Generated by: LCOV version 2.1-beta