LCOV - code coverage report
Current view: top level - libs/remote_storage/src - local_fs.rs (source / functions) Coverage Total Hit
Test: aca806cab4756d7eb6a304846130f4a73a5d5393.info Lines: 89.5 % 992 888
Test Date: 2025-04-24 20:31:15 Functions: 59.3 % 135 80

            Line data    Source code
       1              : //! Local filesystem acting as a remote storage.
       2              : //! Multiple API users can use the same "storage" of this kind by using different storage roots.
       3              : //!
       4              : //! This storage used in tests, but can also be used in cases when a certain persistent
       5              : //! volume is mounted to the local FS.
       6              : 
       7              : use std::collections::HashSet;
       8              : use std::io::ErrorKind;
       9              : use std::num::NonZeroU32;
      10              : use std::time::{Duration, SystemTime, UNIX_EPOCH};
      11              : 
      12              : use anyhow::{Context, bail, ensure};
      13              : use bytes::Bytes;
      14              : use camino::{Utf8Path, Utf8PathBuf};
      15              : use futures::stream::Stream;
      16              : use tokio::fs;
      17              : use tokio::io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
      18              : use tokio_util::io::ReaderStream;
      19              : use tokio_util::sync::CancellationToken;
      20              : use utils::crashsafe::path_with_suffix_extension;
      21              : 
      22              : use super::{RemoteStorage, StorageMetadata};
      23              : use crate::{
      24              :     Download, DownloadError, DownloadOpts, Etag, Listing, ListingMode, ListingObject,
      25              :     REMOTE_STORAGE_PREFIX_SEPARATOR, RemotePath, TimeTravelError, TimeoutOrCancel,
      26              : };
      27              : 
      28              : const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";
      29              : 
      30              : #[derive(Debug, Clone)]
      31              : pub struct LocalFs {
      32              :     storage_root: Utf8PathBuf,
      33              :     timeout: Duration,
      34              : }
      35              : 
      36              : impl LocalFs {
      37              :     /// Attempts to create local FS storage, along with its root directory.
      38              :     /// Storage root will be created (if does not exist) and transformed into an absolute path (if passed as relative).
      39         1501 :     pub fn new(mut storage_root: Utf8PathBuf, timeout: Duration) -> anyhow::Result<Self> {
      40         1501 :         if !storage_root.exists() {
      41           33 :             std::fs::create_dir_all(&storage_root).with_context(|| {
      42            0 :                 format!("Failed to create all directories in the given root path {storage_root:?}")
      43           33 :             })?;
      44         1468 :         }
      45         1501 :         if !storage_root.is_absolute() {
      46         1392 :             storage_root = storage_root.canonicalize_utf8().with_context(|| {
      47            0 :                 format!("Failed to represent path {storage_root:?} as an absolute path")
      48         1392 :             })?;
      49          109 :         }
      50              : 
      51         1501 :         Ok(Self {
      52         1501 :             storage_root,
      53         1501 :             timeout,
      54         1501 :         })
      55         1501 :     }
      56              : 
      57              :     // mirrors S3Bucket::s3_object_to_relative_path
      58          405 :     fn local_file_to_relative_path(&self, key: Utf8PathBuf) -> RemotePath {
      59          405 :         let relative_path = key
      60          405 :             .strip_prefix(&self.storage_root)
      61          405 :             .expect("relative path must contain storage_root as prefix");
      62          405 :         RemotePath(relative_path.into())
      63          405 :     }
      64              : 
      65          326 :     async fn read_storage_metadata(
      66          326 :         &self,
      67          326 :         file_path: &Utf8Path,
      68          326 :     ) -> anyhow::Result<Option<StorageMetadata>> {
      69          326 :         let metadata_path = storage_metadata_path(file_path);
      70          326 :         if metadata_path.exists() && metadata_path.is_file() {
      71            6 :             let metadata_string = fs::read_to_string(&metadata_path).await.with_context(|| {
      72            0 :                 format!("Failed to read metadata from the local storage at '{metadata_path}'")
      73            6 :             })?;
      74              : 
      75            6 :             serde_json::from_str(&metadata_string)
      76            6 :                 .with_context(|| {
      77            0 :                     format!(
      78            0 :                         "Failed to deserialize metadata from the local storage at '{metadata_path}'",
      79            0 :                     )
      80            6 :                 })
      81            6 :                 .map(|metadata| Some(StorageMetadata(metadata)))
      82              :         } else {
      83          320 :             Ok(None)
      84              :         }
      85          326 :     }
      86              : 
      87              :     #[cfg(test)]
      88            9 :     async fn list_all(&self) -> anyhow::Result<Vec<RemotePath>> {
      89              :         use std::future::Future;
      90              :         use std::pin::Pin;
      91           27 :         fn get_all_files<'a, P>(
      92           27 :             directory_path: P,
      93           27 :         ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Utf8PathBuf>>> + Send + Sync + 'a>>
      94           27 :         where
      95           27 :             P: AsRef<Utf8Path> + Send + Sync + 'a,
      96           27 :         {
      97           27 :             Box::pin(async move {
      98           27 :                 let directory_path = directory_path.as_ref();
      99           27 :                 if directory_path.exists() {
     100           27 :                     if directory_path.is_dir() {
     101           27 :                         let mut paths = Vec::new();
     102           27 :                         let mut dir_contents = fs::read_dir(directory_path).await?;
     103           54 :                         while let Some(dir_entry) = dir_contents.next_entry().await? {
     104           27 :                             let file_type = dir_entry.file_type().await?;
     105           27 :                             let entry_path =
     106           27 :                                 Utf8PathBuf::from_path_buf(dir_entry.path()).map_err(|pb| {
     107            0 :                                     anyhow::Error::msg(format!(
     108            0 :                                         "non-Unicode path: {}",
     109            0 :                                         pb.to_string_lossy()
     110            0 :                                     ))
     111           27 :                                 })?;
     112           27 :                             if file_type.is_symlink() {
     113            0 :                                 tracing::debug!("{entry_path:?} is a symlink, skipping")
     114           27 :                             } else if file_type.is_dir() {
     115           18 :                                 paths.extend(get_all_files(&entry_path).await?.into_iter())
     116            9 :                             } else {
     117            9 :                                 paths.push(entry_path);
     118            9 :                             }
     119              :                         }
     120           27 :                         Ok(paths)
     121              :                     } else {
     122            0 :                         bail!("Path {directory_path:?} is not a directory")
     123              :                     }
     124              :                 } else {
     125            0 :                     Ok(Vec::new())
     126              :                 }
     127           27 :             })
     128           27 :         }
     129              : 
     130            9 :         Ok(get_all_files(&self.storage_root)
     131            9 :             .await?
     132            9 :             .into_iter()
     133            9 :             .map(|path| {
     134            9 :                 path.strip_prefix(&self.storage_root)
     135            9 :                     .context("Failed to strip storage root prefix")
     136            9 :                     .and_then(RemotePath::new)
     137            9 :                     .expect(
     138            9 :                         "We list files for storage root, hence should be able to remote the prefix",
     139            9 :                     )
     140            9 :             })
     141            9 :             .collect())
     142            9 :     }
     143              : 
     144              :     // recursively lists all files in a directory,
     145              :     // mirroring the `list_files` for `s3_bucket`
     146         2808 :     async fn list_recursive(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
     147         2808 :         let full_path = match folder {
     148         2802 :             Some(folder) => folder.with_base(&self.storage_root),
     149            6 :             None => self.storage_root.clone(),
     150              :         };
     151              : 
     152              :         // If we were given a directory, we may use it as our starting point.
     153              :         // Otherwise, we must go up to the first ancestor dir that exists.  This is because
     154              :         // S3 object list prefixes can be arbitrary strings, but when reading
     155              :         // the local filesystem we need a directory to start calling read_dir on.
     156         2808 :         let mut initial_dir = full_path.clone();
     157         2808 : 
     158         2808 :         // If there's no trailing slash, we have to start looking from one above: even if
     159         2808 :         // `initial_dir` is a directory, we should still list any prefixes in the parent
     160         2808 :         // that start with the same string.
     161         2808 :         if !full_path.to_string().ends_with('/') {
     162         1410 :             initial_dir.pop();
     163         1410 :         }
     164              : 
     165              :         loop {
     166              :             // Did we make it to the root?
     167         9600 :             if initial_dir.parent().is_none() {
     168            0 :                 anyhow::bail!("list_files: failed to find valid ancestor dir for {full_path}");
     169         9600 :             }
     170         9600 : 
     171         9600 :             match fs::metadata(initial_dir.clone()).await {
     172         2808 :                 Ok(meta) if meta.is_dir() => {
     173         2808 :                     // We found a directory, break
     174         2808 :                     break;
     175              :                 }
     176            0 :                 Ok(_meta) => {
     177            0 :                     // It's not a directory: strip back to the parent
     178            0 :                     initial_dir.pop();
     179            0 :                 }
     180         6792 :                 Err(e) if e.kind() == ErrorKind::NotFound => {
     181         6792 :                     // It's not a file that exists: strip the prefix back to the parent directory
     182         6792 :                     initial_dir.pop();
     183         6792 :                 }
     184            0 :                 Err(e) => {
     185            0 :                     // Unexpected I/O error
     186            0 :                     anyhow::bail!(e)
     187              :                 }
     188              :             }
     189              :         }
     190              :         // Note that Utf8PathBuf starts_with only considers full path segments, but
     191              :         // object prefixes are arbitrary strings, so we need the strings for doing
     192              :         // starts_with later.
     193         2808 :         let prefix = full_path.as_str();
     194         2808 : 
     195         2808 :         let mut files = vec![];
     196         2808 :         let mut directory_queue = vec![initial_dir];
     197         5720 :         while let Some(cur_folder) = directory_queue.pop() {
     198         2912 :             let mut entries = cur_folder.read_dir_utf8()?;
     199         3542 :             while let Some(Ok(entry)) = entries.next() {
     200          630 :                 let file_name = entry.file_name();
     201          630 :                 let full_file_name = cur_folder.join(file_name);
     202          630 :                 if full_file_name.as_str().starts_with(prefix) {
     203          405 :                     let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
     204          405 :                     files.push(file_remote_path);
     205          405 :                     if full_file_name.is_dir() {
     206          104 :                         directory_queue.push(full_file_name);
     207          301 :                     }
     208          225 :                 }
     209              :             }
     210              :         }
     211              : 
     212         2808 :         Ok(files)
     213         2808 :     }
     214              : 
     215        20008 :     async fn upload0(
     216        20008 :         &self,
     217        20008 :         data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
     218        20008 :         data_size_bytes: usize,
     219        20008 :         to: &RemotePath,
     220        20008 :         metadata: Option<StorageMetadata>,
     221        20008 :         cancel: &CancellationToken,
     222        20008 :     ) -> anyhow::Result<()> {
     223        20008 :         let target_file_path = to.with_base(&self.storage_root);
     224        20008 :         create_target_directory(&target_file_path).await?;
     225              :         // We need this dance with sort of durable rename (without fsyncs)
     226              :         // to prevent partial uploads. This was really hit when pageserver shutdown
     227              :         // cancelled the upload and partial file was left on the fs
     228              :         // NOTE: Because temp file suffix always the same this operation is racy.
     229              :         // Two concurrent operations can lead to the following sequence:
     230              :         // T1: write(temp)
     231              :         // T2: write(temp) -> overwrites the content
     232              :         // T1: rename(temp, dst) -> succeeds
     233              :         // T2: rename(temp, dst) -> fails, temp no longet exists
     234              :         // This can be solved by supplying unique temp suffix every time, but this situation
     235              :         // is not normal in the first place, the error can help (and helped at least once)
     236              :         // to discover bugs in upper level synchronization.
     237        20004 :         let temp_file_path =
     238        20004 :             path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX);
     239        19938 :         let mut destination = io::BufWriter::new(
     240        20004 :             fs::OpenOptions::new()
     241        20004 :                 .write(true)
     242        20004 :                 .create(true)
     243        20004 :                 .truncate(true)
     244        20004 :                 .open(&temp_file_path)
     245        20004 :                 .await
     246        19938 :                 .with_context(|| {
     247            0 :                     format!("Failed to open target fs destination at '{target_file_path}'")
     248        19938 :                 })?,
     249              :         );
     250              : 
     251        19938 :         let from_size_bytes = data_size_bytes as u64;
     252        19938 :         let data = tokio_util::io::StreamReader::new(data);
     253        19938 :         let data = std::pin::pin!(data);
     254        19938 :         let mut buffer_to_read = data.take(from_size_bytes);
     255        19938 : 
     256        19938 :         // alternatively we could just write the bytes to a file, but local_fs is a testing utility
     257        19938 :         let copy = io::copy_buf(&mut buffer_to_read, &mut destination);
     258              : 
     259        19938 :         let bytes_read = tokio::select! {
     260              :             biased;
     261        19938 :             _ = cancel.cancelled() => {
     262            3 :                 let file = destination.into_inner();
     263            3 :                 // wait for the inflight operation(s) to complete so that there could be a next
     264            3 :                 // attempt right away and our writes are not directed to their file.
     265            3 :                 file.into_std().await;
     266              : 
     267              :                 // TODO: leave the temp or not? leaving is probably less racy. enabled truncate at
     268              :                 // least.
     269            3 :                 fs::remove_file(temp_file_path).await.context("remove temp_file_path after cancellation or timeout")?;
     270            3 :                 return Err(TimeoutOrCancel::Cancel.into());
     271              :             }
     272        19938 :             read = copy => read,
     273              :         };
     274              : 
     275        19850 :         let bytes_read =
     276        19850 :             bytes_read.with_context(|| {
     277            0 :                 format!(
     278            0 :                     "Failed to upload file (write temp) to the local storage at '{temp_file_path}'",
     279            0 :                 )
     280        19850 :             })?;
     281              : 
     282        19850 :         if bytes_read < from_size_bytes {
     283            3 :             bail!(
     284            3 :                 "Provided stream was shorter than expected: {bytes_read} vs {from_size_bytes} bytes"
     285            3 :             );
     286        19847 :         }
     287        19847 :         // Check if there is any extra data after the given size.
     288        19847 :         let mut from = buffer_to_read.into_inner();
     289        19847 :         let extra_read = from.read(&mut [1]).await?;
     290        19830 :         ensure!(
     291        19830 :             extra_read == 0,
     292            6 :             "Provided stream was larger than expected: expected {from_size_bytes} bytes",
     293              :         );
     294              : 
     295        19824 :         destination.flush().await.with_context(|| {
     296            0 :             format!(
     297            0 :                 "Failed to upload (flush temp) file to the local storage at '{temp_file_path}'",
     298            0 :             )
     299        19824 :         })?;
     300              : 
     301        19824 :         fs::rename(temp_file_path, &target_file_path)
     302        19824 :             .await
     303        19771 :             .with_context(|| {
     304            0 :                 format!(
     305            0 :                     "Failed to upload (rename) file to the local storage at '{target_file_path}'",
     306            0 :                 )
     307        19771 :             })?;
     308              : 
     309        19771 :         if let Some(storage_metadata) = metadata {
     310              :             // FIXME: we must not be using metadata much, since this would forget the old metadata
     311              :             // for new writes? or perhaps metadata is sticky; could consider removing if it's never
     312              :             // used.
     313            3 :             let storage_metadata_path = storage_metadata_path(&target_file_path);
     314            3 :             fs::write(
     315            3 :                 &storage_metadata_path,
     316            3 :                 serde_json::to_string(&storage_metadata.0)
     317            3 :                     .context("Failed to serialize storage metadata as json")?,
     318              :             )
     319            3 :             .await
     320            3 :             .with_context(|| {
     321            0 :                 format!(
     322            0 :                     "Failed to write metadata to the local storage at '{storage_metadata_path}'",
     323            0 :                 )
     324            3 :             })?;
     325           45 :         }
     326              : 
     327        19771 :         Ok(())
     328           60 :     }
     329              : }
     330              : 
     331              : impl RemoteStorage for LocalFs {
     332            6 :     fn list_streaming(
     333            6 :         &self,
     334            6 :         prefix: Option<&RemotePath>,
     335            6 :         mode: ListingMode,
     336            6 :         max_keys: Option<NonZeroU32>,
     337            6 :         cancel: &CancellationToken,
     338            6 :     ) -> impl Stream<Item = Result<Listing, DownloadError>> {
     339            6 :         let listing = self.list(prefix, mode, max_keys, cancel);
     340            6 :         futures::stream::once(listing)
     341            6 :     }
     342              : 
     343         2808 :     async fn list(
     344         2808 :         &self,
     345         2808 :         prefix: Option<&RemotePath>,
     346         2808 :         mode: ListingMode,
     347         2808 :         max_keys: Option<NonZeroU32>,
     348         2808 :         cancel: &CancellationToken,
     349         2808 :     ) -> Result<Listing, DownloadError> {
     350         2808 :         let op = async {
     351         2808 :             let mut result = Listing::default();
     352              : 
     353              :             // Filter out directories: in S3 directories don't exist, only the keys within them do.
     354         2808 :             let keys = self
     355         2808 :                 .list_recursive(prefix)
     356         2808 :                 .await
     357         2808 :                 .map_err(DownloadError::Other)?;
     358         2808 :             let mut objects = Vec::with_capacity(keys.len());
     359         3213 :             for key in keys {
     360          405 :                 let path = key.with_base(&self.storage_root);
     361          405 :                 let metadata = file_metadata(&path).await;
     362            0 :                 if let Err(DownloadError::NotFound) = metadata {
     363              :                     // Race: if the file is deleted between listing and metadata check, ignore it.
     364            0 :                     continue;
     365          405 :                 }
     366          405 :                 let metadata = metadata?;
     367          405 :                 if metadata.is_dir() {
     368          104 :                     continue;
     369          301 :                 }
     370          301 :                 objects.push(ListingObject {
     371          301 :                     key: key.clone(),
     372          301 :                     last_modified: metadata.modified()?,
     373          301 :                     size: metadata.len(),
     374              :                 });
     375              :             }
     376         2808 :             let objects = objects;
     377         2808 : 
     378         2808 :             if let ListingMode::NoDelimiter = mode {
     379         1401 :                 result.keys = objects;
     380         1401 :             } else {
     381         1407 :                 let mut prefixes = HashSet::new();
     382         1581 :                 for object in objects {
     383          174 :                     let key = object.key;
     384              :                     // If the part after the prefix includes a "/", take only the first part and put it in `prefixes`.
     385          174 :                     let relative_key = if let Some(prefix) = prefix {
     386          165 :                         let mut prefix = prefix.clone();
     387          165 :                         // We only strip the dirname of the prefix, so that when we strip it from the start of keys we
     388          165 :                         // end up with full file/dir names.
     389          165 :                         let prefix_full_local_path = prefix.with_base(&self.storage_root);
     390          165 :                         let has_slash = prefix.0.to_string().ends_with('/');
     391          165 :                         let strip_prefix = if prefix_full_local_path.is_dir() && has_slash {
     392          141 :                             prefix
     393              :                         } else {
     394           24 :                             prefix.0.pop();
     395           24 :                             prefix
     396              :                         };
     397              : 
     398          165 :                         RemotePath::new(key.strip_prefix(&strip_prefix).unwrap()).unwrap()
     399              :                     } else {
     400            9 :                         key
     401              :                     };
     402              : 
     403          174 :                     let relative_key = format!("{}", relative_key);
     404          174 :                     if relative_key.contains(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     405          171 :                         let first_part = relative_key
     406          171 :                             .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
     407          171 :                             .next()
     408          171 :                             .unwrap()
     409          171 :                             .to_owned();
     410          171 :                         prefixes.insert(first_part);
     411          171 :                     } else {
     412            3 :                         result.keys.push(ListingObject {
     413            3 :                             key: RemotePath::from_string(&relative_key).unwrap(),
     414            3 :                             last_modified: object.last_modified,
     415            3 :                             size: object.size,
     416            3 :                         });
     417            3 :                     }
     418              :                 }
     419         1407 :                 result.prefixes = prefixes
     420         1407 :                     .into_iter()
     421         1407 :                     .map(|s| RemotePath::from_string(&s).unwrap())
     422         1407 :                     .collect();
     423         1407 :             }
     424              : 
     425         2808 :             if let Some(max_keys) = max_keys {
     426            0 :                 result.keys.truncate(max_keys.get() as usize);
     427         2808 :             }
     428         2808 :             Ok(result)
     429         2808 :         };
     430              : 
     431         2808 :         let timeout = async {
     432         2757 :             tokio::time::sleep(self.timeout).await;
     433            0 :             Err(DownloadError::Timeout)
     434            0 :         };
     435              : 
     436         2808 :         let cancelled = async {
     437         2776 :             cancel.cancelled().await;
     438            0 :             Err(DownloadError::Cancelled)
     439            0 :         };
     440              : 
     441         2808 :         tokio::select! {
     442         2808 :             res = op => res,
     443         2808 :             res = timeout => res,
     444         2808 :             res = cancelled => res,
     445              :         }
     446         2808 :     }
     447              : 
     448            0 :     async fn list_versions(
     449            0 :         &self,
     450            0 :         _prefix: Option<&RemotePath>,
     451            0 :         _mode: ListingMode,
     452            0 :         _max_keys: Option<NonZeroU32>,
     453            0 :         _cancel: &CancellationToken,
     454            0 :     ) -> Result<crate::VersionListing, DownloadError> {
     455            0 :         unimplemented!()
     456              :     }
     457              : 
     458            0 :     async fn head_object(
     459            0 :         &self,
     460            0 :         key: &RemotePath,
     461            0 :         _cancel: &CancellationToken,
     462            0 :     ) -> Result<ListingObject, DownloadError> {
     463            0 :         let target_file_path = key.with_base(&self.storage_root);
     464            0 :         let metadata = file_metadata(&target_file_path).await?;
     465              :         Ok(ListingObject {
     466            0 :             key: key.clone(),
     467            0 :             last_modified: metadata.modified()?,
     468            0 :             size: metadata.len(),
     469              :         })
     470            0 :     }
     471              : 
     472        20008 :     async fn upload(
     473        20008 :         &self,
     474        20008 :         data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
     475        20008 :         data_size_bytes: usize,
     476        20008 :         to: &RemotePath,
     477        20008 :         metadata: Option<StorageMetadata>,
     478        20008 :         cancel: &CancellationToken,
     479        20008 :     ) -> anyhow::Result<()> {
     480        20008 :         let cancel = cancel.child_token();
     481        20008 : 
     482        20008 :         let op = self.upload0(data, data_size_bytes, to, metadata, &cancel);
     483        20008 :         let mut op = std::pin::pin!(op);
     484              : 
     485              :         // race the upload0 to the timeout; if it goes over, do a graceful shutdown
     486        20008 :         let (res, timeout) = tokio::select! {
     487        20008 :             res = &mut op => (res, false),
     488        20008 :             _ = tokio::time::sleep(self.timeout) => {
     489            0 :                 cancel.cancel();
     490            0 :                 (op.await, true)
     491              :             }
     492              :         };
     493              : 
     494           12 :         match res {
     495           12 :             Err(e) if timeout && TimeoutOrCancel::caused_by_cancel(&e) => {
     496            0 :                 // we caused this cancel (or they happened simultaneously) -- swap it out to
     497            0 :                 // Timeout
     498            0 :                 Err(TimeoutOrCancel::Timeout.into())
     499              :             }
     500        19783 :             res => res,
     501              :         }
     502           60 :     }
     503              : 
     504         4499 :     async fn download(
     505         4499 :         &self,
     506         4499 :         from: &RemotePath,
     507         4499 :         opts: &DownloadOpts,
     508         4499 :         cancel: &CancellationToken,
     509         4499 :     ) -> Result<Download, DownloadError> {
     510         4499 :         let target_path = from.with_base(&self.storage_root);
     511              : 
     512         4499 :         let file_metadata = file_metadata(&target_path).await?;
     513          329 :         let etag = mock_etag(&file_metadata);
     514          329 : 
     515          329 :         if opts.etag.as_ref() == Some(&etag) {
     516            0 :             return Err(DownloadError::Unmodified);
     517          329 :         }
     518              : 
     519          329 :         let mut file = fs::OpenOptions::new()
     520          329 :             .read(true)
     521          329 :             .open(&target_path)
     522          329 :             .await
     523          329 :             .with_context(|| {
     524            0 :                 format!("Failed to open source file {target_path:?} to use in the download")
     525          329 :             })
     526          329 :             .map_err(DownloadError::Other)?;
     527              : 
     528          329 :         let mut take = file_metadata.len();
     529          329 :         if let Some((start, end)) = opts.byte_range() {
     530           15 :             if start > 0 {
     531            6 :                 file.seek(io::SeekFrom::Start(start))
     532            6 :                     .await
     533            6 :                     .context("Failed to seek to the range start in a local storage file")
     534            6 :                     .map_err(DownloadError::Other)?;
     535            9 :             }
     536           15 :             if let Some(end) = end {
     537           12 :                 take = end - start;
     538           12 :             }
     539          314 :         }
     540              : 
     541          329 :         let source = ReaderStream::new(file.take(take));
     542              : 
     543          329 :         let metadata = self
     544          329 :             .read_storage_metadata(&target_path)
     545          329 :             .await
     546          326 :             .map_err(DownloadError::Other)?;
     547              : 
     548          326 :         let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
     549          326 :         let source = crate::support::DownloadStream::new(cancel_or_timeout, source);
     550          326 : 
     551          326 :         Ok(Download {
     552          326 :             metadata,
     553          326 :             last_modified: file_metadata
     554          326 :                 .modified()
     555          326 :                 .map_err(|e| DownloadError::Other(anyhow::anyhow!(e).context("Reading mtime")))?,
     556          326 :             etag,
     557          326 :             download_stream: Box::pin(source),
     558              :         })
     559         4496 :     }
     560              : 
     561         1825 :     async fn delete(&self, path: &RemotePath, _cancel: &CancellationToken) -> anyhow::Result<()> {
     562         1825 :         let file_path = path.with_base(&self.storage_root);
     563         1825 :         match fs::remove_file(&file_path).await {
     564         1796 :             Ok(()) => Ok(()),
     565              :             // The file doesn't exist. This shouldn't yield an error to mirror S3's behaviour.
     566              :             // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
     567              :             // > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful.
     568            3 :             Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
     569            0 :             Err(e) => Err(anyhow::anyhow!(e)),
     570              :         }
     571         1799 :     }
     572              : 
     573           42 :     async fn delete_objects(
     574           42 :         &self,
     575           42 :         paths: &[RemotePath],
     576           42 :         cancel: &CancellationToken,
     577           42 :     ) -> anyhow::Result<()> {
     578           88 :         for path in paths {
     579           46 :             self.delete(path, cancel).await?
     580              :         }
     581           42 :         Ok(())
     582           42 :     }
     583              : 
     584           48 :     fn max_keys_per_delete(&self) -> usize {
     585           48 :         super::MAX_KEYS_PER_DELETE_S3
     586           48 :     }
     587              : 
     588            0 :     async fn copy(
     589            0 :         &self,
     590            0 :         from: &RemotePath,
     591            0 :         to: &RemotePath,
     592            0 :         _cancel: &CancellationToken,
     593            0 :     ) -> anyhow::Result<()> {
     594            0 :         let from_path = from.with_base(&self.storage_root);
     595            0 :         let to_path = to.with_base(&self.storage_root);
     596            0 :         create_target_directory(&to_path).await?;
     597            0 :         fs::copy(&from_path, &to_path).await.with_context(|| {
     598            0 :             format!(
     599            0 :                 "Failed to copy file from '{from_path}' to '{to_path}'",
     600            0 :                 from_path = from_path,
     601            0 :                 to_path = to_path
     602            0 :             )
     603            0 :         })?;
     604            0 :         Ok(())
     605            0 :     }
     606              : 
     607            0 :     async fn time_travel_recover(
     608            0 :         &self,
     609            0 :         _prefix: Option<&RemotePath>,
     610            0 :         _timestamp: SystemTime,
     611            0 :         _done_if_after: SystemTime,
     612            0 :         _cancel: &CancellationToken,
     613            0 :     ) -> Result<(), TimeTravelError> {
     614            0 :         Err(TimeTravelError::Unimplemented)
     615            0 :     }
     616              : }
     617              : 
     618          329 : fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
     619          329 :     path_with_suffix_extension(original_path, "metadata")
     620          329 : }
     621              : 
     622        20008 : async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> {
     623        20008 :     let target_dir = match target_file_path.parent() {
     624        20008 :         Some(parent_dir) => parent_dir,
     625            0 :         None => bail!("File path '{target_file_path}' has no parent directory"),
     626              :     };
     627        20008 :     if !target_dir.exists() {
     628         4050 :         fs::create_dir_all(target_dir).await?;
     629        15958 :     }
     630        20004 :     Ok(())
     631        20004 : }
     632              : 
     633         4904 : async fn file_metadata(file_path: &Utf8Path) -> Result<std::fs::Metadata, DownloadError> {
     634         4904 :     tokio::fs::metadata(&file_path).await.map_err(|e| {
     635         4170 :         if e.kind() == ErrorKind::NotFound {
     636         4170 :             DownloadError::NotFound
     637              :         } else {
     638            0 :             DownloadError::BadInput(e.into())
     639              :         }
     640         4904 :     })
     641         4904 : }
     642              : 
     643              : // Use mtime as stand-in for ETag.  We could calculate a meaningful one by md5'ing the contents of files we
     644              : // read, but that's expensive and the local_fs test helper's whole reason for existence is to run small tests
     645              : // quickly, with less overhead than using a mock S3 server.
     646          329 : fn mock_etag(meta: &std::fs::Metadata) -> Etag {
     647          329 :     let mtime = meta.modified().expect("Filesystem mtime missing");
     648          329 :     format!("{}", mtime.duration_since(UNIX_EPOCH).unwrap().as_millis()).into()
     649          329 : }
     650              : 
     651              : #[cfg(test)]
     652              : mod fs_tests {
     653              :     use std::collections::HashMap;
     654              :     use std::io::Write;
     655              :     use std::ops::Bound;
     656              : 
     657              :     use camino_tempfile::tempdir;
     658              : 
     659              :     use super::*;
     660              : 
     661            9 :     async fn read_and_check_metadata(
     662            9 :         storage: &LocalFs,
     663            9 :         remote_storage_path: &RemotePath,
     664            9 :         expected_metadata: Option<&StorageMetadata>,
     665            9 :     ) -> anyhow::Result<String> {
     666            9 :         let cancel = CancellationToken::new();
     667            9 :         let download = storage
     668            9 :             .download(remote_storage_path, &DownloadOpts::default(), &cancel)
     669            9 :             .await
     670            9 :             .map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
     671            9 :         ensure!(
     672            9 :             download.metadata.as_ref() == expected_metadata,
     673            0 :             "Unexpected metadata returned for the downloaded file"
     674              :         );
     675              : 
     676            9 :         let contents = aggregate(download.download_stream).await?;
     677              : 
     678            9 :         String::from_utf8(contents).map_err(anyhow::Error::new)
     679            9 :     }
     680              : 
     681              :     #[tokio::test]
     682            3 :     async fn upload_file() -> anyhow::Result<()> {
     683            3 :         let (storage, cancel) = create_storage()?;
     684            3 : 
     685            3 :         let target_path_1 = upload_dummy_file(&storage, "upload_1", None, &cancel).await?;
     686            3 :         assert_eq!(
     687            3 :             storage.list_all().await?,
     688            3 :             vec![target_path_1.clone()],
     689            3 :             "Should list a single file after first upload"
     690            3 :         );
     691            3 : 
     692            3 :         let target_path_2 = upload_dummy_file(&storage, "upload_2", None, &cancel).await?;
     693            3 :         assert_eq!(
     694            3 :             list_files_sorted(&storage).await?,
     695            3 :             vec![target_path_1.clone(), target_path_2.clone()],
     696            3 :             "Should list a two different files after second upload"
     697            3 :         );
     698            3 : 
     699            3 :         Ok(())
     700            3 :     }
     701              : 
     702              :     #[tokio::test]
     703            3 :     async fn upload_file_negatives() -> anyhow::Result<()> {
     704            3 :         let (storage, cancel) = create_storage()?;
     705            3 : 
     706            3 :         let id = RemotePath::new(Utf8Path::new("dummy"))?;
     707            3 :         let content = Bytes::from_static(b"12345");
     708           12 :         let content = move || futures::stream::once(futures::future::ready(Ok(content.clone())));
     709            3 : 
     710            3 :         // Check that you get an error if the size parameter doesn't match the actual
     711            3 :         // size of the stream.
     712            3 :         storage
     713            3 :             .upload(content(), 0, &id, None, &cancel)
     714            3 :             .await
     715            3 :             .expect_err("upload with zero size succeeded");
     716            3 :         storage
     717            3 :             .upload(content(), 4, &id, None, &cancel)
     718            3 :             .await
     719            3 :             .expect_err("upload with too short size succeeded");
     720            3 :         storage
     721            3 :             .upload(content(), 6, &id, None, &cancel)
     722            3 :             .await
     723            3 :             .expect_err("upload with too large size succeeded");
     724            3 : 
     725            3 :         // Correct size is 5, this should succeed.
     726            3 :         storage.upload(content(), 5, &id, None, &cancel).await?;
     727            3 : 
     728            3 :         Ok(())
     729            3 :     }
     730              : 
     731           33 :     fn create_storage() -> anyhow::Result<(LocalFs, CancellationToken)> {
     732           33 :         let storage_root = tempdir()?.path().to_path_buf();
     733           33 :         LocalFs::new(storage_root, Duration::from_secs(120)).map(|s| (s, CancellationToken::new()))
     734           33 :     }
     735              : 
     736              :     #[tokio::test]
     737            3 :     async fn download_file() -> anyhow::Result<()> {
     738            3 :         let (storage, cancel) = create_storage()?;
     739            3 :         let upload_name = "upload_1";
     740            3 :         let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
     741            3 : 
     742            3 :         let contents = read_and_check_metadata(&storage, &upload_target, None).await?;
     743            3 :         assert_eq!(
     744            3 :             dummy_contents(upload_name),
     745            3 :             contents,
     746            3 :             "We should upload and download the same contents"
     747            3 :         );
     748            3 : 
     749            3 :         let non_existing_path = RemotePath::new(Utf8Path::new("somewhere/else"))?;
     750            3 :         match storage
     751            3 :             .download(&non_existing_path, &DownloadOpts::default(), &cancel)
     752            3 :             .await
     753            3 :         {
     754            3 :             Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys
     755            3 :             other => panic!(
     756            0 :                 "Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"
     757            0 :             ),
     758            3 :         }
     759            3 :         Ok(())
     760            3 :     }
     761              : 
     762              :     #[tokio::test]
     763            3 :     async fn download_file_range_positive() -> anyhow::Result<()> {
     764            3 :         let (storage, cancel) = create_storage()?;
     765            3 :         let upload_name = "upload_1";
     766            3 :         let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
     767            3 : 
     768            3 :         let full_range_download_contents =
     769            3 :             read_and_check_metadata(&storage, &upload_target, None).await?;
     770            3 :         assert_eq!(
     771            3 :             dummy_contents(upload_name),
     772            3 :             full_range_download_contents,
     773            3 :             "Download full range should return the whole upload"
     774            3 :         );
     775            3 : 
     776            3 :         let uploaded_bytes = dummy_contents(upload_name).into_bytes();
     777            3 :         let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);
     778            3 : 
     779            3 :         let first_part_download = storage
     780            3 :             .download(
     781            3 :                 &upload_target,
     782            3 :                 &DownloadOpts {
     783            3 :                     byte_end: Bound::Excluded(first_part_local.len() as u64),
     784            3 :                     ..Default::default()
     785            3 :                 },
     786            3 :                 &cancel,
     787            3 :             )
     788            3 :             .await?;
     789            3 :         assert!(
     790            3 :             first_part_download.metadata.is_none(),
     791            3 :             "No metadata should be returned for no metadata upload"
     792            3 :         );
     793            3 : 
     794            3 :         let first_part_remote = aggregate(first_part_download.download_stream).await?;
     795            3 :         assert_eq!(
     796            3 :             first_part_local, first_part_remote,
     797            3 :             "First part bytes should be returned when requested"
     798            3 :         );
     799            3 : 
     800            3 :         let second_part_download = storage
     801            3 :             .download(
     802            3 :                 &upload_target,
     803            3 :                 &DownloadOpts {
     804            3 :                     byte_start: Bound::Included(first_part_local.len() as u64),
     805            3 :                     byte_end: Bound::Excluded(
     806            3 :                         (first_part_local.len() + second_part_local.len()) as u64,
     807            3 :                     ),
     808            3 :                     ..Default::default()
     809            3 :                 },
     810            3 :                 &cancel,
     811            3 :             )
     812            3 :             .await?;
     813            3 :         assert!(
     814            3 :             second_part_download.metadata.is_none(),
     815            3 :             "No metadata should be returned for no metadata upload"
     816            3 :         );
     817            3 : 
     818            3 :         let second_part_remote = aggregate(second_part_download.download_stream).await?;
     819            3 :         assert_eq!(
     820            3 :             second_part_local, second_part_remote,
     821            3 :             "Second part bytes should be returned when requested"
     822            3 :         );
     823            3 : 
     824            3 :         let suffix_bytes = storage
     825            3 :             .download(
     826            3 :                 &upload_target,
     827            3 :                 &DownloadOpts {
     828            3 :                     byte_start: Bound::Included(13),
     829            3 :                     ..Default::default()
     830            3 :                 },
     831            3 :                 &cancel,
     832            3 :             )
     833            3 :             .await?
     834            3 :             .download_stream;
     835            3 :         let suffix_bytes = aggregate(suffix_bytes).await?;
     836            3 :         let suffix = std::str::from_utf8(&suffix_bytes)?;
     837            3 :         assert_eq!(upload_name, suffix);
     838            3 : 
     839            3 :         let all_bytes = storage
     840            3 :             .download(&upload_target, &DownloadOpts::default(), &cancel)
     841            3 :             .await?
     842            3 :             .download_stream;
     843            3 :         let all_bytes = aggregate(all_bytes).await?;
     844            3 :         let all_bytes = std::str::from_utf8(&all_bytes)?;
     845            3 :         assert_eq!(dummy_contents("upload_1"), all_bytes);
     846            3 : 
     847            3 :         Ok(())
     848            3 :     }
     849              : 
     850              :     #[tokio::test]
     851              :     #[should_panic(expected = "at or before start")]
     852            3 :     async fn download_file_range_negative() {
     853            3 :         let (storage, cancel) = create_storage().unwrap();
     854            3 :         let upload_name = "upload_1";
     855            3 :         let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel)
     856            3 :             .await
     857            3 :             .unwrap();
     858            3 : 
     859            3 :         storage
     860            3 :             .download(
     861            3 :                 &upload_target,
     862            3 :                 &DownloadOpts {
     863            3 :                     byte_start: Bound::Included(10),
     864            3 :                     byte_end: Bound::Excluded(10),
     865            3 :                     ..Default::default()
     866            3 :                 },
     867            3 :                 &cancel,
     868            3 :             )
     869            3 :             .await
     870            3 :             .unwrap();
     871            3 :     }
     872              : 
     873              :     #[tokio::test]
     874            3 :     async fn delete_file() -> anyhow::Result<()> {
     875            3 :         let (storage, cancel) = create_storage()?;
     876            3 :         let upload_name = "upload_1";
     877            3 :         let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
     878            3 : 
     879            3 :         storage.delete(&upload_target, &cancel).await?;
     880            3 :         assert!(storage.list_all().await?.is_empty());
     881            3 : 
     882            3 :         storage
     883            3 :             .delete(&upload_target, &cancel)
     884            3 :             .await
     885            3 :             .expect("Should allow deleting non-existing storage files");
     886            3 : 
     887            3 :         Ok(())
     888            3 :     }
     889              : 
     890              :     #[tokio::test]
     891            3 :     async fn file_with_metadata() -> anyhow::Result<()> {
     892            3 :         let (storage, cancel) = create_storage()?;
     893            3 :         let upload_name = "upload_1";
     894            3 :         let metadata = StorageMetadata(HashMap::from([
     895            3 :             ("one".to_string(), "1".to_string()),
     896            3 :             ("two".to_string(), "2".to_string()),
     897            3 :         ]));
     898            3 :         let upload_target =
     899            3 :             upload_dummy_file(&storage, upload_name, Some(metadata.clone()), &cancel).await?;
     900            3 : 
     901            3 :         let full_range_download_contents =
     902            3 :             read_and_check_metadata(&storage, &upload_target, Some(&metadata)).await?;
     903            3 :         assert_eq!(
     904            3 :             dummy_contents(upload_name),
     905            3 :             full_range_download_contents,
     906            3 :             "We should upload and download the same contents"
     907            3 :         );
     908            3 : 
     909            3 :         let uploaded_bytes = dummy_contents(upload_name).into_bytes();
     910            3 :         let (first_part_local, _) = uploaded_bytes.split_at(3);
     911            3 : 
     912            3 :         let partial_download_with_metadata = storage
     913            3 :             .download(
     914            3 :                 &upload_target,
     915            3 :                 &DownloadOpts {
     916            3 :                     byte_end: Bound::Excluded(first_part_local.len() as u64),
     917            3 :                     ..Default::default()
     918            3 :                 },
     919            3 :                 &cancel,
     920            3 :             )
     921            3 :             .await?;
     922            3 :         let first_part_remote = aggregate(partial_download_with_metadata.download_stream).await?;
     923            3 :         assert_eq!(
     924            3 :             first_part_local,
     925            3 :             first_part_remote.as_slice(),
     926            3 :             "First part bytes should be returned when requested"
     927            3 :         );
     928            3 : 
     929            3 :         assert_eq!(
     930            3 :             partial_download_with_metadata.metadata,
     931            3 :             Some(metadata),
     932            3 :             "We should get the same metadata back for partial download"
     933            3 :         );
     934            3 : 
     935            3 :         Ok(())
     936            3 :     }
     937              : 
     938              :     #[tokio::test]
     939            3 :     async fn list() -> anyhow::Result<()> {
     940            3 :         // No delimiter: should recursively list everything
     941            3 :         let (storage, cancel) = create_storage()?;
     942            3 :         let child = upload_dummy_file(&storage, "grandparent/parent/child", None, &cancel).await?;
     943            3 :         let child_sibling =
     944            3 :             upload_dummy_file(&storage, "grandparent/parent/child_sibling", None, &cancel).await?;
     945            3 :         let uncle = upload_dummy_file(&storage, "grandparent/uncle", None, &cancel).await?;
     946            3 : 
     947            3 :         let listing = storage
     948            3 :             .list(None, ListingMode::NoDelimiter, None, &cancel)
     949            3 :             .await?;
     950            3 :         assert!(listing.prefixes.is_empty());
     951            3 :         assert_eq!(
     952            3 :             listing
     953            3 :                 .keys
     954            3 :                 .into_iter()
     955            9 :                 .map(|o| o.key)
     956            3 :                 .collect::<HashSet<_>>(),
     957            3 :             HashSet::from([uncle.clone(), child.clone(), child_sibling.clone()])
     958            3 :         );
     959            3 : 
     960            3 :         // Delimiter: should only go one deep
     961            3 :         let listing = storage
     962            3 :             .list(None, ListingMode::WithDelimiter, None, &cancel)
     963            3 :             .await?;
     964            3 : 
     965            3 :         assert_eq!(
     966            3 :             listing.prefixes,
     967            3 :             [RemotePath::from_string("timelines").unwrap()].to_vec()
     968            3 :         );
     969            3 :         assert!(listing.keys.is_empty());
     970            3 : 
     971            3 :         // Delimiter & prefix with a trailing slash
     972            3 :         let listing = storage
     973            3 :             .list(
     974            3 :                 Some(&RemotePath::from_string("timelines/some_timeline/grandparent/").unwrap()),
     975            3 :                 ListingMode::WithDelimiter,
     976            3 :                 None,
     977            3 :                 &cancel,
     978            3 :             )
     979            3 :             .await?;
     980            3 :         assert_eq!(
     981            3 :             listing.keys.into_iter().map(|o| o.key).collect::<Vec<_>>(),
     982            3 :             [RemotePath::from_string("uncle").unwrap()].to_vec()
     983            3 :         );
     984            3 :         assert_eq!(
     985            3 :             listing.prefixes,
     986            3 :             [RemotePath::from_string("parent").unwrap()].to_vec()
     987            3 :         );
     988            3 : 
     989            3 :         // Delimiter and prefix without a trailing slash
     990            3 :         let listing = storage
     991            3 :             .list(
     992            3 :                 Some(&RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()),
     993            3 :                 ListingMode::WithDelimiter,
     994            3 :                 None,
     995            3 :                 &cancel,
     996            3 :             )
     997            3 :             .await?;
     998            3 :         assert_eq!(listing.keys, vec![]);
     999            3 :         assert_eq!(
    1000            3 :             listing.prefixes,
    1001            3 :             [RemotePath::from_string("grandparent").unwrap()].to_vec()
    1002            3 :         );
    1003            3 : 
    1004            3 :         // Delimiter and prefix that's partway through a path component
    1005            3 :         let listing = storage
    1006            3 :             .list(
    1007            3 :                 Some(&RemotePath::from_string("timelines/some_timeline/grandp").unwrap()),
    1008            3 :                 ListingMode::WithDelimiter,
    1009            3 :                 None,
    1010            3 :                 &cancel,
    1011            3 :             )
    1012            3 :             .await?;
    1013            3 :         assert_eq!(listing.keys, vec![]);
    1014            3 :         assert_eq!(
    1015            3 :             listing.prefixes,
    1016            3 :             [RemotePath::from_string("grandparent").unwrap()].to_vec()
    1017            3 :         );
    1018            3 : 
    1019            3 :         Ok(())
    1020            3 :     }
    1021              : 
    1022              :     #[tokio::test]
    1023            3 :     async fn list_part_component() -> anyhow::Result<()> {
    1024            3 :         // No delimiter: should recursively list everything
    1025            3 :         let (storage, cancel) = create_storage()?;
    1026            3 : 
    1027            3 :         // Imitates what happens in a tenant path when we have an unsharded path and a sharded path, and do a listing
    1028            3 :         // of the unsharded path: although there is a "directory" at the unsharded path, it should be handled as
    1029            3 :         // a freeform prefix.
    1030            3 :         let _child_a =
    1031            3 :             upload_dummy_file(&storage, "grandparent/tenant-01/child", None, &cancel).await?;
    1032            3 :         let _child_b =
    1033            3 :             upload_dummy_file(&storage, "grandparent/tenant/child", None, &cancel).await?;
    1034            3 : 
    1035            3 :         // Delimiter and prefix that's partway through a path component
    1036            3 :         let listing = storage
    1037            3 :             .list(
    1038            3 :                 Some(
    1039            3 :                     &RemotePath::from_string("timelines/some_timeline/grandparent/tenant").unwrap(),
    1040            3 :                 ),
    1041            3 :                 ListingMode::WithDelimiter,
    1042            3 :                 None,
    1043            3 :                 &cancel,
    1044            3 :             )
    1045            3 :             .await?;
    1046            3 :         assert_eq!(listing.keys, vec![]);
    1047            3 : 
    1048            3 :         let mut found_prefixes = listing.prefixes.clone();
    1049            3 :         found_prefixes.sort();
    1050            3 :         assert_eq!(
    1051            3 :             found_prefixes,
    1052            3 :             [
    1053            3 :                 RemotePath::from_string("tenant").unwrap(),
    1054            3 :                 RemotePath::from_string("tenant-01").unwrap(),
    1055            3 :             ]
    1056            3 :             .to_vec()
    1057            3 :         );
    1058            3 : 
    1059            3 :         Ok(())
    1060            3 :     }
    1061              : 
    1062              :     #[tokio::test]
    1063            3 :     async fn overwrite_shorter_file() -> anyhow::Result<()> {
    1064            3 :         let (storage, cancel) = create_storage()?;
    1065            3 : 
    1066            3 :         let path = RemotePath::new("does/not/matter/file".into())?;
    1067            3 : 
    1068            3 :         let body = Bytes::from_static(b"long file contents is long");
    1069            3 :         {
    1070            3 :             let len = body.len();
    1071            3 :             let body =
    1072            3 :                 futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
    1073            3 :             storage.upload(body, len, &path, None, &cancel).await?;
    1074            3 :         }
    1075            3 : 
    1076            3 :         let read = aggregate(
    1077            3 :             storage
    1078            3 :                 .download(&path, &DownloadOpts::default(), &cancel)
    1079            3 :                 .await?
    1080            3 :                 .download_stream,
    1081            3 :         )
    1082            3 :         .await?;
    1083            3 :         assert_eq!(body, read);
    1084            3 : 
    1085            3 :         let shorter = Bytes::from_static(b"shorter body");
    1086            3 :         {
    1087            3 :             let len = shorter.len();
    1088            3 :             let body =
    1089            3 :                 futures::stream::once(futures::future::ready(std::io::Result::Ok(shorter.clone())));
    1090            3 :             storage.upload(body, len, &path, None, &cancel).await?;
    1091            3 :         }
    1092            3 : 
    1093            3 :         let read = aggregate(
    1094            3 :             storage
    1095            3 :                 .download(&path, &DownloadOpts::default(), &cancel)
    1096            3 :                 .await?
    1097            3 :                 .download_stream,
    1098            3 :         )
    1099            3 :         .await?;
    1100            3 :         assert_eq!(shorter, read);
    1101            3 :         Ok(())
    1102            3 :     }
    1103              : 
    1104              :     #[tokio::test]
    1105            3 :     async fn cancelled_upload_can_later_be_retried() -> anyhow::Result<()> {
    1106            3 :         let (storage, cancel) = create_storage()?;
    1107            3 : 
    1108            3 :         let path = RemotePath::new("does/not/matter/file".into())?;
    1109            3 : 
    1110            3 :         let body = Bytes::from_static(b"long file contents is long");
    1111            3 :         {
    1112            3 :             let len = body.len();
    1113            3 :             let body =
    1114            3 :                 futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
    1115            3 :             let cancel = cancel.child_token();
    1116            3 :             cancel.cancel();
    1117            3 :             let e = storage
    1118            3 :                 .upload(body, len, &path, None, &cancel)
    1119            3 :                 .await
    1120            3 :                 .unwrap_err();
    1121            3 : 
    1122            3 :             assert!(TimeoutOrCancel::caused_by_cancel(&e));
    1123            3 :         }
    1124            3 : 
    1125            3 :         {
    1126            3 :             let len = body.len();
    1127            3 :             let body =
    1128            3 :                 futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
    1129            3 :             storage.upload(body, len, &path, None, &cancel).await?;
    1130            3 :         }
    1131            3 : 
    1132            3 :         let read = aggregate(
    1133            3 :             storage
    1134            3 :                 .download(&path, &DownloadOpts::default(), &cancel)
    1135            3 :                 .await?
    1136            3 :                 .download_stream,
    1137            3 :         )
    1138            3 :         .await?;
    1139            3 :         assert_eq!(body, read);
    1140            3 : 
    1141            3 :         Ok(())
    1142            3 :     }
    1143              : 
    1144           36 :     async fn upload_dummy_file(
    1145           36 :         storage: &LocalFs,
    1146           36 :         name: &str,
    1147           36 :         metadata: Option<StorageMetadata>,
    1148           36 :         cancel: &CancellationToken,
    1149           36 :     ) -> anyhow::Result<RemotePath> {
    1150           36 :         let from_path = storage
    1151           36 :             .storage_root
    1152           36 :             .join("timelines")
    1153           36 :             .join("some_timeline")
    1154           36 :             .join(name);
    1155           36 :         let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?;
    1156              : 
    1157           36 :         let relative_path = from_path
    1158           36 :             .strip_prefix(&storage.storage_root)
    1159           36 :             .context("Failed to strip storage root prefix")
    1160           36 :             .and_then(RemotePath::new)
    1161           36 :             .with_context(|| {
    1162            0 :                 format!(
    1163            0 :                     "Failed to resolve remote part of path {:?} for base {:?}",
    1164            0 :                     from_path, storage.storage_root
    1165            0 :                 )
    1166           36 :             })?;
    1167              : 
    1168           36 :         let file = tokio_util::io::ReaderStream::new(file);
    1169           36 : 
    1170           36 :         storage
    1171           36 :             .upload(file, size, &relative_path, metadata, cancel)
    1172           36 :             .await?;
    1173           36 :         Ok(relative_path)
    1174           36 :     }
    1175              : 
    1176           36 :     async fn create_file_for_upload(
    1177           36 :         path: &Utf8Path,
    1178           36 :         contents: &str,
    1179           36 :     ) -> anyhow::Result<(fs::File, usize)> {
    1180           36 :         std::fs::create_dir_all(path.parent().unwrap())?;
    1181           36 :         let mut file_for_writing = std::fs::OpenOptions::new()
    1182           36 :             .write(true)
    1183           36 :             .create_new(true)
    1184           36 :             .open(path)?;
    1185           36 :         write!(file_for_writing, "{}", contents)?;
    1186           36 :         drop(file_for_writing);
    1187           36 :         let file_size = path.metadata()?.len() as usize;
    1188           36 :         Ok((
    1189           36 :             fs::OpenOptions::new().read(true).open(&path).await?,
    1190           36 :             file_size,
    1191              :         ))
    1192           36 :     }
    1193              : 
    1194           54 :     fn dummy_contents(name: &str) -> String {
    1195           54 :         format!("contents for {name}")
    1196           54 :     }
    1197              : 
    1198            3 :     async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
    1199            3 :         let mut files = storage.list_all().await?;
    1200            3 :         files.sort_by(|a, b| a.0.cmp(&b.0));
    1201            3 :         Ok(files)
    1202            3 :     }
    1203              : 
    1204           33 :     async fn aggregate(
    1205           33 :         stream: impl Stream<Item = std::io::Result<Bytes>>,
    1206           33 :     ) -> anyhow::Result<Vec<u8>> {
    1207              :         use futures::stream::StreamExt;
    1208           33 :         let mut out = Vec::new();
    1209           33 :         let mut stream = std::pin::pin!(stream);
    1210           66 :         while let Some(res) = stream.next().await {
    1211           33 :             out.extend_from_slice(&res?[..]);
    1212              :         }
    1213           33 :         Ok(out)
    1214           33 :     }
    1215              : }
        

Generated by: LCOV version 2.1-beta