LCOV - code coverage report
Current view: top level - libs/remote_storage/src - local_fs.rs (source / functions) Coverage Total Hit
Test: b837401fb09d2d9818b70e630fdb67e9799b7b0d.info Lines: 90.7 % 874 793
Test Date: 2024-04-18 15:32:49 Functions: 55.9 % 145 81

            Line data    Source code
       1              : //! Local filesystem acting as a remote storage.
       2              : //! Multiple API users can use the same "storage" of this kind by using different storage roots.
       3              : //!
       4              : //! This storage is used in tests, but can also be used in cases when a certain persistent
       5              : //! volume is mounted to the local FS.
       6              : 
       7              : use std::{
       8              :     borrow::Cow,
       9              :     future::Future,
      10              :     io::ErrorKind,
      11              :     num::NonZeroU32,
      12              :     pin::Pin,
      13              :     time::{Duration, SystemTime, UNIX_EPOCH},
      14              : };
      15              : 
      16              : use anyhow::{bail, ensure, Context};
      17              : use bytes::Bytes;
      18              : use camino::{Utf8Path, Utf8PathBuf};
      19              : use futures::stream::Stream;
      20              : use tokio::{
      21              :     fs,
      22              :     io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
      23              : };
      24              : use tokio_util::{io::ReaderStream, sync::CancellationToken};
      25              : use tracing::*;
      26              : use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};
      27              : 
      28              : use crate::{
      29              :     Download, DownloadError, Listing, ListingMode, RemotePath, TimeTravelError, TimeoutOrCancel,
      30              : };
      31              : 
      32              : use super::{RemoteStorage, StorageMetadata};
      33              : use crate::Etag;
      34              : 
      35              : const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";
      36              : 
      37              : #[derive(Debug, Clone)]
      38              : pub struct LocalFs {
      39              :     storage_root: Utf8PathBuf,
      40              :     timeout: Duration,
      41              : }
      42              : 
      43              : impl LocalFs {
      44              :     /// Attempts to create local FS storage, along with its root directory.
      45              :     /// Storage root will be created (if it does not exist) and transformed into an absolute path (if passed as relative).
      46          144 :     pub fn new(mut storage_root: Utf8PathBuf, timeout: Duration) -> anyhow::Result<Self> {
      47          144 :         if !storage_root.exists() {
      48           20 :             std::fs::create_dir_all(&storage_root).with_context(|| {
      49            0 :                 format!("Failed to create all directories in the given root path {storage_root:?}")
      50           20 :             })?;
      51          124 :         }
      52          144 :         if !storage_root.is_absolute() {
      53          108 :             storage_root = storage_root.canonicalize_utf8().with_context(|| {
      54            0 :                 format!("Failed to represent path {storage_root:?} as an absolute path")
      55          108 :             })?;
      56           36 :         }
      57              : 
      58          144 :         Ok(Self {
      59          144 :             storage_root,
      60          144 :             timeout,
      61          144 :         })
      62          144 :     }
      63              : 
      64              :     // mirrors S3Bucket::s3_object_to_relative_path
      65           30 :     fn local_file_to_relative_path(&self, key: Utf8PathBuf) -> RemotePath {
      66           30 :         let relative_path = key
      67           30 :             .strip_prefix(&self.storage_root)
      68           30 :             .expect("relative path must contain storage_root as prefix");
      69           30 :         RemotePath(relative_path.into())
      70           30 :     }
      71              : 
      72           50 :     async fn read_storage_metadata(
      73           50 :         &self,
      74           50 :         file_path: &Utf8Path,
      75           50 :     ) -> anyhow::Result<Option<StorageMetadata>> {
      76           50 :         let metadata_path = storage_metadata_path(file_path);
      77           50 :         if metadata_path.exists() && metadata_path.is_file() {
      78            4 :             let metadata_string = fs::read_to_string(&metadata_path).await.with_context(|| {
      79            0 :                 format!("Failed to read metadata from the local storage at '{metadata_path}'")
      80            4 :             })?;
      81              : 
      82            4 :             serde_json::from_str(&metadata_string)
      83            4 :                 .with_context(|| {
      84            0 :                     format!(
      85            0 :                         "Failed to deserialize metadata from the local storage at '{metadata_path}'",
      86            0 :                     )
      87            4 :                 })
      88            4 :                 .map(|metadata| Some(StorageMetadata(metadata)))
      89              :         } else {
      90           46 :             Ok(None)
      91              :         }
      92           50 :     }
      93              : 
      94              :     #[cfg(test)]
      95            6 :     async fn list_all(&self) -> anyhow::Result<Vec<RemotePath>> {
      96            6 :         Ok(get_all_files(&self.storage_root, true)
      97           18 :             .await?
      98            6 :             .into_iter()
      99            6 :             .map(|path| {
     100            6 :                 path.strip_prefix(&self.storage_root)
     101            6 :                     .context("Failed to strip storage root prefix")
     102            6 :                     .and_then(RemotePath::new)
     103            6 :                     .expect(
     104            6 :                         "We list files for storage root, hence should be able to remote the prefix",
     105            6 :                     )
     106            6 :             })
     107            6 :             .collect())
     108            6 :     }
     109              : 
     110              :     // recursively lists all files in a directory,
     111              :     // mirroring the `list_files` for `s3_bucket`
     112            8 :     async fn list_recursive(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
     113            8 :         let full_path = match folder {
     114            6 :             Some(folder) => folder.with_base(&self.storage_root),
     115            2 :             None => self.storage_root.clone(),
     116              :         };
     117              : 
     118              :         // If we were given a directory, we may use it as our starting point.
     119              :         // Otherwise, we must go up to the first ancestor dir that exists.  This is because
     120              :         // S3 object list prefixes can be arbitrary strings, but when reading
     121              :         // the local filesystem we need a directory to start calling read_dir on.
     122            8 :         let mut initial_dir = full_path.clone();
     123           14 :         loop {
     124           14 :             // Did we make it to the root?
     125           14 :             if initial_dir.parent().is_none() {
     126            0 :                 anyhow::bail!("list_files: failed to find valid ancestor dir for {full_path}");
     127           14 :             }
     128           14 : 
     129           14 :             match fs::metadata(initial_dir.clone()).await {
     130           14 :                 Ok(meta) if meta.is_dir() => {
     131            8 :                     // We found a directory, break
     132            8 :                     break;
     133              :                 }
     134            6 :                 Ok(_meta) => {
     135            6 :                     // It's not a directory: strip back to the parent
     136            6 :                     initial_dir.pop();
     137            6 :                 }
     138            0 :                 Err(e) if e.kind() == ErrorKind::NotFound => {
     139            0 :                     // It's not a file that exists: strip the prefix back to the parent directory
     140            0 :                     initial_dir.pop();
     141            0 :                 }
     142            0 :                 Err(e) => {
     143            0 :                     // Unexpected I/O error
     144            0 :                     anyhow::bail!(e)
     145              :                 }
     146              :             }
     147              :         }
     148              :         // Note that Utf8PathBuf starts_with only considers full path segments, but
     149              :         // object prefixes are arbitrary strings, so we need the strings for doing
     150              :         // starts_with later.
     151            8 :         let prefix = full_path.as_str();
     152            8 : 
     153            8 :         let mut files = vec![];
     154            8 :         let mut directory_queue = vec![initial_dir];
     155           24 :         while let Some(cur_folder) = directory_queue.pop() {
     156           16 :             let mut entries = cur_folder.read_dir_utf8()?;
     157           52 :             while let Some(Ok(entry)) = entries.next() {
     158           36 :                 let file_name = entry.file_name();
     159           36 :                 let full_file_name = cur_folder.join(file_name);
     160           36 :                 if full_file_name.as_str().starts_with(prefix) {
     161           30 :                     let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
     162           30 :                     files.push(file_remote_path);
     163           30 :                     if full_file_name.is_dir() {
     164            8 :                         directory_queue.push(full_file_name);
     165           22 :                     }
     166            6 :                 }
     167              :             }
     168              :         }
     169              : 
     170            8 :         Ok(files)
     171            8 :     }
     172              : 
     173         1874 :     async fn upload0(
     174         1874 :         &self,
     175         1874 :         data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
     176         1874 :         data_size_bytes: usize,
     177         1874 :         to: &RemotePath,
     178         1874 :         metadata: Option<StorageMetadata>,
     179         1874 :         cancel: &CancellationToken,
     180         1874 :     ) -> anyhow::Result<()> {
     181         1874 :         let target_file_path = to.with_base(&self.storage_root);
     182         1874 :         create_target_directory(&target_file_path).await?;
     183              :         // We need this dance with sort of durable rename (without fsyncs)
     184              :         // to prevent partial uploads. This was really hit when pageserver shutdown
     185              :         // cancelled the upload and a partial file was left on the fs
     186              :         // NOTE: Because the temp file suffix is always the same, this operation is racy.
     187              :         // Two concurrent operations can lead to the following sequence:
     188              :         // T1: write(temp)
     189              :         // T2: write(temp) -> overwrites the content
     190              :         // T1: rename(temp, dst) -> succeeds
     191              :         // T2: rename(temp, dst) -> fails, temp no longer exists
     192              :         // This can be solved by supplying a unique temp suffix every time, but this situation
     193              :         // is not normal in the first place; the error can help (and helped at least once)
     194              :         // to discover bugs in upper level synchronization.
     195         1873 :         let temp_file_path =
     196         1873 :             path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX);
     197         1775 :         let mut destination = io::BufWriter::new(
     198         1873 :             fs::OpenOptions::new()
     199         1873 :                 .write(true)
     200         1873 :                 .create(true)
     201         1873 :                 .truncate(true)
     202         1873 :                 .open(&temp_file_path)
     203         1727 :                 .await
     204         1775 :                 .with_context(|| {
     205            0 :                     format!("Failed to open target fs destination at '{target_file_path}'")
     206         1775 :                 })?,
     207              :         );
     208              : 
     209         1775 :         let from_size_bytes = data_size_bytes as u64;
     210         1775 :         let data = tokio_util::io::StreamReader::new(data);
     211         1775 :         let data = std::pin::pin!(data);
     212         1775 :         let mut buffer_to_read = data.take(from_size_bytes);
     213         1775 : 
     214         1775 :         // alternatively we could just write the bytes to a file, but local_fs is a testing utility
     215         1775 :         let copy = io::copy_buf(&mut buffer_to_read, &mut destination);
     216              : 
     217        23602 :         let bytes_read = tokio::select! {
     218              :             biased;
     219              :             _ = cancel.cancelled() => {
     220              :                 let file = destination.into_inner();
     221              :                 // wait for the inflight operation(s) to complete so that there could be a next
     222              :                 // attempt right away and our writes are not directed to their file.
     223              :                 file.into_std().await;
     224              : 
     225              :                 // TODO: leave the temp or not? leaving is probably less racy. enabled truncate at
     226              :                 // least.
     227              :                 fs::remove_file(temp_file_path).await.context("remove temp_file_path after cancellation or timeout")?;
     228              :                 return Err(TimeoutOrCancel::Cancel.into());
     229              :             }
     230         1766 :             read = copy => read,
     231              :         };
     232              : 
     233         1766 :         let bytes_read =
     234         1766 :             bytes_read.with_context(|| {
     235            0 :                 format!(
     236            0 :                     "Failed to upload file (write temp) to the local storage at '{temp_file_path}'",
     237            0 :                 )
     238         1766 :             })?;
     239              : 
     240         1766 :         if bytes_read < from_size_bytes {
     241            2 :             bail!("Provided stream was shorter than expected: {bytes_read} vs {from_size_bytes} bytes");
     242         1764 :         }
     243         1764 :         // Check if there is any extra data after the given size.
     244         1764 :         let mut from = buffer_to_read.into_inner();
     245         1764 :         let extra_read = from.read(&mut [1]).await?;
     246         1764 :         ensure!(
     247         1764 :             extra_read == 0,
     248            4 :             "Provided stream was larger than expected: expected {from_size_bytes} bytes",
     249              :         );
     250              : 
     251         1760 :         destination.flush().await.with_context(|| {
     252            0 :             format!(
     253            0 :                 "Failed to upload (flush temp) file to the local storage at '{temp_file_path}'",
     254            0 :             )
     255         1760 :         })?;
     256              : 
     257         1760 :         fs::rename(temp_file_path, &target_file_path)
     258         1674 :             .await
     259         1759 :             .with_context(|| {
     260            0 :                 format!(
     261            0 :                     "Failed to upload (rename) file to the local storage at '{target_file_path}'",
     262            0 :                 )
     263         1759 :             })?;
     264              : 
     265         1759 :         if let Some(storage_metadata) = metadata {
     266              :             // FIXME: we must not be using metadata much, since this would forget the old metadata
     267              :             // for new writes? or perhaps metadata is sticky; could consider removing if it's never
     268              :             // used.
     269            2 :             let storage_metadata_path = storage_metadata_path(&target_file_path);
     270            2 :             fs::write(
     271            2 :                 &storage_metadata_path,
     272            2 :                 serde_json::to_string(&storage_metadata.0)
     273            2 :                     .context("Failed to serialize storage metadata as json")?,
     274              :             )
     275            2 :             .await
     276            2 :             .with_context(|| {
     277            0 :                 format!(
     278            0 :                     "Failed to write metadata to the local storage at '{storage_metadata_path}'",
     279            0 :                 )
     280            2 :             })?;
     281         1757 :         }
     282              : 
     283         1759 :         Ok(())
     284         1767 :     }
     285              : }
     286              : 
     287              : impl RemoteStorage for LocalFs {
     288          120 :     async fn list(
     289          120 :         &self,
     290          120 :         prefix: Option<&RemotePath>,
     291          120 :         mode: ListingMode,
     292          120 :         max_keys: Option<NonZeroU32>,
     293          120 :         cancel: &CancellationToken,
     294          120 :     ) -> Result<Listing, DownloadError> {
     295          120 :         let op = async {
     296          120 :             let mut result = Listing::default();
     297          120 : 
     298          120 :             if let ListingMode::NoDelimiter = mode {
     299            8 :                 let keys = self
     300            8 :                     .list_recursive(prefix)
     301           14 :                     .await
     302            8 :                     .map_err(DownloadError::Other)?;
     303              : 
     304            8 :                 result.keys = keys
     305            8 :                     .into_iter()
     306           30 :                     .filter(|k| {
     307           30 :                         let path = k.with_base(&self.storage_root);
     308           30 :                         !path.is_dir()
     309           30 :                     })
     310            8 :                     .collect();
     311              : 
     312            8 :                 if let Some(max_keys) = max_keys {
     313            0 :                     result.keys.truncate(max_keys.get() as usize);
     314            8 :                 }
     315              : 
     316            8 :                 return Ok(result);
     317          112 :             }
     318              : 
     319          112 :             let path = match prefix {
     320          110 :                 Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
     321            2 :                 None => Cow::Borrowed(&self.storage_root),
     322              :             };
     323              : 
     324          112 :             let prefixes_to_filter = get_all_files(path.as_ref(), false)
     325            8 :                 .await
     326          112 :                 .map_err(DownloadError::Other)?;
     327              : 
     328              :             // filter out empty directories to mirror s3 behavior.
     329          124 :             for prefix in prefixes_to_filter {
     330           12 :                 if prefix.is_dir()
     331           10 :                     && is_directory_empty(&prefix)
     332           10 :                         .await
     333           10 :                         .map_err(DownloadError::Other)?
     334              :                 {
     335            0 :                     continue;
     336           12 :                 }
     337           12 : 
     338           12 :                 let stripped = prefix
     339           12 :                     .strip_prefix(&self.storage_root)
     340           12 :                     .context("Failed to strip prefix")
     341           12 :                     .and_then(RemotePath::new)
     342           12 :                     .expect(
     343           12 :                         "We list files for storage root, hence should be able to remote the prefix",
     344           12 :                     );
     345           12 : 
     346           12 :                 if prefix.is_dir() {
     347           10 :                     result.prefixes.push(stripped);
     348           10 :                 } else {
     349            2 :                     result.keys.push(stripped);
     350            2 :                 }
     351              :             }
     352              : 
     353          112 :             Ok(result)
     354          120 :         };
     355              : 
     356          120 :         let timeout = async {
     357           57 :             tokio::time::sleep(self.timeout).await;
     358            0 :             Err(DownloadError::Timeout)
     359            0 :         };
     360              : 
     361          120 :         let cancelled = async {
     362           88 :             cancel.cancelled().await;
     363            0 :             Err(DownloadError::Cancelled)
     364            0 :         };
     365              : 
     366          152 :         tokio::select! {
     367          120 :             res = op => res,
     368            0 :             res = timeout => res,
     369            0 :             res = cancelled => res,
     370              :         }
     371          120 :     }
     372              : 
     373         1874 :     async fn upload(
     374         1874 :         &self,
     375         1874 :         data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
     376         1874 :         data_size_bytes: usize,
     377         1874 :         to: &RemotePath,
     378         1874 :         metadata: Option<StorageMetadata>,
     379         1874 :         cancel: &CancellationToken,
     380         1874 :     ) -> anyhow::Result<()> {
     381         1874 :         let cancel = cancel.child_token();
     382         1874 : 
     383         1874 :         let op = self.upload0(data, data_size_bytes, to, metadata, &cancel);
     384         1874 :         let mut op = std::pin::pin!(op);
     385              : 
     386              :         // race the upload0 to the timeout; if it goes over, do a graceful shutdown
     387        28158 :         let (res, timeout) = tokio::select! {
     388         1767 :             res = &mut op => (res, false),
     389              :             _ = tokio::time::sleep(self.timeout) => {
     390              :                 cancel.cancel();
     391              :                 (op.await, true)
     392              :             }
     393              :         };
     394              : 
     395            8 :         match res {
     396            8 :             Err(e) if timeout && TimeoutOrCancel::caused_by_cancel(&e) => {
     397            0 :                 // we caused this cancel (or they happened simultaneously) -- swap it out to
     398            0 :                 // Timeout
     399            0 :                 Err(TimeoutOrCancel::Timeout.into())
     400              :             }
     401         1767 :             res => res,
     402              :         }
     403         1767 :     }
     404              : 
     405           56 :     async fn download(
     406           56 :         &self,
     407           56 :         from: &RemotePath,
     408           56 :         cancel: &CancellationToken,
     409           56 :     ) -> Result<Download, DownloadError> {
     410           56 :         let target_path = from.with_base(&self.storage_root);
     411              : 
     412           56 :         let file_metadata = file_metadata(&target_path).await?;
     413              : 
     414           40 :         let source = ReaderStream::new(
     415           40 :             fs::OpenOptions::new()
     416           40 :                 .read(true)
     417           40 :                 .open(&target_path)
     418           40 :                 .await
     419           40 :                 .with_context(|| {
     420            0 :                     format!("Failed to open source file {target_path:?} to use in the download")
     421           40 :                 })
     422           40 :                 .map_err(DownloadError::Other)?,
     423              :         );
     424              : 
     425           40 :         let metadata = self
     426           40 :             .read_storage_metadata(&target_path)
     427            2 :             .await
     428           40 :             .map_err(DownloadError::Other)?;
     429              : 
     430           40 :         let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
     431           40 :         let source = crate::support::DownloadStream::new(cancel_or_timeout, source);
     432           40 : 
     433           40 :         let etag = mock_etag(&file_metadata);
     434           40 :         Ok(Download {
     435           40 :             metadata,
     436           40 :             last_modified: file_metadata
     437           40 :                 .modified()
     438           40 :                 .map_err(|e| DownloadError::Other(anyhow::anyhow!(e).context("Reading mtime")))?,
     439           40 :             etag,
     440           40 :             download_stream: Box::pin(source),
     441              :         })
     442           56 :     }
     443              : 
     444           14 :     async fn download_byte_range(
     445           14 :         &self,
     446           14 :         from: &RemotePath,
     447           14 :         start_inclusive: u64,
     448           14 :         end_exclusive: Option<u64>,
     449           14 :         cancel: &CancellationToken,
     450           14 :     ) -> Result<Download, DownloadError> {
     451           14 :         if let Some(end_exclusive) = end_exclusive {
     452           10 :             if end_exclusive <= start_inclusive {
     453            2 :                 return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
     454            8 :             };
     455            8 :             if start_inclusive == end_exclusive.saturating_sub(1) {
     456            2 :                 return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
     457            6 :             }
     458            4 :         }
     459              : 
     460           10 :         let target_path = from.with_base(&self.storage_root);
     461           10 :         let file_metadata = file_metadata(&target_path).await?;
     462           10 :         let mut source = tokio::fs::OpenOptions::new()
     463           10 :             .read(true)
     464           10 :             .open(&target_path)
     465           10 :             .await
     466           10 :             .with_context(|| {
     467            0 :                 format!("Failed to open source file {target_path:?} to use in the download")
     468           10 :             })
     469           10 :             .map_err(DownloadError::Other)?;
     470              : 
     471           10 :         let len = source
     472           10 :             .metadata()
     473           10 :             .await
     474           10 :             .context("query file length")
     475           10 :             .map_err(DownloadError::Other)?
     476           10 :             .len();
     477           10 : 
     478           10 :         source
     479           10 :             .seek(io::SeekFrom::Start(start_inclusive))
     480           10 :             .await
     481           10 :             .context("Failed to seek to the range start in a local storage file")
     482           10 :             .map_err(DownloadError::Other)?;
     483              : 
     484           10 :         let metadata = self
     485           10 :             .read_storage_metadata(&target_path)
     486            2 :             .await
     487           10 :             .map_err(DownloadError::Other)?;
     488              : 
     489           10 :         let source = source.take(end_exclusive.unwrap_or(len) - start_inclusive);
     490           10 :         let source = ReaderStream::new(source);
     491           10 : 
     492           10 :         let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
     493           10 :         let source = crate::support::DownloadStream::new(cancel_or_timeout, source);
     494           10 : 
     495           10 :         let etag = mock_etag(&file_metadata);
     496           10 :         Ok(Download {
     497           10 :             metadata,
     498           10 :             last_modified: file_metadata
     499           10 :                 .modified()
     500           10 :                 .map_err(|e| DownloadError::Other(anyhow::anyhow!(e).context("Reading mtime")))?,
     501           10 :             etag,
     502           10 :             download_stream: Box::pin(source),
     503              :         })
     504           14 :     }
     505              : 
     506           12 :     async fn delete(&self, path: &RemotePath, _cancel: &CancellationToken) -> anyhow::Result<()> {
     507           12 :         let file_path = path.with_base(&self.storage_root);
     508           12 :         match fs::remove_file(&file_path).await {
     509           10 :             Ok(()) => Ok(()),
     510              :             // The file doesn't exist. This shouldn't yield an error to mirror S3's behaviour.
     511              :             // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
     512              :             // > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful.
     513            2 :             Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
     514            0 :             Err(e) => Err(anyhow::anyhow!(e)),
     515              :         }
     516           12 :     }
     517              : 
     518            6 :     async fn delete_objects<'a>(
     519            6 :         &self,
     520            6 :         paths: &'a [RemotePath],
     521            6 :         cancel: &CancellationToken,
     522            6 :     ) -> anyhow::Result<()> {
     523           12 :         for path in paths {
     524            6 :             self.delete(path, cancel).await?
     525              :         }
     526            6 :         Ok(())
     527            6 :     }
     528              : 
     529            0 :     async fn copy(
     530            0 :         &self,
     531            0 :         from: &RemotePath,
     532            0 :         to: &RemotePath,
     533            0 :         _cancel: &CancellationToken,
     534            0 :     ) -> anyhow::Result<()> {
     535            0 :         let from_path = from.with_base(&self.storage_root);
     536            0 :         let to_path = to.with_base(&self.storage_root);
     537            0 :         create_target_directory(&to_path).await?;
     538            0 :         fs::copy(&from_path, &to_path).await.with_context(|| {
     539            0 :             format!(
     540            0 :                 "Failed to copy file from '{from_path}' to '{to_path}'",
     541            0 :                 from_path = from_path,
     542            0 :                 to_path = to_path
     543            0 :             )
     544            0 :         })?;
     545            0 :         Ok(())
     546            0 :     }
     547              : 
     548            0 :     async fn time_travel_recover(
     549            0 :         &self,
     550            0 :         _prefix: Option<&RemotePath>,
     551            0 :         _timestamp: SystemTime,
     552            0 :         _done_if_after: SystemTime,
     553            0 :         _cancel: &CancellationToken,
     554            0 :     ) -> Result<(), TimeTravelError> {
     555            0 :         Err(TimeTravelError::Unimplemented)
     556            0 :     }
     557              : }
     558              : 
     559           52 : fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
     560           52 :     path_with_suffix_extension(original_path, "metadata")
     561           52 : }
     562              : 
     563          130 : fn get_all_files<'a, P>(
     564          130 :     directory_path: P,
     565          130 :     recursive: bool,
     566          130 : ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Utf8PathBuf>>> + Send + Sync + 'a>>
     567          130 : where
     568          130 :     P: AsRef<Utf8Path> + Send + Sync + 'a,
     569          130 : {
     570          130 :     Box::pin(async move {
     571          130 :         let directory_path = directory_path.as_ref();
     572          130 :         if directory_path.exists() {
     573           26 :             if directory_path.is_dir() {
     574           26 :                 let mut paths = Vec::new();
     575           26 :                 let mut dir_contents = fs::read_dir(directory_path).await?;
     576           56 :                 while let Some(dir_entry) = dir_contents.next_entry().await? {
     577           30 :                     let file_type = dir_entry.file_type().await?;
     578           30 :                     let entry_path =
     579           30 :                         Utf8PathBuf::from_path_buf(dir_entry.path()).map_err(|pb| {
     580            0 :                             anyhow::Error::msg(format!(
     581            0 :                                 "non-Unicode path: {}",
     582            0 :                                 pb.to_string_lossy()
     583            0 :                             ))
     584           30 :                         })?;
     585           30 :                     if file_type.is_symlink() {
     586            0 :                         debug!("{entry_path:?} is a symlink, skipping")
     587           30 :                     } else if file_type.is_dir() {
     588           22 :                         if recursive {
     589           18 :                             paths.extend(get_all_files(&entry_path, true).await?.into_iter())
     590              :                         } else {
     591           10 :                             paths.push(entry_path)
     592              :                         }
     593            8 :                     } else {
     594            8 :                         paths.push(entry_path);
     595            8 :                     }
     596              :                 }
     597           26 :                 Ok(paths)
     598              :             } else {
     599            0 :                 bail!("Path {directory_path:?} is not a directory")
     600              :             }
     601              :         } else {
     602          104 :             Ok(Vec::new())
     603              :         }
     604          130 :     })
     605          130 : }
     606              : 
     607         1874 : async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> {
     608         1874 :     let target_dir = match target_file_path.parent() {
     609         1874 :         Some(parent_dir) => parent_dir,
     610            0 :         None => bail!("File path '{target_file_path}' has no parent directory"),
     611              :     };
     612         1874 :     if !target_dir.exists() {
     613          322 :         fs::create_dir_all(target_dir).await?;
     614         1552 :     }
     615         1873 :     Ok(())
     616         1873 : }
     617              : 
     618           66 : async fn file_metadata(file_path: &Utf8Path) -> Result<std::fs::Metadata, DownloadError> {
     619           66 :     tokio::fs::metadata(&file_path).await.map_err(|e| {
     620           16 :         if e.kind() == ErrorKind::NotFound {
     621           16 :             DownloadError::NotFound
     622              :         } else {
     623            0 :             DownloadError::BadInput(e.into())
     624              :         }
     625           66 :     })
     626           66 : }
     627              : 
     628              : // Use mtime as stand-in for ETag.  We could calculate a meaningful one by md5'ing the contents of files we
     629              : // read, but that's expensive and the local_fs test helper's whole reason for existence is to run small tests
     630              : // quickly, with less overhead than using a mock S3 server.
     631           50 : fn mock_etag(meta: &std::fs::Metadata) -> Etag {
     632           50 :     let mtime = meta.modified().expect("Filesystem mtime missing");
     633           50 :     format!("{}", mtime.duration_since(UNIX_EPOCH).unwrap().as_millis()).into()
     634           50 : }
     635              : 
     636              : #[cfg(test)]
     637              : mod fs_tests {
     638              :     use super::*;
     639              : 
     640              :     use camino_tempfile::tempdir;
     641              :     use std::{collections::HashMap, io::Write};
     642              : 
     643            6 :     async fn read_and_check_metadata(
     644            6 :         storage: &LocalFs,
     645            6 :         remote_storage_path: &RemotePath,
     646            6 :         expected_metadata: Option<&StorageMetadata>,
     647            6 :     ) -> anyhow::Result<String> {
     648            6 :         let cancel = CancellationToken::new();
     649            6 :         let download = storage
     650            6 :             .download(remote_storage_path, &cancel)
     651           14 :             .await
     652            6 :             .map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
     653            6 :         ensure!(
     654            6 :             download.metadata.as_ref() == expected_metadata,
     655            0 :             "Unexpected metadata returned for the downloaded file"
     656              :         );
     657              : 
     658           12 :         let contents = aggregate(download.download_stream).await?;
     659              : 
     660            6 :         String::from_utf8(contents).map_err(anyhow::Error::new)
     661            6 :     }
     662              : 
     663              :     #[tokio::test]
     664            2 :     async fn upload_file() -> anyhow::Result<()> {
     665            2 :         let (storage, cancel) = create_storage()?;
     666            2 : 
     667           12 :         let target_path_1 = upload_dummy_file(&storage, "upload_1", None, &cancel).await?;
     668            2 :         assert_eq!(
     669            6 :             storage.list_all().await?,
     670            2 :             vec![target_path_1.clone()],
     671            2 :             "Should list a single file after first upload"
     672            2 :         );
     673            2 : 
     674           12 :         let target_path_2 = upload_dummy_file(&storage, "upload_2", None, &cancel).await?;
     675            2 :         assert_eq!(
     676            6 :             list_files_sorted(&storage).await?,
     677            2 :             vec![target_path_1.clone(), target_path_2.clone()],
     678            2 :             "Should list a two different files after second upload"
     679            2 :         );
     680            2 : 
     681            2 :         Ok(())
     682            2 :     }
     683              : 
     684              :     #[tokio::test]
     685            2 :     async fn upload_file_negatives() -> anyhow::Result<()> {
     686            2 :         let (storage, cancel) = create_storage()?;
     687            2 : 
     688            2 :         let id = RemotePath::new(Utf8Path::new("dummy"))?;
     689            2 :         let content = Bytes::from_static(b"12345");
     690            8 :         let content = move || futures::stream::once(futures::future::ready(Ok(content.clone())));
     691            2 : 
     692            2 :         // Check that you get an error if the size parameter doesn't match the actual
     693            2 :         // size of the stream.
     694            2 :         storage
     695            2 :             .upload(content(), 0, &id, None, &cancel)
     696            2 :             .await
     697            2 :             .expect_err("upload with zero size succeeded");
     698            2 :         storage
     699            2 :             .upload(content(), 4, &id, None, &cancel)
     700            4 :             .await
     701            2 :             .expect_err("upload with too short size succeeded");
     702            2 :         storage
     703            2 :             .upload(content(), 6, &id, None, &cancel)
     704            4 :             .await
     705            2 :             .expect_err("upload with too large size succeeded");
     706            2 : 
     707            2 :         // Correct size is 5, this should succeed.
     708            6 :         storage.upload(content(), 5, &id, None, &cancel).await?;
     709            2 : 
     710            2 :         Ok(())
     711            2 :     }
     712              : 
     713           20 :     fn create_storage() -> anyhow::Result<(LocalFs, CancellationToken)> {
     714           20 :         let storage_root = tempdir()?.path().to_path_buf();
     715           20 :         LocalFs::new(storage_root, Duration::from_secs(120)).map(|s| (s, CancellationToken::new()))
     716           20 :     }
     717              : 
     718              :     #[tokio::test]
     719            2 :     async fn download_file() -> anyhow::Result<()> {
     720            2 :         let (storage, cancel) = create_storage()?;
     721            2 :         let upload_name = "upload_1";
     722           11 :         let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
     723            2 : 
     724            8 :         let contents = read_and_check_metadata(&storage, &upload_target, None).await?;
     725            2 :         assert_eq!(
     726            2 :             dummy_contents(upload_name),
     727            2 :             contents,
     728            2 :             "We should upload and download the same contents"
     729            2 :         );
     730            2 : 
     731            2 :         let non_existing_path = "somewhere/else";
     732            2 :         match storage.download(&RemotePath::new(Utf8Path::new(non_existing_path))?, &cancel).await {
     733            2 :             Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys
     734            2 :             other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
     735            2 :         }
     736            2 :         Ok(())
     737            2 :     }
     738              : 
     739              :     #[tokio::test]
     740            2 :     async fn download_file_range_positive() -> anyhow::Result<()> {
     741            2 :         let (storage, cancel) = create_storage()?;
     742            2 :         let upload_name = "upload_1";
     743           12 :         let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
     744            2 : 
     745            2 :         let full_range_download_contents =
     746            8 :             read_and_check_metadata(&storage, &upload_target, None).await?;
     747            2 :         assert_eq!(
     748            2 :             dummy_contents(upload_name),
     749            2 :             full_range_download_contents,
     750            2 :             "Download full range should return the whole upload"
     751            2 :         );
     752            2 : 
     753            2 :         let uploaded_bytes = dummy_contents(upload_name).into_bytes();
     754            2 :         let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);
     755            2 : 
     756            2 :         let first_part_download = storage
     757            2 :             .download_byte_range(
     758            2 :                 &upload_target,
     759            2 :                 0,
     760            2 :                 Some(first_part_local.len() as u64),
     761            2 :                 &cancel,
     762            2 :             )
     763            8 :             .await?;
     764            2 :         assert!(
     765            2 :             first_part_download.metadata.is_none(),
     766            2 :             "No metadata should be returned for no metadata upload"
     767            2 :         );
     768            2 : 
     769            2 :         let first_part_remote = aggregate(first_part_download.download_stream).await?;
     770            2 :         assert_eq!(
     771            2 :             first_part_local, first_part_remote,
     772            2 :             "First part bytes should be returned when requested"
     773            2 :         );
     774            2 : 
     775            2 :         let second_part_download = storage
     776            2 :             .download_byte_range(
     777            2 :                 &upload_target,
     778            2 :                 first_part_local.len() as u64,
     779            2 :                 Some((first_part_local.len() + second_part_local.len()) as u64),
     780            2 :                 &cancel,
     781            2 :             )
     782            8 :             .await?;
     783            2 :         assert!(
     784            2 :             second_part_download.metadata.is_none(),
     785            2 :             "No metadata should be returned for no metadata upload"
     786            2 :         );
     787            2 : 
     788            2 :         let second_part_remote = aggregate(second_part_download.download_stream).await?;
     789            2 :         assert_eq!(
     790            2 :             second_part_local, second_part_remote,
     791            2 :             "Second part bytes should be returned when requested"
     792            2 :         );
     793            2 : 
     794            2 :         let suffix_bytes = storage
     795            2 :             .download_byte_range(&upload_target, 13, None, &cancel)
     796            8 :             .await?
     797            2 :             .download_stream;
     798            2 :         let suffix_bytes = aggregate(suffix_bytes).await?;
     799            2 :         let suffix = std::str::from_utf8(&suffix_bytes)?;
     800            2 :         assert_eq!(upload_name, suffix);
     801            2 : 
     802            2 :         let all_bytes = storage
     803            2 :             .download_byte_range(&upload_target, 0, None, &cancel)
     804            8 :             .await?
     805            2 :             .download_stream;
     806            2 :         let all_bytes = aggregate(all_bytes).await?;
     807            2 :         let all_bytes = std::str::from_utf8(&all_bytes)?;
     808            2 :         assert_eq!(dummy_contents("upload_1"), all_bytes);
     809            2 : 
     810            2 :         Ok(())
     811            2 :     }
     812              : 
     813              :     #[tokio::test]
     814            2 :     async fn download_file_range_negative() -> anyhow::Result<()> {
     815            2 :         let (storage, cancel) = create_storage()?;
     816            2 :         let upload_name = "upload_1";
     817           12 :         let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
     818            2 : 
     819            2 :         let start = 1_000_000_000;
     820            2 :         let end = start + 1;
     821            2 :         match storage
     822            2 :             .download_byte_range(
     823            2 :                 &upload_target,
     824            2 :                 start,
     825            2 :                 Some(end), // exclusive end
     826            2 :                 &cancel,
     827            2 :             )
     828            2 :             .await
     829            2 :         {
     830            2 :             Ok(_) => panic!("Should not allow downloading wrong ranges"),
     831            2 :             Err(e) => {
     832            2 :                 let error_string = e.to_string();
     833            2 :                 assert!(error_string.contains("zero bytes"));
     834            2 :                 assert!(error_string.contains(&start.to_string()));
     835            2 :                 assert!(error_string.contains(&end.to_string()));
     836            2 :             }
     837            2 :         }
     838            2 : 
     839            2 :         let start = 10000;
     840            2 :         let end = 234;
     841            2 :         assert!(start > end, "Should test an incorrect range");
     842            2 :         match storage
     843            2 :             .download_byte_range(&upload_target, start, Some(end), &cancel)
     844            2 :             .await
     845            2 :         {
     846            2 :             Ok(_) => panic!("Should not allow downloading wrong ranges"),
     847            2 :             Err(e) => {
     848            2 :                 let error_string = e.to_string();
     849            2 :                 assert!(error_string.contains("Invalid range"));
     850            2 :                 assert!(error_string.contains(&start.to_string()));
     851            2 :                 assert!(error_string.contains(&end.to_string()));
     852            2 :             }
     853            2 :         }
     854            2 : 
     855            2 :         Ok(())
     856            2 :     }
     857              : 
     858              :     #[tokio::test]
     859            2 :     async fn delete_file() -> anyhow::Result<()> {
     860            2 :         let (storage, cancel) = create_storage()?;
     861            2 :         let upload_name = "upload_1";
     862           12 :         let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
     863            2 : 
     864            2 :         storage.delete(&upload_target, &cancel).await?;
     865            6 :         assert!(storage.list_all().await?.is_empty());
     866            2 : 
     867            2 :         storage
     868            2 :             .delete(&upload_target, &cancel)
     869            2 :             .await
     870            2 :             .expect("Should allow deleting non-existing storage files");
     871            2 : 
     872            2 :         Ok(())
     873            2 :     }
     874              : 
     875              :     #[tokio::test]
     876            2 :     async fn file_with_metadata() -> anyhow::Result<()> {
     877            2 :         let (storage, cancel) = create_storage()?;
     878            2 :         let upload_name = "upload_1";
     879            2 :         let metadata = StorageMetadata(HashMap::from([
     880            2 :             ("one".to_string(), "1".to_string()),
     881            2 :             ("two".to_string(), "2".to_string()),
     882            2 :         ]));
     883            2 :         let upload_target =
     884           14 :             upload_dummy_file(&storage, upload_name, Some(metadata.clone()), &cancel).await?;
     885            2 : 
     886            2 :         let full_range_download_contents =
     887           10 :             read_and_check_metadata(&storage, &upload_target, Some(&metadata)).await?;
     888            2 :         assert_eq!(
     889            2 :             dummy_contents(upload_name),
     890            2 :             full_range_download_contents,
     891            2 :             "We should upload and download the same contents"
     892            2 :         );
     893            2 : 
     894            2 :         let uploaded_bytes = dummy_contents(upload_name).into_bytes();
     895            2 :         let (first_part_local, _) = uploaded_bytes.split_at(3);
     896            2 : 
     897            2 :         let partial_download_with_metadata = storage
     898            2 :             .download_byte_range(
     899            2 :                 &upload_target,
     900            2 :                 0,
     901            2 :                 Some(first_part_local.len() as u64),
     902            2 :                 &cancel,
     903            2 :             )
     904           10 :             .await?;
     905            2 :         let first_part_remote = aggregate(partial_download_with_metadata.download_stream).await?;
     906            2 :         assert_eq!(
     907            2 :             first_part_local,
     908            2 :             first_part_remote.as_slice(),
     909            2 :             "First part bytes should be returned when requested"
     910            2 :         );
     911            2 : 
     912            2 :         assert_eq!(
     913            2 :             partial_download_with_metadata.metadata,
     914            2 :             Some(metadata),
     915            2 :             "We should get the same metadata back for partial download"
     916            2 :         );
     917            2 : 
     918            2 :         Ok(())
     919            2 :     }
     920              : 
     921              :     #[tokio::test]
     922            2 :     async fn list() -> anyhow::Result<()> {
     923            2 :         // No delimiter: should recursively list everything
     924            2 :         let (storage, cancel) = create_storage()?;
     925           12 :         let child = upload_dummy_file(&storage, "grandparent/parent/child", None, &cancel).await?;
     926           12 :         let uncle = upload_dummy_file(&storage, "grandparent/uncle", None, &cancel).await?;
     927            2 : 
     928            2 :         let listing = storage
     929            2 :             .list(None, ListingMode::NoDelimiter, None, &cancel)
     930            2 :             .await?;
     931            2 :         assert!(listing.prefixes.is_empty());
     932            2 :         assert_eq!(listing.keys, [uncle.clone(), child.clone()].to_vec());
     933            2 : 
     934            2 :         // Delimiter: should only go one deep
     935            2 :         let listing = storage
     936            2 :             .list(None, ListingMode::WithDelimiter, None, &cancel)
     937            4 :             .await?;
     938            2 : 
     939            2 :         assert_eq!(
     940            2 :             listing.prefixes,
     941            2 :             [RemotePath::from_string("timelines").unwrap()].to_vec()
     942            2 :         );
     943            2 :         assert!(listing.keys.is_empty());
     944            2 : 
     945            2 :         // Delimiter & prefix
     946            2 :         let listing = storage
     947            2 :             .list(
     948            2 :                 Some(&RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()),
     949            2 :                 ListingMode::WithDelimiter,
     950            2 :                 None,
     951            2 :                 &cancel,
     952            2 :             )
     953            4 :             .await?;
     954            2 :         assert_eq!(
     955            2 :             listing.prefixes,
     956            2 :             [RemotePath::from_string("timelines/some_timeline/grandparent/parent").unwrap()]
     957            2 :                 .to_vec()
     958            2 :         );
     959            2 :         assert_eq!(listing.keys, [uncle.clone()].to_vec());
     960            2 : 
     961            2 :         Ok(())
     962            2 :     }
     963              : 
     964              :     #[tokio::test]
     965            2 :     async fn overwrite_shorter_file() -> anyhow::Result<()> {
     966            2 :         let (storage, cancel) = create_storage()?;
     967            2 : 
     968            2 :         let path = RemotePath::new("does/not/matter/file".into())?;
     969            2 : 
     970            2 :         let body = Bytes::from_static(b"long file contents is long");
     971            2 :         {
     972            2 :             let len = body.len();
     973            2 :             let body =
     974            2 :                 futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
     975            8 :             storage.upload(body, len, &path, None, &cancel).await?;
     976            2 :         }
     977            2 : 
     978            4 :         let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
     979            2 :         assert_eq!(body, read);
     980            2 : 
     981            2 :         let shorter = Bytes::from_static(b"shorter body");
     982            2 :         {
     983            2 :             let len = shorter.len();
     984            2 :             let body =
     985            2 :                 futures::stream::once(futures::future::ready(std::io::Result::Ok(shorter.clone())));
     986            6 :             storage.upload(body, len, &path, None, &cancel).await?;
     987            2 :         }
     988            2 : 
     989            4 :         let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
     990            2 :         assert_eq!(shorter, read);
     991            2 :         Ok(())
     992            2 :     }
     993              : 
     994              :     #[tokio::test]
     995            2 :     async fn cancelled_upload_can_later_be_retried() -> anyhow::Result<()> {
     996            2 :         let (storage, cancel) = create_storage()?;
     997            2 : 
     998            2 :         let path = RemotePath::new("does/not/matter/file".into())?;
     999            2 : 
    1000            2 :         let body = Bytes::from_static(b"long file contents is long");
    1001            2 :         {
    1002            2 :             let len = body.len();
    1003            2 :             let body =
    1004            2 :                 futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
    1005            2 :             let cancel = cancel.child_token();
    1006            2 :             cancel.cancel();
    1007            2 :             let e = storage
    1008            2 :                 .upload(body, len, &path, None, &cancel)
    1009            6 :                 .await
    1010            2 :                 .unwrap_err();
    1011            2 : 
    1012            2 :             assert!(TimeoutOrCancel::caused_by_cancel(&e));
    1013            2 :         }
    1014            2 : 
    1015            2 :         {
    1016            2 :             let len = body.len();
    1017            2 :             let body =
    1018            2 :                 futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
    1019            5 :             storage.upload(body, len, &path, None, &cancel).await?;
    1020            2 :         }
    1021            2 : 
    1022            4 :         let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
    1023            2 :         assert_eq!(body, read);
    1024            2 : 
    1025            2 :         Ok(())
    1026            2 :     }
    1027              : 
    /// Creates a dummy file called `name` under `<storage_root>/timelines/some_timeline/`
    /// and uploads it through `storage.upload`.
    ///
    /// The local file is produced by `create_file_for_upload` with `dummy_contents(name)`
    /// as its body. The upload target is that same path with `storage.storage_root`
    /// stripped and the remainder passed through `RemotePath::new`. The file is handed to
    /// `upload` as a `ReaderStream` together with the size reported by
    /// `create_file_for_upload`; `metadata` and `cancel` are forwarded unchanged.
    ///
    /// Returns the remote path that was uploaded to.
    ///
    /// # Errors
    ///
    /// Propagates any failure from creating the local file, from resolving the remote
    /// path (the error context names the offending path and the storage root), or from
    /// the upload itself.
    1028           18 :     async fn upload_dummy_file(
    1029           18 :         storage: &LocalFs,
    1030           18 :         name: &str,
    1031           18 :         metadata: Option<StorageMetadata>,
    1032           18 :         cancel: &CancellationToken,
    1033           18 :     ) -> anyhow::Result<RemotePath> {
    1034           18 :         let from_path = storage
    1035           18 :             .storage_root
    1036           18 :             .join("timelines")
    1037           18 :             .join("some_timeline")
    1038           18 :             .join(name);
    1039           18 :         let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?;
    1040              : 
    1041           18 :         let relative_path = from_path
    1042           18 :             .strip_prefix(&storage.storage_root)
    1043           18 :             .context("Failed to strip storage root prefix")
    1044           18 :             .and_then(RemotePath::new)
    1045           18 :             .with_context(|| {
    1046            0 :                 format!(
    1047            0 :                     "Failed to resolve remote part of path {:?} for base {:?}",
    1048            0 :                     from_path, storage.storage_root
    1049            0 :                 )
    1050           18 :             })?;
    1051              : 
    1052           18 :         let file = tokio_util::io::ReaderStream::new(file);
    1053           18 : 
    1054           18 :         storage
    1055           18 :             .upload(file, size, &relative_path, metadata, cancel)
    1056           91 :             .await?;
    1057           18 :         Ok(relative_path)
    1058           18 :     }
    1059              : 
    /// Writes `contents` to a brand-new file at `path` and reopens it for reading.
    ///
    /// Missing parent directories are created first. The file is opened with
    /// `create_new(true)`, so an already existing `path` is an error instead of being
    /// overwritten. The write handle is dropped before the size is read back from the
    /// file's metadata and before the file is reopened read-only.
    ///
    /// Returns the read handle together with the file size in bytes.
    ///
    /// NOTE(review): directory creation and the write use `std::fs` directly and the
    /// metadata lookup is not awaited; only the final open is awaited.
    ///
    /// # Panics
    ///
    /// Panics if `path` has no parent component.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error from the steps above.
    1060           18 :     async fn create_file_for_upload(
    1061           18 :         path: &Utf8Path,
    1062           18 :         contents: &str,
    1063           18 :     ) -> anyhow::Result<(fs::File, usize)> {
    1064           18 :         std::fs::create_dir_all(path.parent().unwrap())?;
    1065           18 :         let mut file_for_writing = std::fs::OpenOptions::new()
    1066           18 :             .write(true)
    1067           18 :             .create_new(true)
    1068           18 :             .open(path)?;
    1069           18 :         write!(file_for_writing, "{}", contents)?;
    1070           18 :         drop(file_for_writing);
    1071           18 :         let file_size = path.metadata()?.len() as usize;
    1072           18 :         Ok((
    1073           18 :             fs::OpenOptions::new().read(true).open(&path).await?,
    1074           18 :             file_size,
    1075              :         ))
    1076           18 :     }
    1077              : 
    /// Returns the body used for the dummy file `name`: the string `contents for {name}`.
    1078           30 :     fn dummy_contents(name: &str) -> String {
    1079           30 :         format!("contents for {name}")
    1080           30 :     }
    1081              : 
    /// Returns everything `storage.list_all()` reports, sorted in ascending order of each
    /// entry's `.0` field.
    ///
    /// # Errors
    ///
    /// Propagates the error from `list_all`.
    1082            2 :     async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
    1083            6 :         let mut files = storage.list_all().await?;
    1084            2 :         files.sort_by(|a, b| a.0.cmp(&b.0));
    1085            2 :         Ok(files)
    1086            2 :     }
    1087              : 
    /// Drains `stream` and concatenates every chunk into a single byte vector.
    ///
    /// Chunks are appended in the order the stream yields them. The stream is pinned
    /// locally with `std::pin::pin!` before it is polled through `StreamExt::next`.
    ///
    /// # Errors
    ///
    /// Returns the first I/O error yielded by the stream; bytes collected up to that
    /// point are discarded.
    1088           22 :     async fn aggregate(
    1089           22 :         stream: impl Stream<Item = std::io::Result<Bytes>>,
    1090           22 :     ) -> anyhow::Result<Vec<u8>> {
    1091           22 :         use futures::stream::StreamExt;
    1092           22 :         let mut out = Vec::new();
    1093           22 :         let mut stream = std::pin::pin!(stream);
    1094           44 :         while let Some(res) = stream.next().await {
    1095           22 :             out.extend_from_slice(&res?[..]);
    1096              :         }
    1097           22 :         Ok(out)
    1098           22 :     }
    1099              : }
        

Generated by: LCOV version 2.1-beta