LCOV - differential code coverage report
Current view: top level - libs/remote_storage/src - simulate_failures.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 70.8 % 106 75 31 75
Current Date: 2024-01-09 02:06:09 Functions: 72.4 % 29 21 8 21
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! This module provides a wrapper around a real RemoteStorage implementation that
       2                 : //! causes the first N attempts at each upload or download operatio to fail. For
       3                 : //! testing purposes.
       4                 : use bytes::Bytes;
       5                 : use futures::stream::Stream;
       6                 : use std::collections::hash_map::Entry;
       7                 : use std::collections::HashMap;
       8                 : use std::sync::Mutex;
       9                 : 
      10                 : use crate::{
      11                 :     Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, StorageMetadata,
      12                 : };
      13                 : 
      14                 : pub struct UnreliableWrapper {
      15                 :     inner: crate::GenericRemoteStorage,
      16                 : 
      17                 :     // This many attempts of each operation will fail, then we let it succeed.
      18                 :     attempts_to_fail: u64,
      19                 : 
      20                 :     // Tracks how many failed attempts of each operation has been made.
      21                 :     attempts: Mutex<HashMap<RemoteOp, u64>>,
      22                 : }
      23                 : 
      24                 : /// Used to identify retries of different unique operation.
      25 CBC        7027 : #[derive(Debug, Hash, Eq, PartialEq)]
      26                 : enum RemoteOp {
      27                 :     ListPrefixes(Option<RemotePath>),
      28                 :     Upload(RemotePath),
      29                 :     Download(RemotePath),
      30                 :     Delete(RemotePath),
      31                 :     DeleteObjects(Vec<RemotePath>),
      32                 : }
      33                 : 
      34                 : impl UnreliableWrapper {
      35              65 :     pub fn new(inner: crate::GenericRemoteStorage, attempts_to_fail: u64) -> Self {
      36              65 :         assert!(attempts_to_fail > 0);
      37              65 :         UnreliableWrapper {
      38              65 :             inner,
      39              65 :             attempts_to_fail,
      40              65 :             attempts: Mutex::new(HashMap::new()),
      41              65 :         }
      42              65 :     }
      43                 : 
      44                 :     ///
      45                 :     /// Common functionality for all operations.
      46                 :     ///
      47                 :     /// On the first attempts of this operation, return an error. After 'attempts_to_fail'
      48                 :     /// attempts, let the operation go ahead, and clear the counter.
      49                 :     ///
      50            5738 :     fn attempt(&self, op: RemoteOp) -> Result<u64, DownloadError> {
      51            5738 :         let mut attempts = self.attempts.lock().unwrap();
      52            5738 : 
      53            5738 :         match attempts.entry(op) {
      54            2734 :             Entry::Occupied(mut e) => {
      55            2734 :                 let attempts_before_this = {
      56            2734 :                     let p = e.get_mut();
      57            2734 :                     *p += 1;
      58            2734 :                     *p
      59            2734 :                 };
      60            2734 : 
      61            2734 :                 if attempts_before_this >= self.attempts_to_fail {
      62                 :                     // let it succeed
      63            2734 :                     e.remove();
      64            2734 :                     Ok(attempts_before_this)
      65                 :                 } else {
      66 UBC           0 :                     let error =
      67               0 :                         anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
      68               0 :                     Err(DownloadError::Other(error))
      69                 :                 }
      70                 :             }
      71 CBC        3004 :             Entry::Vacant(e) => {
      72            3004 :                 let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
      73            3004 :                 e.insert(1);
      74            3004 :                 Err(DownloadError::Other(error))
      75                 :             }
      76                 :         }
      77            5738 :     }
      78                 : 
      79            1913 :     async fn delete_inner(&self, path: &RemotePath, attempt: bool) -> anyhow::Result<()> {
      80            1913 :         if attempt {
      81              68 :             self.attempt(RemoteOp::Delete(path.clone()))?;
      82            1845 :         }
      83            7672 :         self.inner.delete(path).await
      84            1913 :     }
      85                 : }
      86                 : 
      87                 : #[async_trait::async_trait]
      88                 : impl RemoteStorage for UnreliableWrapper {
      89 UBC           0 :     async fn list_prefixes(
      90               0 :         &self,
      91               0 :         prefix: Option<&RemotePath>,
      92               0 :     ) -> Result<Vec<RemotePath>, DownloadError> {
      93               0 :         self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))?;
      94               0 :         self.inner.list_prefixes(prefix).await
      95               0 :     }
      96                 : 
      97 CBC         146 :     async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
      98             146 :         self.attempt(RemoteOp::ListPrefixes(folder.cloned()))?;
      99             295 :         self.inner.list_files(folder).await
     100             292 :     }
     101                 : 
     102              44 :     async fn list(
     103              44 :         &self,
     104              44 :         prefix: Option<&RemotePath>,
     105              44 :         mode: ListingMode,
     106              44 :     ) -> Result<Listing, DownloadError> {
     107              44 :         self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))?;
     108              87 :         self.inner.list(prefix, mode).await
     109              88 :     }
     110                 : 
     111            5010 :     async fn upload(
     112            5010 :         &self,
     113            5010 :         data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     114            5010 :         // S3 PUT request requires the content length to be specified,
     115            5010 :         // otherwise it starts to fail with the concurrent connection count increasing.
     116            5010 :         data_size_bytes: usize,
     117            5010 :         to: &RemotePath,
     118            5010 :         metadata: Option<StorageMetadata>,
     119            5010 :     ) -> anyhow::Result<()> {
     120            5010 :         self.attempt(RemoteOp::Upload(to.clone()))?;
     121           17966 :         self.inner.upload(data, data_size_bytes, to, metadata).await
     122           10020 :     }
     123                 : 
     124             244 :     async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
     125             244 :         self.attempt(RemoteOp::Download(from.clone()))?;
     126             327 :         self.inner.download(from).await
     127             488 :     }
     128                 : 
     129 UBC           0 :     async fn download_byte_range(
     130               0 :         &self,
     131               0 :         from: &RemotePath,
     132               0 :         start_inclusive: u64,
     133               0 :         end_exclusive: Option<u64>,
     134               0 :     ) -> Result<Download, DownloadError> {
     135                 :         // Note: We treat any download_byte_range as an "attempt" of the same
     136                 :         // operation. We don't pay attention to the ranges. That's good enough
     137                 :         // for now.
     138               0 :         self.attempt(RemoteOp::Download(from.clone()))?;
     139               0 :         self.inner
     140               0 :             .download_byte_range(from, start_inclusive, end_exclusive)
     141               0 :             .await
     142               0 :     }
     143                 : 
     144 CBC          68 :     async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
     145             140 :         self.delete_inner(path, true).await
     146             136 :     }
     147                 : 
     148             226 :     async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
     149             226 :         self.attempt(RemoteOp::DeleteObjects(paths.to_vec()))?;
     150             113 :         let mut error_counter = 0;
     151            1958 :         for path in paths {
     152                 :             // Dont record attempt because it was already recorded above
     153            7532 :             if (self.delete_inner(path, false).await).is_err() {
     154 UBC           0 :                 error_counter += 1;
     155 CBC        1845 :             }
     156                 :         }
     157             113 :         if error_counter > 0 {
     158 UBC           0 :             return Err(anyhow::anyhow!(
     159               0 :                 "failed to delete {} objects",
     160               0 :                 error_counter
     161               0 :             ));
     162 CBC         113 :         }
     163             113 :         Ok(())
     164             452 :     }
     165                 : 
     166 UBC           0 :     async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
     167                 :         // copy is equivalent to download + upload
     168               0 :         self.attempt(RemoteOp::Download(from.clone()))?;
     169               0 :         self.attempt(RemoteOp::Upload(to.clone()))?;
     170               0 :         self.inner.copy_object(from, to).await
     171               0 :     }
     172                 : }
        

Generated by: LCOV version 2.1-beta