LCOV - code coverage report
Current view: top level - libs/remote_storage/src - simulate_failures.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 60.5 % 129 78
Test Date: 2024-02-07 07:37:29 Functions: 63.6 % 33 21

            Line data    Source code
       1              : //! This module provides a wrapper around a real RemoteStorage implementation that
       2              : //! causes the first N attempts at each upload or download operation to fail. For
       3              : //! testing purposes.
       4              : use bytes::Bytes;
       5              : use futures::stream::Stream;
       6              : use std::collections::HashMap;
       7              : use std::sync::Mutex;
       8              : use std::time::SystemTime;
       9              : use std::{collections::hash_map::Entry, sync::Arc};
      10              : use tokio_util::sync::CancellationToken;
      11              : 
      12              : use crate::{
      13              :     Download, DownloadError, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorage,
      14              :     StorageMetadata, TimeTravelError,
      15              : };
      16              : 
      17              : pub struct UnreliableWrapper {
      18              :     inner: GenericRemoteStorage<Arc<VoidStorage>>,
      19              : 
      20              :     // This many attempts of each operation will fail, then we let it succeed.
      21              :     attempts_to_fail: u64,
      22              : 
      23              :     // Tracks how many failed attempts of each operation have been made.
      24              :     attempts: Mutex<HashMap<RemoteOp, u64>>,
      25              : }
      26              : 
      27              : /// Used to identify retries of different unique operations.
      28         7313 : #[derive(Debug, Hash, Eq, PartialEq)]
      29              : enum RemoteOp {
      30              :     ListPrefixes(Option<RemotePath>),
      31              :     Upload(RemotePath),
      32              :     Download(RemotePath),
      33              :     Delete(RemotePath),
      34              :     DeleteObjects(Vec<RemotePath>),
      35              :     TimeTravelRecover(Option<RemotePath>),
      36              : }
      37              : 
      38              : impl UnreliableWrapper {
      39           67 :     pub fn new(inner: crate::GenericRemoteStorage, attempts_to_fail: u64) -> Self {
      40           67 :         assert!(attempts_to_fail > 0);
      41           67 :         let inner = match inner {
      42           52 :             GenericRemoteStorage::AwsS3(s) => GenericRemoteStorage::AwsS3(s),
      43            0 :             GenericRemoteStorage::AzureBlob(s) => GenericRemoteStorage::AzureBlob(s),
      44           15 :             GenericRemoteStorage::LocalFs(s) => GenericRemoteStorage::LocalFs(s),
      45              :             // We could also make this a no-op, as in, extract the inner of the passed generic remote storage
      46            0 :             GenericRemoteStorage::Unreliable(_s) => {
      47            0 :                 panic!("Can't wrap unreliable wrapper unreliably")
      48              :             }
      49              :         };
      50           67 :         UnreliableWrapper {
      51           67 :             inner,
      52           67 :             attempts_to_fail,
      53           67 :             attempts: Mutex::new(HashMap::new()),
      54           67 :         }
      55           67 :     }
      56              : 
      57              :     ///
      58              :     /// Common functionality for all operations.
      59              :     ///
      60              :     /// On the first attempts of this operation, return an error. After 'attempts_to_fail'
      61              :     /// attempts, let the operation go ahead, and clear the counter.
      62              :     ///
      63         6021 :     fn attempt(&self, op: RemoteOp) -> Result<u64, DownloadError> {
      64         6021 :         let mut attempts = self.attempts.lock().unwrap();
      65         6021 : 
      66         6021 :         match attempts.entry(op) {
      67         2959 :             Entry::Occupied(mut e) => {
      68         2959 :                 let attempts_before_this = {
      69         2959 :                     let p = e.get_mut();
      70         2959 :                     *p += 1;
      71         2959 :                     *p
      72         2959 :                 };
      73         2959 : 
      74         2959 :                 if attempts_before_this >= self.attempts_to_fail {
      75              :                     // let it succeed
      76         2959 :                     e.remove();
      77         2959 :                     Ok(attempts_before_this)
      78              :                 } else {
      79            0 :                     let error =
      80            0 :                         anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
      81            0 :                     Err(DownloadError::Other(error))
      82              :                 }
      83              :             }
      84         3062 :             Entry::Vacant(e) => {
      85         3062 :                 let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
      86         3062 :                 e.insert(1);
      87         3062 :                 Err(DownloadError::Other(error))
      88              :             }
      89              :         }
      90         6021 :     }
      91              : 
      92         2012 :     async fn delete_inner(&self, path: &RemotePath, attempt: bool) -> anyhow::Result<()> {
      93         2012 :         if attempt {
      94           68 :             self.attempt(RemoteOp::Delete(path.clone()))?;
      95         1944 :         }
      96         8102 :         self.inner.delete(path).await
      97         2012 :     }
      98              : }
      99              : 
     100              : // We never construct this, so the type is not important, just has to not be UnreliableWrapper and impl RemoteStorage.
     101              : type VoidStorage = crate::LocalFs;
     102              : 
     103              : impl RemoteStorage for UnreliableWrapper {
     104            0 :     async fn list_prefixes(
     105            0 :         &self,
     106            0 :         prefix: Option<&RemotePath>,
     107            0 :     ) -> Result<Vec<RemotePath>, DownloadError> {
     108            0 :         self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))?;
     109            0 :         self.inner.list_prefixes(prefix).await
     110            0 :     }
     111              : 
     112          128 :     async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
     113          128 :         self.attempt(RemoteOp::ListPrefixes(folder.cloned()))?;
     114          262 :         self.inner.list_files(folder).await
     115          128 :     }
     116              : 
     117          126 :     async fn list(
     118          126 :         &self,
     119          126 :         prefix: Option<&RemotePath>,
     120          126 :         mode: ListingMode,
     121          126 :     ) -> Result<Listing, DownloadError> {
     122          126 :         self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))?;
     123          221 :         self.inner.list(prefix, mode).await
     124          126 :     }
     125              : 
     126         5306 :     async fn upload(
     127         5306 :         &self,
     128         5306 :         data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     129         5306 :         // S3 PUT request requires the content length to be specified,
     130         5306 :         // otherwise it starts to fail with the concurrent connection count increasing.
     131         5306 :         data_size_bytes: usize,
     132         5306 :         to: &RemotePath,
     133         5306 :         metadata: Option<StorageMetadata>,
     134         5306 :     ) -> anyhow::Result<()> {
     135         5306 :         self.attempt(RemoteOp::Upload(to.clone()))?;
     136        19068 :         self.inner.upload(data, data_size_bytes, to, metadata).await
     137         5306 :     }
     138              : 
     139          165 :     async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
     140          165 :         self.attempt(RemoteOp::Download(from.clone()))?;
     141          186 :         self.inner.download(from).await
     142          165 :     }
     143              : 
     144            0 :     async fn download_byte_range(
     145            0 :         &self,
     146            0 :         from: &RemotePath,
     147            0 :         start_inclusive: u64,
     148            0 :         end_exclusive: Option<u64>,
     149            0 :     ) -> Result<Download, DownloadError> {
     150            0 :         // Note: We treat any download_byte_range as an "attempt" of the same
     151            0 :         // operation. We don't pay attention to the ranges. That's good enough
     152            0 :         // for now.
     153            0 :         self.attempt(RemoteOp::Download(from.clone()))?;
     154            0 :         self.inner
     155            0 :             .download_byte_range(from, start_inclusive, end_exclusive)
     156            0 :             .await
     157            0 :     }
     158              : 
     159           68 :     async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
     160          140 :         self.delete_inner(path, true).await
     161           68 :     }
     162              : 
     163          228 :     async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
     164          228 :         self.attempt(RemoteOp::DeleteObjects(paths.to_vec()))?;
     165          114 :         let mut error_counter = 0;
     166         2058 :         for path in paths {
     167              :             // Don't record an attempt because it was already recorded above
     168         7962 :             if (self.delete_inner(path, false).await).is_err() {
     169            0 :                 error_counter += 1;
     170         1944 :             }
     171              :         }
     172          114 :         if error_counter > 0 {
     173            0 :             return Err(anyhow::anyhow!(
     174            0 :                 "failed to delete {} objects",
     175            0 :                 error_counter
     176            0 :             ));
     177          114 :         }
     178          114 :         Ok(())
     179          228 :     }
     180              : 
     181            0 :     async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
     182            0 :         // copy is equivalent to download + upload
     183            0 :         self.attempt(RemoteOp::Download(from.clone()))?;
     184            0 :         self.attempt(RemoteOp::Upload(to.clone()))?;
     185            0 :         self.inner.copy_object(from, to).await
     186            0 :     }
     187              : 
     188            0 :     async fn time_travel_recover(
     189            0 :         &self,
     190            0 :         prefix: Option<&RemotePath>,
     191            0 :         timestamp: SystemTime,
     192            0 :         done_if_after: SystemTime,
     193            0 :         cancel: &CancellationToken,
     194            0 :     ) -> Result<(), TimeTravelError> {
     195            0 :         self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))
     196            0 :             .map_err(|e| TimeTravelError::Other(anyhow::Error::new(e)))?;
     197            0 :         self.inner
     198            0 :             .time_travel_recover(prefix, timestamp, done_if_after, cancel)
     199            0 :             .await
     200            0 :     }
     201              : }
        

Generated by: LCOV version 2.1-beta