LCOV - code coverage report
Current view: top level - libs/remote_storage/src - simulate_failures.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 79.6 % 93 74
Test Date: 2023-09-06 10:18:01 Functions: 69.7 % 33 23

            Line data    Source code
       1              : //! This module provides a wrapper around a real RemoteStorage implementation that
       2              : //! causes the first N attempts at each remote storage operation to fail. For
       3              : //! testing purposes.
       4              : use std::collections::hash_map::Entry;
       5              : use std::collections::HashMap;
       6              : use std::sync::Mutex;
       7              : 
       8              : use crate::{Download, DownloadError, RemotePath, RemoteStorage, StorageMetadata};
       9              : 
      10              : pub struct UnreliableWrapper {
      11              :     inner: crate::GenericRemoteStorage,
      12              : 
      13              :     // This many attempts of each operation will fail, then we let it succeed.
      14              :     attempts_to_fail: u64,
      15              : 
      16              :     // Tracks how many failed attempts of each operation have been made.
      17              :     attempts: Mutex<HashMap<RemoteOp, u64>>,
      18              : }
      19              : 
      20              : /// Identifies a distinct remote operation, so that its retries can be counted.
      21        11572 : #[derive(Debug, Hash, Eq, PartialEq)]
      22              : enum RemoteOp {
      23              :     ListPrefixes(Option<RemotePath>),
      24              :     Upload(RemotePath),
      25              :     Download(RemotePath),
      26              :     Delete(RemotePath),
      27              :     DeleteObjects(Vec<RemotePath>),
      28              : }
      29              : 
      30              : impl UnreliableWrapper {
      31           56 :     pub fn new(inner: crate::GenericRemoteStorage, attempts_to_fail: u64) -> Self {
      32           56 :         assert!(attempts_to_fail > 0);
      33           56 :         UnreliableWrapper {
      34           56 :             inner,
      35           56 :             attempts_to_fail,
      36           56 :             attempts: Mutex::new(HashMap::new()),
      37           56 :         }
      38           56 :     }
      39              : 
      40              :     ///
      41              :     /// Common functionality for all operations.
      42              :     ///
      43              :     /// On the first attempts of this operation, return an error. After 'attempts_to_fail'
      44              :     /// attempts, let the operation go ahead, and clear the counter.
      45              :     ///
      46         9882 :     fn attempt(&self, op: RemoteOp) -> Result<u64, DownloadError> {
      47         9882 :         let mut attempts = self.attempts.lock().unwrap();
      48         9882 : 
      49         9882 :         match attempts.entry(op) {
      50         4887 :             Entry::Occupied(mut e) => {
      51         4887 :                 let attempts_before_this = {
      52         4887 :                     let p = e.get_mut();
      53         4887 :                     *p += 1;
      54         4887 :                     *p
      55         4887 :                 };
      56         4887 : 
      57         4887 :                 if attempts_before_this >= self.attempts_to_fail {
      58              :                     // let it succeed
      59         4887 :                     e.remove();
      60         4887 :                     Ok(attempts_before_this)
      61              :                 } else {
      62            0 :                     let error =
      63            0 :                         anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
      64            0 :                     Err(DownloadError::Other(error))
      65              :                 }
      66              :             }
      67         4995 :             Entry::Vacant(e) => {
      68         4995 :                 let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
      69         4995 :                 e.insert(1);
      70         4995 :                 Err(DownloadError::Other(error))
      71              :             }
      72              :         }
      73         9882 :     }
      74              : 
      75         4218 :     async fn delete_inner(&self, path: &RemotePath, attempt: bool) -> anyhow::Result<()> {
      76         4218 :         if attempt {
      77         4090 :             self.attempt(RemoteOp::Delete(path.clone()))?;
      78          128 :         }
      79         8535 :         self.inner.delete(path).await
      80         4218 :     }
      81              : }
      82              : 
      83              : #[async_trait::async_trait]
      84              : impl RemoteStorage for UnreliableWrapper {
      85           12 :     async fn list_prefixes(
      86           12 :         &self,
      87           12 :         prefix: Option<&RemotePath>,
      88           12 :     ) -> Result<Vec<RemotePath>, DownloadError> {
      89           12 :         self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))?;
      90           18 :         self.inner.list_prefixes(prefix).await
      91           24 :     }
      92              : 
      93          128 :     async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
      94          128 :         self.attempt(RemoteOp::ListPrefixes(folder.cloned()))?;
      95          270 :         self.inner.list_files(folder).await
      96          256 :     }
      97              : 
      98         5448 :     async fn upload(
      99         5448 :         &self,
     100         5448 :         data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
     101         5448 :         // S3 PUT request requires the content length to be specified,
     102         5448 :         // otherwise it starts to fail with the concurrent connection count increasing.
     103         5448 :         data_size_bytes: usize,
     104         5448 :         to: &RemotePath,
     105         5448 :         metadata: Option<StorageMetadata>,
     106         5448 :     ) -> anyhow::Result<()> {
     107         5448 :         self.attempt(RemoteOp::Upload(to.clone()))?;
     108        41224 :         self.inner.upload(data, data_size_bytes, to, metadata).await
     109        10896 :     }
     110              : 
     111          190 :     async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
     112          190 :         self.attempt(RemoteOp::Download(from.clone()))?;
     113          285 :         self.inner.download(from).await
     114          380 :     }
     115              : 
     116            0 :     async fn download_byte_range(
     117            0 :         &self,
     118            0 :         from: &RemotePath,
     119            0 :         start_inclusive: u64,
     120            0 :         end_exclusive: Option<u64>,
     121            0 :     ) -> Result<Download, DownloadError> {
     122              :         // Note: We treat any download_byte_range as an "attempt" of the same
     123              :         // operation. We don't pay attention to the ranges. That's good enough
     124              :         // for now.
     125            0 :         self.attempt(RemoteOp::Download(from.clone()))?;
     126            0 :         self.inner
     127            0 :             .download_byte_range(from, start_inclusive, end_exclusive)
     128            0 :             .await
     129            0 :     }
     130              : 
     131         4090 :     async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
     132         8023 :         self.delete_inner(path, true).await
     133         8180 :     }
     134              : 
     135           14 :     async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
     136           14 :         self.attempt(RemoteOp::DeleteObjects(paths.to_vec()))?;
     137            7 :         let mut error_counter = 0;
     138          135 :         for path in paths {
     139              :             // Don't record an attempt here; it was already recorded above.
     140          512 :             if (self.delete_inner(path, false).await).is_err() {
     141            0 :                 error_counter += 1;
     142          128 :             }
     143              :         }
     144            7 :         if error_counter > 0 {
     145            0 :             return Err(anyhow::anyhow!(
     146            0 :                 "failed to delete {} objects",
     147            0 :                 error_counter
     148            0 :             ));
     149            7 :         }
     150            7 :         Ok(())
     151           28 :     }
     152              : }
        

Generated by: LCOV version 2.1-beta