LCOV - differential code coverage report
Current view: top level - libs/remote_storage/src - simulate_failures.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 79.6 % 93 74 19 74
Current Date: 2023-10-19 02:04:12 Functions: 69.7 % 33 23 10 23
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! This module provides a wrapper around a real RemoteStorage implementation that
       2                 : //! causes the first N attempts at each upload or download operatio to fail. For
       3                 : //! testing purposes.
       4                 : use std::collections::hash_map::Entry;
       5                 : use std::collections::HashMap;
       6                 : use std::sync::Mutex;
       7                 : 
       8                 : use crate::{Download, DownloadError, RemotePath, RemoteStorage, StorageMetadata};
       9                 : 
      10                 : pub struct UnreliableWrapper {
      11                 :     inner: crate::GenericRemoteStorage,
      12                 : 
      13                 :     // This many attempts of each operation will fail, then we let it succeed.
      14                 :     attempts_to_fail: u64,
      15                 : 
      16                 :     // Tracks how many failed attempts of each operation has been made.
      17                 :     attempts: Mutex<HashMap<RemoteOp, u64>>,
      18                 : }
      19                 : 
      20                 : /// Used to identify retries of different unique operation.
      21 CBC        7421 : #[derive(Debug, Hash, Eq, PartialEq)]
      22                 : enum RemoteOp {
      23                 :     ListPrefixes(Option<RemotePath>),
      24                 :     Upload(RemotePath),
      25                 :     Download(RemotePath),
      26                 :     Delete(RemotePath),
      27                 :     DeleteObjects(Vec<RemotePath>),
      28                 : }
      29                 : 
      30                 : impl UnreliableWrapper {
      31              65 :     pub fn new(inner: crate::GenericRemoteStorage, attempts_to_fail: u64) -> Self {
      32              65 :         assert!(attempts_to_fail > 0);
      33              65 :         UnreliableWrapper {
      34              65 :             inner,
      35              65 :             attempts_to_fail,
      36              65 :             attempts: Mutex::new(HashMap::new()),
      37              65 :         }
      38              65 :     }
      39                 : 
      40                 :     ///
      41                 :     /// Common functionality for all operations.
      42                 :     ///
      43                 :     /// On the first attempts of this operation, return an error. After 'attempts_to_fail'
      44                 :     /// attempts, let the operation go ahead, and clear the counter.
      45                 :     ///
      46            6127 :     fn attempt(&self, op: RemoteOp) -> Result<u64, DownloadError> {
      47            6127 :         let mut attempts = self.attempts.lock().unwrap();
      48            6127 : 
      49            6127 :         match attempts.entry(op) {
      50            2996 :             Entry::Occupied(mut e) => {
      51            2996 :                 let attempts_before_this = {
      52            2996 :                     let p = e.get_mut();
      53            2996 :                     *p += 1;
      54            2996 :                     *p
      55            2996 :                 };
      56            2996 : 
      57            2996 :                 if attempts_before_this >= self.attempts_to_fail {
      58                 :                     // let it succeed
      59            2996 :                     e.remove();
      60            2996 :                     Ok(attempts_before_this)
      61                 :                 } else {
      62 UBC           0 :                     let error =
      63               0 :                         anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
      64               0 :                     Err(DownloadError::Other(error))
      65                 :                 }
      66                 :             }
      67 CBC        3131 :             Entry::Vacant(e) => {
      68            3131 :                 let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
      69            3131 :                 e.insert(1);
      70            3131 :                 Err(DownloadError::Other(error))
      71                 :             }
      72                 :         }
      73            6127 :     }
      74                 : 
      75            1973 :     async fn delete_inner(&self, path: &RemotePath, attempt: bool) -> anyhow::Result<()> {
      76            1973 :         if attempt {
      77              72 :             self.attempt(RemoteOp::Delete(path.clone()))?;
      78            1901 :         }
      79            7777 :         self.inner.delete(path).await
      80            1973 :     }
      81                 : }
      82                 : 
      83                 : #[async_trait::async_trait]
      84                 : impl RemoteStorage for UnreliableWrapper {
      85              18 :     async fn list_prefixes(
      86              18 :         &self,
      87              18 :         prefix: Option<&RemotePath>,
      88              18 :     ) -> Result<Vec<RemotePath>, DownloadError> {
      89              18 :         self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))?;
      90              27 :         self.inner.list_prefixes(prefix).await
      91              36 :     }
      92                 : 
      93             134 :     async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
      94             134 :         self.attempt(RemoteOp::ListPrefixes(folder.cloned()))?;
      95             272 :         self.inner.list_files(folder).await
      96             268 :     }
      97                 : 
      98            5335 :     async fn upload(
      99            5335 :         &self,
     100            5335 :         data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
     101            5335 :         // S3 PUT request requires the content length to be specified,
     102            5335 :         // otherwise it starts to fail with the concurrent connection count increasing.
     103            5335 :         data_size_bytes: usize,
     104            5335 :         to: &RemotePath,
     105            5335 :         metadata: Option<StorageMetadata>,
     106            5335 :     ) -> anyhow::Result<()> {
     107            5335 :         self.attempt(RemoteOp::Upload(to.clone()))?;
     108           43379 :         self.inner.upload(data, data_size_bytes, to, metadata).await
     109           10670 :     }
     110                 : 
     111             286 :     async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
     112             286 :         self.attempt(RemoteOp::Download(from.clone()))?;
     113             430 :         self.inner.download(from).await
     114             572 :     }
     115                 : 
     116 UBC           0 :     async fn download_byte_range(
     117               0 :         &self,
     118               0 :         from: &RemotePath,
     119               0 :         start_inclusive: u64,
     120               0 :         end_exclusive: Option<u64>,
     121               0 :     ) -> Result<Download, DownloadError> {
     122                 :         // Note: We treat any download_byte_range as an "attempt" of the same
     123                 :         // operation. We don't pay attention to the ranges. That's good enough
     124                 :         // for now.
     125               0 :         self.attempt(RemoteOp::Download(from.clone()))?;
     126               0 :         self.inner
     127               0 :             .download_byte_range(from, start_inclusive, end_exclusive)
     128               0 :             .await
     129               0 :     }
     130                 : 
     131 CBC          72 :     async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
     132             146 :         self.delete_inner(path, true).await
     133             144 :     }
     134                 : 
     135             282 :     async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
     136             282 :         self.attempt(RemoteOp::DeleteObjects(paths.to_vec()))?;
     137             141 :         let mut error_counter = 0;
     138            2042 :         for path in paths {
     139                 :             // Dont record attempt because it was already recorded above
     140            7631 :             if (self.delete_inner(path, false).await).is_err() {
     141 UBC           0 :                 error_counter += 1;
     142 CBC        1901 :             }
     143                 :         }
     144             141 :         if error_counter > 0 {
     145 UBC           0 :             return Err(anyhow::anyhow!(
     146               0 :                 "failed to delete {} objects",
     147               0 :                 error_counter
     148               0 :             ));
     149 CBC         141 :         }
     150             141 :         Ok(())
     151             564 :     }
     152                 : }
        

Generated by: LCOV version 2.1-beta