LCOV - code coverage report
Current view: top level - libs/remote_storage/src - local_fs.rs (source / functions) Coverage Total Hit
Test: 2aa98e37cd3250b9a68c97ef6050b16fe702ab33.info Lines: 90.3 % 995 898
Test Date: 2024-08-29 11:33:10 Functions: 59.1 % 137 81

            Line data    Source code
       1              : //! Local filesystem acting as a remote storage.
       2              : //! Multiple API users can use the same "storage" of this kind by using different storage roots.
       3              : //!
//! This storage is used in tests, but can also be used in cases when a certain persistent
       5              : //! volume is mounted to the local FS.
       6              : 
       7              : use std::{
       8              :     collections::HashSet,
       9              :     io::ErrorKind,
      10              :     num::NonZeroU32,
      11              :     time::{Duration, SystemTime, UNIX_EPOCH},
      12              : };
      13              : 
      14              : use anyhow::{bail, ensure, Context};
      15              : use bytes::Bytes;
      16              : use camino::{Utf8Path, Utf8PathBuf};
      17              : use futures::stream::Stream;
      18              : use tokio::{
      19              :     fs,
      20              :     io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
      21              : };
      22              : use tokio_util::{io::ReaderStream, sync::CancellationToken};
      23              : use utils::crashsafe::path_with_suffix_extension;
      24              : 
      25              : use crate::{
      26              :     Download, DownloadError, Listing, ListingMode, ListingObject, RemotePath, TimeTravelError,
      27              :     TimeoutOrCancel, REMOTE_STORAGE_PREFIX_SEPARATOR,
      28              : };
      29              : 
      30              : use super::{RemoteStorage, StorageMetadata};
      31              : use crate::Etag;
      32              : 
/// Suffix passed to `path_with_suffix_extension` to derive the temporary path that `upload0`
/// writes into before renaming it onto the real target.
const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";

/// A [`RemoteStorage`] implementation backed by a directory on the local filesystem.
#[derive(Debug, Clone)]
pub struct LocalFs {
    /// Root directory acting as the "bucket"; `LocalFs::new` makes it absolute.
    storage_root: Utf8PathBuf,
    /// Upper bound used by listing and uploads, and handed to download streams.
    timeout: Duration,
}
      40              : 
      41              : impl LocalFs {
      42              :     /// Attempts to create local FS storage, along with its root directory.
      43              :     /// Storage root will be created (if does not exist) and transformed into an absolute path (if passed as relative).
      44          700 :     pub fn new(mut storage_root: Utf8PathBuf, timeout: Duration) -> anyhow::Result<Self> {
      45          700 :         if !storage_root.exists() {
      46           88 :             std::fs::create_dir_all(&storage_root).with_context(|| {
      47            0 :                 format!("Failed to create all directories in the given root path {storage_root:?}")
      48           88 :             })?;
      49          612 :         }
      50          700 :         if !storage_root.is_absolute() {
      51          564 :             storage_root = storage_root.canonicalize_utf8().with_context(|| {
      52            0 :                 format!("Failed to represent path {storage_root:?} as an absolute path")
      53          564 :             })?;
      54          136 :         }
      55              : 
      56          700 :         Ok(Self {
      57          700 :             storage_root,
      58          700 :             timeout,
      59          700 :         })
      60          700 :     }
      61              : 
      62              :     // mirrors S3Bucket::s3_object_to_relative_path
      63          410 :     fn local_file_to_relative_path(&self, key: Utf8PathBuf) -> RemotePath {
      64          410 :         let relative_path = key
      65          410 :             .strip_prefix(&self.storage_root)
      66          410 :             .expect("relative path must contain storage_root as prefix");
      67          410 :         RemotePath(relative_path.into())
      68          410 :     }
      69              : 
      70          172 :     async fn read_storage_metadata(
      71          172 :         &self,
      72          172 :         file_path: &Utf8Path,
      73          172 :     ) -> anyhow::Result<Option<StorageMetadata>> {
      74          172 :         let metadata_path = storage_metadata_path(file_path);
      75          172 :         if metadata_path.exists() && metadata_path.is_file() {
      76           16 :             let metadata_string = fs::read_to_string(&metadata_path).await.with_context(|| {
      77            0 :                 format!("Failed to read metadata from the local storage at '{metadata_path}'")
      78           16 :             })?;
      79              : 
      80           16 :             serde_json::from_str(&metadata_string)
      81           16 :                 .with_context(|| {
      82            0 :                     format!(
      83            0 :                         "Failed to deserialize metadata from the local storage at '{metadata_path}'",
      84            0 :                     )
      85           16 :                 })
      86           16 :                 .map(|metadata| Some(StorageMetadata(metadata)))
      87              :         } else {
      88          156 :             Ok(None)
      89              :         }
      90          172 :     }
      91              : 
      92              :     #[cfg(test)]
      93           24 :     async fn list_all(&self) -> anyhow::Result<Vec<RemotePath>> {
      94           24 :         use std::{future::Future, pin::Pin};
      95           72 :         fn get_all_files<'a, P>(
      96           72 :             directory_path: P,
      97           72 :         ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Utf8PathBuf>>> + Send + Sync + 'a>>
      98           72 :         where
      99           72 :             P: AsRef<Utf8Path> + Send + Sync + 'a,
     100           72 :         {
     101           72 :             Box::pin(async move {
     102           72 :                 let directory_path = directory_path.as_ref();
     103           72 :                 if directory_path.exists() {
     104           72 :                     if directory_path.is_dir() {
     105           72 :                         let mut paths = Vec::new();
     106           72 :                         let mut dir_contents = fs::read_dir(directory_path).await?;
     107          144 :                         while let Some(dir_entry) = dir_contents.next_entry().await? {
     108           72 :                             let file_type = dir_entry.file_type().await?;
     109           72 :                             let entry_path =
     110           72 :                                 Utf8PathBuf::from_path_buf(dir_entry.path()).map_err(|pb| {
     111            0 :                                     anyhow::Error::msg(format!(
     112            0 :                                         "non-Unicode path: {}",
     113            0 :                                         pb.to_string_lossy()
     114            0 :                                     ))
     115           72 :                                 })?;
     116           72 :                             if file_type.is_symlink() {
     117           24 :                                 tracing::debug!("{entry_path:?} is a symlink, skipping")
     118           72 :                             } else if file_type.is_dir() {
     119           69 :                                 paths.extend(get_all_files(&entry_path).await?.into_iter())
     120           24 :                             } else {
     121           24 :                                 paths.push(entry_path);
     122           24 :                             }
     123           24 :                         }
     124           72 :                         Ok(paths)
     125           24 :                     } else {
     126           24 :                         bail!("Path {directory_path:?} is not a directory")
     127           24 :                     }
     128           24 :                 } else {
     129           24 :                     Ok(Vec::new())
     130           24 :                 }
     131           72 :             })
     132           72 :         }
     133           24 : 
     134           24 :         Ok(get_all_files(&self.storage_root)
     135           69 :             .await?
     136           24 :             .into_iter()
     137           24 :             .map(|path| {
     138           24 :                 path.strip_prefix(&self.storage_root)
     139           24 :                     .context("Failed to strip storage root prefix")
     140           24 :                     .and_then(RemotePath::new)
     141           24 :                     .expect(
     142           24 :                         "We list files for storage root, hence should be able to remote the prefix",
     143           24 :                     )
     144           24 :             })
     145           24 :             .collect())
     146           24 :     }
     147              : 
     148              :     // recursively lists all files in a directory,
     149              :     // mirroring the `list_files` for `s3_bucket`
     150          630 :     async fn list_recursive(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
     151          630 :         let full_path = match folder {
     152          614 :             Some(folder) => folder.with_base(&self.storage_root),
     153           16 :             None => self.storage_root.clone(),
     154              :         };
     155              : 
     156              :         // If we were given a directory, we may use it as our starting point.
     157              :         // Otherwise, we must go up to the first ancestor dir that exists.  This is because
     158              :         // S3 object list prefixes can be arbitrary strings, but when reading
     159              :         // the local filesystem we need a directory to start calling read_dir on.
     160          630 :         let mut initial_dir = full_path.clone();
     161          630 : 
     162          630 :         // If there's no trailing slash, we have to start looking from one above: even if
     163          630 :         // `initial_dir` is a directory, we should still list any prefixes in the parent
     164          630 :         // that start with the same string.
     165          630 :         if !full_path.to_string().ends_with('/') {
     166           58 :             initial_dir.pop();
     167          572 :         }
     168              : 
     169         2286 :         loop {
     170         2286 :             // Did we make it to the root?
     171         2286 :             if initial_dir.parent().is_none() {
     172            0 :                 anyhow::bail!("list_files: failed to find valid ancestor dir for {full_path}");
     173         2286 :             }
     174         2286 : 
     175         2286 :             match fs::metadata(initial_dir.clone()).await {
     176          630 :                 Ok(meta) if meta.is_dir() => {
     177          630 :                     // We found a directory, break
     178          630 :                     break;
     179              :                 }
     180            0 :                 Ok(_meta) => {
     181            0 :                     // It's not a directory: strip back to the parent
     182            0 :                     initial_dir.pop();
     183            0 :                 }
     184         1656 :                 Err(e) if e.kind() == ErrorKind::NotFound => {
     185         1656 :                     // It's not a file that exists: strip the prefix back to the parent directory
     186         1656 :                     initial_dir.pop();
     187         1656 :                 }
     188            0 :                 Err(e) => {
     189            0 :                     // Unexpected I/O error
     190            0 :                     anyhow::bail!(e)
     191              :                 }
     192              :             }
     193              :         }
     194              :         // Note that Utf8PathBuf starts_with only considers full path segments, but
     195              :         // object prefixes are arbitrary strings, so we need the strings for doing
     196              :         // starts_with later.
     197          630 :         let prefix = full_path.as_str();
     198          630 : 
     199          630 :         let mut files = vec![];
     200          630 :         let mut directory_queue = vec![initial_dir];
     201         1414 :         while let Some(cur_folder) = directory_queue.pop() {
     202          784 :             let mut entries = cur_folder.read_dir_utf8()?;
     203         2213 :             while let Some(Ok(entry)) = entries.next() {
     204         1429 :                 let file_name = entry.file_name();
     205         1429 :                 let full_file_name = cur_folder.join(file_name);
     206         1429 :                 if full_file_name.as_str().starts_with(prefix) {
     207          410 :                     let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
     208          410 :                     files.push(file_remote_path);
     209          410 :                     if full_file_name.is_dir() {
     210          154 :                         directory_queue.push(full_file_name);
     211          256 :                     }
     212         1019 :                 }
     213              :             }
     214              :         }
     215              : 
     216          630 :         Ok(files)
     217          630 :     }
     218              : 
     219         8574 :     async fn upload0(
     220         8574 :         &self,
     221         8574 :         data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
     222         8574 :         data_size_bytes: usize,
     223         8574 :         to: &RemotePath,
     224         8574 :         metadata: Option<StorageMetadata>,
     225         8574 :         cancel: &CancellationToken,
     226         8574 :     ) -> anyhow::Result<()> {
     227         8574 :         let target_file_path = to.with_base(&self.storage_root);
     228         8574 :         create_target_directory(&target_file_path).await?;
     229              :         // We need this dance with sort of durable rename (without fsyncs)
     230              :         // to prevent partial uploads. This was really hit when pageserver shutdown
     231              :         // cancelled the upload and partial file was left on the fs
     232              :         // NOTE: Because temp file suffix always the same this operation is racy.
     233              :         // Two concurrent operations can lead to the following sequence:
     234              :         // T1: write(temp)
     235              :         // T2: write(temp) -> overwrites the content
     236              :         // T1: rename(temp, dst) -> succeeds
     237              :         // T2: rename(temp, dst) -> fails, temp no longet exists
     238              :         // This can be solved by supplying unique temp suffix every time, but this situation
     239              :         // is not normal in the first place, the error can help (and helped at least once)
     240              :         // to discover bugs in upper level synchronization.
     241         8569 :         let temp_file_path =
     242         8569 :             path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX);
     243         8477 :         let mut destination = io::BufWriter::new(
     244         8569 :             fs::OpenOptions::new()
     245         8569 :                 .write(true)
     246         8569 :                 .create(true)
     247         8569 :                 .truncate(true)
     248         8569 :                 .open(&temp_file_path)
     249         7869 :                 .await
     250         8477 :                 .with_context(|| {
     251            0 :                     format!("Failed to open target fs destination at '{target_file_path}'")
     252         8477 :                 })?,
     253              :         );
     254              : 
     255         8477 :         let from_size_bytes = data_size_bytes as u64;
     256         8477 :         let data = tokio_util::io::StreamReader::new(data);
     257         8477 :         let data = std::pin::pin!(data);
     258         8477 :         let mut buffer_to_read = data.take(from_size_bytes);
     259         8477 : 
     260         8477 :         // alternatively we could just write the bytes to a file, but local_fs is a testing utility
     261         8477 :         let copy = io::copy_buf(&mut buffer_to_read, &mut destination);
     262              : 
     263         8453 :         let bytes_read = tokio::select! {
     264              :             biased;
     265              :             _ = cancel.cancelled() => {
     266              :                 let file = destination.into_inner();
     267              :                 // wait for the inflight operation(s) to complete so that there could be a next
     268              :                 // attempt right away and our writes are not directed to their file.
     269              :                 file.into_std().await;
     270              : 
     271              :                 // TODO: leave the temp or not? leaving is probably less racy. enabled truncate at
     272              :                 // least.
     273              :                 fs::remove_file(temp_file_path).await.context("remove temp_file_path after cancellation or timeout")?;
     274              :                 return Err(TimeoutOrCancel::Cancel.into());
     275              :             }
     276              :             read = copy => read,
     277              :         };
     278              : 
     279         8453 :         let bytes_read =
     280         8453 :             bytes_read.with_context(|| {
     281            0 :                 format!(
     282            0 :                     "Failed to upload file (write temp) to the local storage at '{temp_file_path}'",
     283            0 :                 )
     284         8453 :             })?;
     285              : 
     286         8453 :         if bytes_read < from_size_bytes {
     287            8 :             bail!("Provided stream was shorter than expected: {bytes_read} vs {from_size_bytes} bytes");
     288         8445 :         }
     289         8445 :         // Check if there is any extra data after the given size.
     290         8445 :         let mut from = buffer_to_read.into_inner();
     291         8445 :         let extra_read = from.read(&mut [1]).await?;
     292         8444 :         ensure!(
     293         8444 :             extra_read == 0,
     294           16 :             "Provided stream was larger than expected: expected {from_size_bytes} bytes",
     295              :         );
     296              : 
     297         8428 :         destination.flush().await.with_context(|| {
     298            0 :             format!(
     299            0 :                 "Failed to upload (flush temp) file to the local storage at '{temp_file_path}'",
     300            0 :             )
     301         8428 :         })?;
     302              : 
     303         8428 :         fs::rename(temp_file_path, &target_file_path)
     304         7678 :             .await
     305         8422 :             .with_context(|| {
     306            0 :                 format!(
     307            0 :                     "Failed to upload (rename) file to the local storage at '{target_file_path}'",
     308            0 :                 )
     309         8422 :             })?;
     310              : 
     311         8422 :         if let Some(storage_metadata) = metadata {
     312              :             // FIXME: we must not be using metadata much, since this would forget the old metadata
     313              :             // for new writes? or perhaps metadata is sticky; could consider removing if it's never
     314              :             // used.
     315            8 :             let storage_metadata_path = storage_metadata_path(&target_file_path);
     316            8 :             fs::write(
     317            8 :                 &storage_metadata_path,
     318            8 :                 serde_json::to_string(&storage_metadata.0)
     319            8 :                     .context("Failed to serialize storage metadata as json")?,
     320              :             )
     321            7 :             .await
     322            8 :             .with_context(|| {
     323            0 :                 format!(
     324            0 :                     "Failed to write metadata to the local storage at '{storage_metadata_path}'",
     325            0 :                 )
     326            8 :             })?;
     327         8414 :         }
     328              : 
     329         8422 :         Ok(())
     330         8454 :     }
     331              : }
     332              : 
     333              : impl RemoteStorage for LocalFs {
     334            0 :     fn list_streaming(
     335            0 :         &self,
     336            0 :         prefix: Option<&RemotePath>,
     337            0 :         mode: ListingMode,
     338            0 :         max_keys: Option<NonZeroU32>,
     339            0 :         cancel: &CancellationToken,
     340            0 :     ) -> impl Stream<Item = Result<Listing, DownloadError>> {
     341            0 :         let listing = self.list(prefix, mode, max_keys, cancel);
     342            0 :         futures::stream::once(listing)
     343            0 :     }
     344              : 
     345          630 :     async fn list(
     346          630 :         &self,
     347          630 :         prefix: Option<&RemotePath>,
     348          630 :         mode: ListingMode,
     349          630 :         max_keys: Option<NonZeroU32>,
     350          630 :         cancel: &CancellationToken,
     351          630 :     ) -> Result<Listing, DownloadError> {
     352          630 :         let op = async {
     353          630 :             let mut result = Listing::default();
     354              : 
     355              :             // Filter out directories: in S3 directories don't exist, only the keys within them do.
     356          630 :             let keys = self
     357          630 :                 .list_recursive(prefix)
     358         2241 :                 .await
     359          630 :                 .map_err(DownloadError::Other)?;
     360          630 :             let objects = keys
     361          630 :                 .into_iter()
     362          630 :                 .filter_map(|k| {
     363          410 :                     let path = k.with_base(&self.storage_root);
     364          410 :                     if path.is_dir() {
     365          154 :                         None
     366              :                     } else {
     367          256 :                         Some(ListingObject {
     368          256 :                             key: k.clone(),
     369          256 :                             // LocalFs is just for testing, so just specify a dummy time
     370          256 :                             last_modified: SystemTime::now(),
     371          256 :                             size: 0,
     372          256 :                         })
     373              :                     }
     374          630 :                 })
     375          630 :                 .collect();
     376          630 : 
     377          630 :             if let ListingMode::NoDelimiter = mode {
     378           26 :                 result.keys = objects;
     379           26 :             } else {
     380          604 :                 let mut prefixes = HashSet::new();
     381          782 :                 for object in objects {
     382          178 :                     let key = object.key;
     383              :                     // If the part after the prefix includes a "/", take only the first part and put it in `prefixes`.
     384          178 :                     let relative_key = if let Some(prefix) = prefix {
     385          154 :                         let mut prefix = prefix.clone();
     386          154 :                         // We only strip the dirname of the prefix, so that when we strip it from the start of keys we
     387          154 :                         // end up with full file/dir names.
     388          154 :                         let prefix_full_local_path = prefix.with_base(&self.storage_root);
     389          154 :                         let has_slash = prefix.0.to_string().ends_with('/');
     390          154 :                         let strip_prefix = if prefix_full_local_path.is_dir() && has_slash {
     391           90 :                             prefix
     392              :                         } else {
     393           64 :                             prefix.0.pop();
     394           64 :                             prefix
     395              :                         };
     396              : 
     397          154 :                         RemotePath::new(key.strip_prefix(&strip_prefix).unwrap()).unwrap()
     398              :                     } else {
     399           24 :                         key
     400              :                     };
     401              : 
     402          178 :                     let relative_key = format!("{}", relative_key);
     403          178 :                     if relative_key.contains(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     404          170 :                         let first_part = relative_key
     405          170 :                             .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
     406          170 :                             .next()
     407          170 :                             .unwrap()
     408          170 :                             .to_owned();
     409          170 :                         prefixes.insert(first_part);
     410          170 :                     } else {
     411            8 :                         result.keys.push(ListingObject {
     412            8 :                             key: RemotePath::from_string(&relative_key).unwrap(),
     413            8 :                             // LocalFs is just for testing
     414            8 :                             last_modified: SystemTime::now(),
     415            8 :                             size: 0,
     416            8 :                         });
     417            8 :                     }
     418              :                 }
     419          604 :                 result.prefixes = prefixes
     420          604 :                     .into_iter()
     421          604 :                     .map(|s| RemotePath::from_string(&s).unwrap())
     422          604 :                     .collect();
     423          604 :             }
     424              : 
     425          630 :             if let Some(max_keys) = max_keys {
     426            0 :                 result.keys.truncate(max_keys.get() as usize);
     427          630 :             }
     428          630 :             Ok(result)
     429          630 :         };
     430              : 
     431          630 :         let timeout = async {
     432         1825 :             tokio::time::sleep(self.timeout).await;
     433            0 :             Err(DownloadError::Timeout)
     434            0 :         };
     435              : 
     436          630 :         let cancelled = async {
     437         2024 :             cancel.cancelled().await;
     438            0 :             Err(DownloadError::Cancelled)
     439            0 :         };
     440              : 
     441              :         tokio::select! {
     442              :             res = op => res,
     443              :             res = timeout => res,
     444              :             res = cancelled => res,
     445              :         }
     446          630 :     }
     447              : 
     448            0 :     async fn head_object(
     449            0 :         &self,
     450            0 :         key: &RemotePath,
     451            0 :         _cancel: &CancellationToken,
     452            0 :     ) -> Result<ListingObject, DownloadError> {
     453            0 :         let target_file_path = key.with_base(&self.storage_root);
     454            0 :         let metadata = file_metadata(&target_file_path).await?;
     455              :         Ok(ListingObject {
     456            0 :             key: key.clone(),
     457            0 :             last_modified: metadata.modified()?,
     458            0 :             size: metadata.len(),
     459              :         })
     460            0 :     }
     461              : 
     462         8574 :     async fn upload(
     463         8574 :         &self,
     464         8574 :         data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
     465         8574 :         data_size_bytes: usize,
     466         8574 :         to: &RemotePath,
     467         8574 :         metadata: Option<StorageMetadata>,
     468         8574 :         cancel: &CancellationToken,
     469         8574 :     ) -> anyhow::Result<()> {
     470         8574 :         let cancel = cancel.child_token();
     471         8574 : 
     472         8574 :         let op = self.upload0(data, data_size_bytes, to, metadata, &cancel);
     473         8574 :         let mut op = std::pin::pin!(op);
     474              : 
     475              :         // race the upload0 to the timeout; if it goes over, do a graceful shutdown
     476         8454 :         let (res, timeout) = tokio::select! {
     477              :             res = &mut op => (res, false),
     478              :             _ = tokio::time::sleep(self.timeout) => {
     479              :                 cancel.cancel();
     480              :                 (op.await, true)
     481              :             }
     482              :         };
     483              : 
     484           32 :         match res {
     485           32 :             Err(e) if timeout && TimeoutOrCancel::caused_by_cancel(&e) => {
     486            0 :                 // we caused this cancel (or they happened simultaneously) -- swap it out to
     487            0 :                 // Timeout
     488            0 :                 Err(TimeoutOrCancel::Timeout.into())
     489              :             }
     490         8454 :             res => res,
     491              :         }
     492         8454 :     }
     493              : 
     494          182 :     async fn download(
     495          182 :         &self,
     496          182 :         from: &RemotePath,
     497          182 :         cancel: &CancellationToken,
     498          182 :     ) -> Result<Download, DownloadError> {
     499          182 :         let target_path = from.with_base(&self.storage_root);
     500              : 
     501          182 :         let file_metadata = file_metadata(&target_path).await?;
     502              : 
     503          132 :         let source = ReaderStream::new(
     504          132 :             fs::OpenOptions::new()
     505          132 :                 .read(true)
     506          132 :                 .open(&target_path)
     507          126 :                 .await
     508          132 :                 .with_context(|| {
     509            0 :                     format!("Failed to open source file {target_path:?} to use in the download")
     510          132 :                 })
     511          132 :                 .map_err(DownloadError::Other)?,
     512              :         );
     513              : 
     514          132 :         let metadata = self
     515          132 :             .read_storage_metadata(&target_path)
     516            8 :             .await
     517          132 :             .map_err(DownloadError::Other)?;
     518              : 
     519          132 :         let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
     520          132 :         let source = crate::support::DownloadStream::new(cancel_or_timeout, source);
     521          132 : 
     522          132 :         let etag = mock_etag(&file_metadata);
     523          132 :         Ok(Download {
     524          132 :             metadata,
     525          132 :             last_modified: file_metadata
     526          132 :                 .modified()
     527          132 :                 .map_err(|e| DownloadError::Other(anyhow::anyhow!(e).context("Reading mtime")))?,
     528          132 :             etag,
     529          132 :             download_stream: Box::pin(source),
     530              :         })
     531          182 :     }
     532              : 
     533           56 :     async fn download_byte_range(
     534           56 :         &self,
     535           56 :         from: &RemotePath,
     536           56 :         start_inclusive: u64,
     537           56 :         end_exclusive: Option<u64>,
     538           56 :         cancel: &CancellationToken,
     539           56 :     ) -> Result<Download, DownloadError> {
     540           56 :         if let Some(end_exclusive) = end_exclusive {
     541           40 :             if end_exclusive <= start_inclusive {
     542            8 :                 return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
     543           32 :             };
     544           32 :             if start_inclusive == end_exclusive.saturating_sub(1) {
     545            8 :                 return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
     546           24 :             }
     547           16 :         }
     548              : 
     549           40 :         let target_path = from.with_base(&self.storage_root);
     550           40 :         let file_metadata = file_metadata(&target_path).await?;
     551           40 :         let mut source = tokio::fs::OpenOptions::new()
     552           40 :             .read(true)
     553           40 :             .open(&target_path)
     554           39 :             .await
     555           40 :             .with_context(|| {
     556            0 :                 format!("Failed to open source file {target_path:?} to use in the download")
     557           40 :             })
     558           40 :             .map_err(DownloadError::Other)?;
     559              : 
     560           40 :         let len = source
     561           40 :             .metadata()
     562           40 :             .await
     563           40 :             .context("query file length")
     564           40 :             .map_err(DownloadError::Other)?
     565           40 :             .len();
     566           40 : 
     567           40 :         source
     568           40 :             .seek(io::SeekFrom::Start(start_inclusive))
     569           39 :             .await
     570           40 :             .context("Failed to seek to the range start in a local storage file")
     571           40 :             .map_err(DownloadError::Other)?;
     572              : 
     573           40 :         let metadata = self
     574           40 :             .read_storage_metadata(&target_path)
     575            8 :             .await
     576           40 :             .map_err(DownloadError::Other)?;
     577              : 
     578           40 :         let source = source.take(end_exclusive.unwrap_or(len) - start_inclusive);
     579           40 :         let source = ReaderStream::new(source);
     580           40 : 
     581           40 :         let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
     582           40 :         let source = crate::support::DownloadStream::new(cancel_or_timeout, source);
     583           40 : 
     584           40 :         let etag = mock_etag(&file_metadata);
     585           40 :         Ok(Download {
     586           40 :             metadata,
     587           40 :             last_modified: file_metadata
     588           40 :                 .modified()
     589           40 :                 .map_err(|e| DownloadError::Other(anyhow::anyhow!(e).context("Reading mtime")))?,
     590           40 :             etag,
     591           40 :             download_stream: Box::pin(source),
     592              :         })
     593           56 :     }
     594              : 
     595           40 :     async fn delete(&self, path: &RemotePath, _cancel: &CancellationToken) -> anyhow::Result<()> {
     596           40 :         let file_path = path.with_base(&self.storage_root);
     597           40 :         match fs::remove_file(&file_path).await {
     598           32 :             Ok(()) => Ok(()),
     599              :             // The file doesn't exist. This shouldn't yield an error to mirror S3's behaviour.
     600              :             // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
     601              :             // > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful.
     602            8 :             Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
     603            0 :             Err(e) => Err(anyhow::anyhow!(e)),
     604              :         }
     605           40 :     }
     606              : 
     607           18 :     async fn delete_objects<'a>(
     608           18 :         &self,
     609           18 :         paths: &'a [RemotePath],
     610           18 :         cancel: &CancellationToken,
     611           18 :     ) -> anyhow::Result<()> {
     612           36 :         for path in paths {
     613           18 :             self.delete(path, cancel).await?
     614              :         }
     615           18 :         Ok(())
     616           18 :     }
     617              : 
     618            0 :     async fn copy(
     619            0 :         &self,
     620            0 :         from: &RemotePath,
     621            0 :         to: &RemotePath,
     622            0 :         _cancel: &CancellationToken,
     623            0 :     ) -> anyhow::Result<()> {
     624            0 :         let from_path = from.with_base(&self.storage_root);
     625            0 :         let to_path = to.with_base(&self.storage_root);
     626            0 :         create_target_directory(&to_path).await?;
     627            0 :         fs::copy(&from_path, &to_path).await.with_context(|| {
     628            0 :             format!(
     629            0 :                 "Failed to copy file from '{from_path}' to '{to_path}'",
     630            0 :                 from_path = from_path,
     631            0 :                 to_path = to_path
     632            0 :             )
     633            0 :         })?;
     634            0 :         Ok(())
     635            0 :     }
     636              : 
    /// Point-in-time recovery is not supported by the local filesystem backend.
    ///
    /// All arguments are ignored and the call always fails with
    /// [`TimeTravelError::Unimplemented`].
    async fn time_travel_recover(
        &self,
        _prefix: Option<&RemotePath>,
        _timestamp: SystemTime,
        _done_if_after: SystemTime,
        _cancel: &CancellationToken,
    ) -> Result<(), TimeTravelError> {
        Err(TimeTravelError::Unimplemented)
    }
     646              : }
     647              : 
/// Returns the path of the sidecar file for `original_path`: the same path with a
/// `metadata` suffix extension added via `path_with_suffix_extension`.
///
/// NOTE(review): presumably this is where uploads store, and `read_storage_metadata`
/// reads, the object's `StorageMetadata` — confirm against those functions.
fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
    path_with_suffix_extension(original_path, "metadata")
}
     651              : 
     652         8574 : async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> {
     653         8574 :     let target_dir = match target_file_path.parent() {
     654         8574 :         Some(parent_dir) => parent_dir,
     655            0 :         None => bail!("File path '{target_file_path}' has no parent directory"),
     656              :     };
     657         8574 :     if !target_dir.exists() {
     658         1235 :         fs::create_dir_all(target_dir).await?;
     659         7339 :     }
     660         8569 :     Ok(())
     661         8569 : }
     662              : 
     663          222 : async fn file_metadata(file_path: &Utf8Path) -> Result<std::fs::Metadata, DownloadError> {
     664          222 :     tokio::fs::metadata(&file_path).await.map_err(|e| {
     665           50 :         if e.kind() == ErrorKind::NotFound {
     666           50 :             DownloadError::NotFound
     667              :         } else {
     668            0 :             DownloadError::BadInput(e.into())
     669              :         }
     670          222 :     })
     671          222 : }
     672              : 
     673              : // Use mtime as stand-in for ETag.  We could calculate a meaningful one by md5'ing the contents of files we
     674              : // read, but that's expensive and the local_fs test helper's whole reason for existence is to run small tests
     675              : // quickly, with less overhead than using a mock S3 server.
     676          172 : fn mock_etag(meta: &std::fs::Metadata) -> Etag {
     677          172 :     let mtime = meta.modified().expect("Filesystem mtime missing");
     678          172 :     format!("{}", mtime.duration_since(UNIX_EPOCH).unwrap().as_millis()).into()
     679          172 : }
     680              : 
     681              : #[cfg(test)]
     682              : mod fs_tests {
     683              :     use super::*;
     684              : 
     685              :     use camino_tempfile::tempdir;
     686              :     use std::{collections::HashMap, io::Write};
     687              : 
     688           24 :     async fn read_and_check_metadata(
     689           24 :         storage: &LocalFs,
     690           24 :         remote_storage_path: &RemotePath,
     691           24 :         expected_metadata: Option<&StorageMetadata>,
     692           24 :     ) -> anyhow::Result<String> {
     693           24 :         let cancel = CancellationToken::new();
     694           24 :         let download = storage
     695           24 :             .download(remote_storage_path, &cancel)
     696           55 :             .await
     697           24 :             .map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
     698           24 :         ensure!(
     699           24 :             download.metadata.as_ref() == expected_metadata,
     700            0 :             "Unexpected metadata returned for the downloaded file"
     701              :         );
     702              : 
     703           45 :         let contents = aggregate(download.download_stream).await?;
     704              : 
     705           24 :         String::from_utf8(contents).map_err(anyhow::Error::new)
     706           24 :     }
     707              : 
    /// Each upload shows up in the storage listing: one file after the first upload,
    /// both files after the second.
    #[tokio::test]
    async fn upload_file() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;

        let target_path_1 = upload_dummy_file(&storage, "upload_1", None, &cancel).await?;
        assert_eq!(
            storage.list_all().await?,
            vec![target_path_1.clone()],
            "Should list a single file after first upload"
        );

        // Compare against a sorted listing so the assertion does not depend on the
        // order in which the listing returns the two files.
        let target_path_2 = upload_dummy_file(&storage, "upload_2", None, &cancel).await?;
        assert_eq!(
            list_files_sorted(&storage).await?,
            vec![target_path_1.clone(), target_path_2.clone()],
            "Should list a two different files after second upload"
        );

        Ok(())
    }
     728              : 
    /// `upload` must reject a declared size that differs from the stream's actual length
    /// (5 bytes here), both too small and too large, and accept the exact size.
    #[tokio::test]
    async fn upload_file_negatives() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;

        let id = RemotePath::new(Utf8Path::new("dummy"))?;
        let content = Bytes::from_static(b"12345");
        // Factory producing a fresh single-chunk stream for each upload attempt.
        let content = move || futures::stream::once(futures::future::ready(Ok(content.clone())));

        // Check that you get an error if the size parameter doesn't match the actual
        // size of the stream.
        storage
            .upload(content(), 0, &id, None, &cancel)
            .await
            .expect_err("upload with zero size succeeded");
        storage
            .upload(content(), 4, &id, None, &cancel)
            .await
            .expect_err("upload with too short size succeeded");
        storage
            .upload(content(), 6, &id, None, &cancel)
            .await
            .expect_err("upload with too large size succeeded");

        // Correct size is 5, this should succeed.
        storage.upload(content(), 5, &id, None, &cancel).await?;

        Ok(())
    }
     757              : 
     758           88 :     fn create_storage() -> anyhow::Result<(LocalFs, CancellationToken)> {
     759           88 :         let storage_root = tempdir()?.path().to_path_buf();
     760           88 :         LocalFs::new(storage_root, Duration::from_secs(120)).map(|s| (s, CancellationToken::new()))
     761           88 :     }
     762              : 
    /// A dummy upload round-trips through `download` unchanged, and downloading a key
    /// that was never uploaded yields `DownloadError::NotFound`.
    #[tokio::test]
    async fn download_file() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;

        let contents = read_and_check_metadata(&storage, &upload_target, None).await?;
        assert_eq!(
            dummy_contents(upload_name),
            contents,
            "We should upload and download the same contents"
        );

        let non_existing_path = "somewhere/else";
        match storage.download(&RemotePath::new(Utf8Path::new(non_existing_path))?, &cancel).await {
            Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys
            other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
        }
        Ok(())
    }
     783              : 
    /// Exercises `download_byte_range` on valid ranges:
    /// * a 3-byte prefix and the remaining suffix, each as an explicit range;
    /// * an open-ended range from offset 13;
    /// * an open-ended range from offset 0.
    #[tokio::test]
    async fn download_file_range_positive() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;

        let full_range_download_contents =
            read_and_check_metadata(&storage, &upload_target, None).await?;
        assert_eq!(
            dummy_contents(upload_name),
            full_range_download_contents,
            "Download full range should return the whole upload"
        );

        // Split the expected contents at byte 3 and fetch each half as its own range.
        let uploaded_bytes = dummy_contents(upload_name).into_bytes();
        let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);

        let first_part_download = storage
            .download_byte_range(
                &upload_target,
                0,
                Some(first_part_local.len() as u64),
                &cancel,
            )
            .await?;
        assert!(
            first_part_download.metadata.is_none(),
            "No metadata should be returned for no metadata upload"
        );

        let first_part_remote = aggregate(first_part_download.download_stream).await?;
        assert_eq!(
            first_part_local, first_part_remote,
            "First part bytes should be returned when requested"
        );

        let second_part_download = storage
            .download_byte_range(
                &upload_target,
                first_part_local.len() as u64,
                Some((first_part_local.len() + second_part_local.len()) as u64),
                &cancel,
            )
            .await?;
        assert!(
            second_part_download.metadata.is_none(),
            "No metadata should be returned for no metadata upload"
        );

        let second_part_remote = aggregate(second_part_download.download_stream).await?;
        assert_eq!(
            second_part_local, second_part_remote,
            "Second part bytes should be returned when requested"
        );

        // Open-ended range from offset 13: the remaining bytes are expected to be exactly
        // `upload_name`. This assumes `dummy_contents` puts the name after a 13-byte
        // prefix — confirm against that helper if it changes.
        let suffix_bytes = storage
            .download_byte_range(&upload_target, 13, None, &cancel)
            .await?
            .download_stream;
        let suffix_bytes = aggregate(suffix_bytes).await?;
        let suffix = std::str::from_utf8(&suffix_bytes)?;
        assert_eq!(upload_name, suffix);

        // Open-ended range from offset 0 must return the whole object.
        let all_bytes = storage
            .download_byte_range(&upload_target, 0, None, &cancel)
            .await?
            .download_stream;
        let all_bytes = aggregate(all_bytes).await?;
        let all_bytes = std::str::from_utf8(&all_bytes)?;
        assert_eq!(dummy_contents("upload_1"), all_bytes);

        Ok(())
    }
     857              : 
    /// `download_byte_range` must reject malformed ranges with an error that names both
    /// bounds.
    #[tokio::test]
    async fn download_file_range_negative() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;

        // `end == start + 1` is rejected by the "difference is zero bytes" check, even
        // though such an exclusive range covers one byte.
        let start = 1_000_000_000;
        let end = start + 1;
        match storage
            .download_byte_range(
                &upload_target,
                start,
                Some(end), // exclusive end
                &cancel,
            )
            .await
        {
            Ok(_) => panic!("Should not allow downloading wrong ranges"),
            Err(e) => {
                let error_string = e.to_string();
                assert!(error_string.contains("zero bytes"));
                assert!(error_string.contains(&start.to_string()));
                assert!(error_string.contains(&end.to_string()));
            }
        }

        // A start past the end is rejected as an invalid range.
        let start = 10000;
        let end = 234;
        assert!(start > end, "Should test an incorrect range");
        match storage
            .download_byte_range(&upload_target, start, Some(end), &cancel)
            .await
        {
            Ok(_) => panic!("Should not allow downloading wrong ranges"),
            Err(e) => {
                let error_string = e.to_string();
                assert!(error_string.contains("Invalid range"));
                assert!(error_string.contains(&start.to_string()));
                assert!(error_string.contains(&end.to_string()));
            }
        }

        Ok(())
    }
     902              : 
    /// Deleting an object removes it from listings, and deleting the same (now missing)
    /// object again still succeeds.
    #[tokio::test]
    async fn delete_file() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;

        storage.delete(&upload_target, &cancel).await?;
        assert!(storage.list_all().await?.is_empty());

        storage
            .delete(&upload_target, &cancel)
            .await
            .expect("Should allow deleting non-existing storage files");

        Ok(())
    }
     919              : 
    /// Storage metadata attached at upload time is returned by both a full `download`
    /// and a ranged `download_byte_range`.
    #[tokio::test]
    async fn file_with_metadata() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;
        let upload_name = "upload_1";
        let metadata = StorageMetadata(HashMap::from([
            ("one".to_string(), "1".to_string()),
            ("two".to_string(), "2".to_string()),
        ]));
        let upload_target =
            upload_dummy_file(&storage, upload_name, Some(metadata.clone()), &cancel).await?;

        let full_range_download_contents =
            read_and_check_metadata(&storage, &upload_target, Some(&metadata)).await?;
        assert_eq!(
            dummy_contents(upload_name),
            full_range_download_contents,
            "We should upload and download the same contents"
        );

        let uploaded_bytes = dummy_contents(upload_name).into_bytes();
        let (first_part_local, _) = uploaded_bytes.split_at(3);

        let partial_download_with_metadata = storage
            .download_byte_range(
                &upload_target,
                0,
                Some(first_part_local.len() as u64),
                &cancel,
            )
            .await?;
        // Moves the stream out of the download; `metadata` stays accessible below.
        let first_part_remote = aggregate(partial_download_with_metadata.download_stream).await?;
        assert_eq!(
            first_part_local,
            first_part_remote.as_slice(),
            "First part bytes should be returned when requested"
        );

        assert_eq!(
            partial_download_with_metadata.metadata,
            Some(metadata),
            "We should get the same metadata back for partial download"
        );

        Ok(())
    }
     965              : 
    /// Listing semantics, covering four cases:
    /// * without a delimiter: a recursive listing of every key;
    /// * with a delimiter: one level only;
    /// * with a delimiter and a prefix that has or lacks a trailing slash;
    /// * with a delimiter and a prefix that ends partway through a path component.
    #[tokio::test]
    async fn list() -> anyhow::Result<()> {
        // No delimiter: should recursively list everything
        let (storage, cancel) = create_storage()?;
        let child = upload_dummy_file(&storage, "grandparent/parent/child", None, &cancel).await?;
        let child_sibling =
            upload_dummy_file(&storage, "grandparent/parent/child_sibling", None, &cancel).await?;
        let uncle = upload_dummy_file(&storage, "grandparent/uncle", None, &cancel).await?;

        let listing = storage
            .list(None, ListingMode::NoDelimiter, None, &cancel)
            .await?;
        assert!(listing.prefixes.is_empty());
        assert_eq!(
            listing
                .keys
                .into_iter()
                .map(|o| o.key)
                .collect::<HashSet<_>>(),
            HashSet::from([uncle.clone(), child.clone(), child_sibling.clone()])
        );

        // Delimiter: should only go one deep
        // (the only top-level prefix expected is `timelines`, i.e. `upload_dummy_file`
        // apparently stores objects under `timelines/some_timeline/` — confirm against
        // that helper)
        let listing = storage
            .list(None, ListingMode::WithDelimiter, None, &cancel)
            .await?;

        assert_eq!(
            listing.prefixes,
            [RemotePath::from_string("timelines").unwrap()].to_vec()
        );
        assert!(listing.keys.is_empty());

        // Delimiter & prefix with a trailing slash
        // (the expected keys and prefixes below are relative to the listed prefix)
        let listing = storage
            .list(
                Some(&RemotePath::from_string("timelines/some_timeline/grandparent/").unwrap()),
                ListingMode::WithDelimiter,
                None,
                &cancel,
            )
            .await?;
        assert_eq!(
            listing.keys.into_iter().map(|o| o.key).collect::<Vec<_>>(),
            [RemotePath::from_string("uncle").unwrap()].to_vec()
        );
        assert_eq!(
            listing.prefixes,
            [RemotePath::from_string("parent").unwrap()].to_vec()
        );

        // Delimiter and prefix without a trailing slash
        let listing = storage
            .list(
                Some(&RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()),
                ListingMode::WithDelimiter,
                None,
                &cancel,
            )
            .await?;
        assert_eq!(listing.keys, vec![]);
        assert_eq!(
            listing.prefixes,
            [RemotePath::from_string("grandparent").unwrap()].to_vec()
        );

        // Delimiter and prefix that's partway through a path component
        let listing = storage
            .list(
                Some(&RemotePath::from_string("timelines/some_timeline/grandp").unwrap()),
                ListingMode::WithDelimiter,
                None,
                &cancel,
            )
            .await?;
        assert_eq!(listing.keys, vec![]);
        assert_eq!(
            listing.prefixes,
            [RemotePath::from_string("grandparent").unwrap()].to_vec()
        );

        Ok(())
    }
    1049              : 
    1050              :     #[tokio::test]
    1051            8 :     async fn list_part_component() -> anyhow::Result<()> {
    1052            8 :         // No delimiter: should recursively list everything
    1053            8 :         let (storage, cancel) = create_storage()?;
    1054            8 : 
    1055            8 :         // Imitates what happens in a tenant path when we have an unsharded path and a sharded path, and do a listing
    1056            8 :         // of the unsharded path: although there is a "directory" at the unsharded path, it should be handled as
    1057            8 :         // a freeform prefix.
    1058            8 :         let _child_a =
    1059           47 :             upload_dummy_file(&storage, "grandparent/tenant-01/child", None, &cancel).await?;
    1060            8 :         let _child_b =
    1061           48 :             upload_dummy_file(&storage, "grandparent/tenant/child", None, &cancel).await?;
    1062            8 : 
    1063            8 :         // Delimiter and prefix that's partway through a path component
    1064            8 :         let listing = storage
    1065            8 :             .list(
    1066            8 :                 Some(
    1067            8 :                     &RemotePath::from_string("timelines/some_timeline/grandparent/tenant").unwrap(),
    1068            8 :                 ),
    1069            8 :                 ListingMode::WithDelimiter,
    1070            8 :                 None,
    1071            8 :                 &cancel,
    1072            8 :             )
    1073            8 :             .await?;
    1074            8 :         assert_eq!(listing.keys, vec![]);
    1075            8 : 
    1076            8 :         let mut found_prefixes = listing.prefixes.clone();
    1077            8 :         found_prefixes.sort();
    1078            8 :         assert_eq!(
    1079            8 :             found_prefixes,
    1080            8 :             [
    1081            8 :                 RemotePath::from_string("tenant").unwrap(),
    1082            8 :                 RemotePath::from_string("tenant-01").unwrap(),
    1083            8 :             ]
    1084            8 :             .to_vec()
    1085            8 :         );
    1086            8 : 
    1087            8 :         Ok(())
    1088            8 :     }
    1089              : 
    1090              :     #[tokio::test]
    1091            8 :     async fn overwrite_shorter_file() -> anyhow::Result<()> {
    1092            8 :         let (storage, cancel) = create_storage()?;
    1093            8 : 
    1094            8 :         let path = RemotePath::new("does/not/matter/file".into())?;
    1095            8 : 
    1096            8 :         let body = Bytes::from_static(b"long file contents is long");
    1097            8 :         {
    1098            8 :             let len = body.len();
    1099            8 :             let body =
    1100            8 :                 futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
    1101           32 :             storage.upload(body, len, &path, None, &cancel).await?;
    1102            8 :         }
    1103            8 : 
    1104           16 :         let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
    1105            8 :         assert_eq!(body, read);
    1106            8 : 
    1107            8 :         let shorter = Bytes::from_static(b"shorter body");
    1108            8 :         {
    1109            8 :             let len = shorter.len();
    1110            8 :             let body =
    1111            8 :                 futures::stream::once(futures::future::ready(std::io::Result::Ok(shorter.clone())));
    1112           24 :             storage.upload(body, len, &path, None, &cancel).await?;
    1113            8 :         }
    1114            8 : 
    1115           16 :         let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
    1116            8 :         assert_eq!(shorter, read);
    1117            8 :         Ok(())
    1118            8 :     }
    1119              : 
    1120              :     #[tokio::test]
    1121            8 :     async fn cancelled_upload_can_later_be_retried() -> anyhow::Result<()> {
    1122            8 :         let (storage, cancel) = create_storage()?;
    1123            8 : 
    1124            8 :         let path = RemotePath::new("does/not/matter/file".into())?;
    1125            8 : 
    1126            8 :         let body = Bytes::from_static(b"long file contents is long");
    1127            8 :         {
    1128            8 :             let len = body.len();
    1129            8 :             let body =
    1130            8 :                 futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
    1131            8 :             let cancel = cancel.child_token();
    1132            8 :             cancel.cancel();
    1133            8 :             let e = storage
    1134            8 :                 .upload(body, len, &path, None, &cancel)
    1135           24 :                 .await
    1136            8 :                 .unwrap_err();
    1137            8 : 
    1138            8 :             assert!(TimeoutOrCancel::caused_by_cancel(&e));
    1139            8 :         }
    1140            8 : 
    1141            8 :         {
    1142            8 :             let len = body.len();
    1143            8 :             let body =
    1144            8 :                 futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
    1145           22 :             storage.upload(body, len, &path, None, &cancel).await?;
    1146            8 :         }
    1147            8 : 
    1148           14 :         let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
    1149            8 :         assert_eq!(body, read);
    1150            8 : 
    1151            8 :         Ok(())
    1152            8 :     }
    1153              : 
    1154           96 :     async fn upload_dummy_file(
    1155           96 :         storage: &LocalFs,
    1156           96 :         name: &str,
    1157           96 :         metadata: Option<StorageMetadata>,
    1158           96 :         cancel: &CancellationToken,
    1159           96 :     ) -> anyhow::Result<RemotePath> {
    1160           96 :         let from_path = storage
    1161           96 :             .storage_root
    1162           96 :             .join("timelines")
    1163           96 :             .join("some_timeline")
    1164           96 :             .join(name);
    1165           96 :         let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?;
    1166              : 
    1167           96 :         let relative_path = from_path
    1168           96 :             .strip_prefix(&storage.storage_root)
    1169           96 :             .context("Failed to strip storage root prefix")
    1170           96 :             .and_then(RemotePath::new)
    1171           96 :             .with_context(|| {
    1172            0 :                 format!(
    1173            0 :                     "Failed to resolve remote part of path {:?} for base {:?}",
    1174            0 :                     from_path, storage.storage_root
    1175            0 :                 )
    1176           96 :             })?;
    1177              : 
    1178           96 :         let file = tokio_util::io::ReaderStream::new(file);
    1179           96 : 
    1180           96 :         storage
    1181           96 :             .upload(file, size, &relative_path, metadata, cancel)
    1182          443 :             .await?;
    1183           96 :         Ok(relative_path)
    1184           96 :     }
    1185              : 
    1186           96 :     async fn create_file_for_upload(
    1187           96 :         path: &Utf8Path,
    1188           96 :         contents: &str,
    1189           96 :     ) -> anyhow::Result<(fs::File, usize)> {
    1190           96 :         std::fs::create_dir_all(path.parent().unwrap())?;
    1191           96 :         let mut file_for_writing = std::fs::OpenOptions::new()
    1192           96 :             .write(true)
    1193           96 :             .create_new(true)
    1194           96 :             .open(path)?;
    1195           96 :         write!(file_for_writing, "{}", contents)?;
    1196           96 :         drop(file_for_writing);
    1197           96 :         let file_size = path.metadata()?.len() as usize;
    1198           96 :         Ok((
    1199           96 :             fs::OpenOptions::new().read(true).open(&path).await?,
    1200           96 :             file_size,
    1201              :         ))
    1202           96 :     }
    1203              : 
    1204          144 :     fn dummy_contents(name: &str) -> String {
    1205          144 :         format!("contents for {name}")
    1206          144 :     }
    1207              : 
    1208            8 :     async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
    1209           24 :         let mut files = storage.list_all().await?;
    1210            8 :         files.sort_by(|a, b| a.0.cmp(&b.0));
    1211            8 :         Ok(files)
    1212            8 :     }
    1213              : 
    1214           88 :     async fn aggregate(
    1215           88 :         stream: impl Stream<Item = std::io::Result<Bytes>>,
    1216           88 :     ) -> anyhow::Result<Vec<u8>> {
    1217           88 :         use futures::stream::StreamExt;
    1218           88 :         let mut out = Vec::new();
    1219           88 :         let mut stream = std::pin::pin!(stream);
    1220          176 :         while let Some(res) = stream.next().await {
    1221           88 :             out.extend_from_slice(&res?[..]);
    1222              :         }
    1223           88 :         Ok(out)
    1224           88 :     }
    1225              : }
        

Generated by: LCOV version 2.1-beta