LCOV - code coverage report
Current view: top level - libs/remote_storage/src - simulate_failures.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 28.0 % 168 47
Test Date: 2025-07-16 12:29:03 Functions: 12.1 % 33 4

            Line data    Source code
//! This module provides a wrapper around a real RemoteStorage implementation that
//! causes the first N attempts at each remote operation (upload, download, listing,
//! delete, ...) to fail. It exists for testing purposes.
       4              : use rand::Rng;
       5              : use std::cmp;
       6              : use std::collections::HashMap;
       7              : use std::collections::hash_map::Entry;
       8              : use std::num::NonZeroU32;
       9              : use std::sync::{Arc, Mutex};
      10              : use std::time::SystemTime;
      11              : 
      12              : use bytes::Bytes;
      13              : use futures::StreamExt;
      14              : use futures::stream::Stream;
      15              : use tokio_util::sync::CancellationToken;
      16              : 
      17              : use crate::{
      18              :     Download, DownloadError, DownloadOpts, GenericRemoteStorage, Listing, ListingMode, RemotePath,
      19              :     RemoteStorage, StorageMetadata, TimeTravelError,
      20              : };
      21              : 
      22              : pub struct UnreliableWrapper {
      23              :     inner: GenericRemoteStorage<Arc<VoidStorage>>,
      24              : 
      25              :     // This many attempts of each operation will fail, then we let it succeed.
      26              :     attempts_to_fail: u64,
      27              : 
      28              :     // Tracks how many failed attempts of each operation has been made.
      29              :     attempts: Mutex<HashMap<RemoteOp, u64>>,
      30              : 
      31              :     /* BEGIN_HADRON */
      32              :     // This the probability of failure for each operation, ranged from [0, 100].
      33              :     // The probability is default to 100, which means that all operations will fail.
      34              :     // Storage will fail by probability up to attempts_to_fail times.
      35              :     attempt_failure_probability: u64,
      36              :     /* END_HADRON */
      37              : }
      38              : 
      39              : /// Used to identify retries of different unique operation.
      40              : #[derive(Debug, Hash, Eq, PartialEq)]
      41              : enum RemoteOp {
      42              :     ListPrefixes(Option<RemotePath>),
      43              :     HeadObject(RemotePath),
      44              :     Upload(RemotePath),
      45              :     Download(RemotePath),
      46              :     Delete(RemotePath),
      47              :     DeleteObjects(Vec<RemotePath>),
      48              :     TimeTravelRecover(Option<RemotePath>),
      49              : }
      50              : 
      51              : impl UnreliableWrapper {
      52            2 :     pub fn new(
      53            2 :         inner: crate::GenericRemoteStorage,
      54            2 :         attempts_to_fail: u64,
      55            2 :         attempt_failure_probability: u64,
      56            2 :     ) -> Self {
      57            2 :         assert!(attempts_to_fail > 0);
      58            2 :         let inner = match inner {
      59            0 :             GenericRemoteStorage::AwsS3(s) => GenericRemoteStorage::AwsS3(s),
      60            0 :             GenericRemoteStorage::AzureBlob(s) => GenericRemoteStorage::AzureBlob(s),
      61            2 :             GenericRemoteStorage::LocalFs(s) => GenericRemoteStorage::LocalFs(s),
      62              :             // We could also make this a no-op, as in, extract the inner of the passed generic remote storage
      63            0 :             GenericRemoteStorage::Unreliable(_s) => {
      64            0 :                 panic!("Can't wrap unreliable wrapper unreliably")
      65              :             }
      66              :         };
      67            2 :         let actual_attempt_failure_probability = cmp::min(attempt_failure_probability, 100);
      68            2 :         UnreliableWrapper {
      69            2 :             inner,
      70            2 :             attempts_to_fail,
      71            2 :             attempt_failure_probability: actual_attempt_failure_probability,
      72            2 :             attempts: Mutex::new(HashMap::new()),
      73            2 :         }
      74            2 :     }
      75              : 
      76              :     ///
      77              :     /// Common functionality for all operations.
      78              :     ///
      79              :     /// On the first attempts of this operation, return an error. After 'attempts_to_fail'
      80              :     /// attempts, let the operation go ahead, and clear the counter.
      81              :     ///
      82           24 :     fn attempt(&self, op: RemoteOp) -> anyhow::Result<u64> {
      83           24 :         let mut attempts = self.attempts.lock().unwrap();
      84           24 :         let mut rng = rand::thread_rng();
      85              : 
      86           24 :         match attempts.entry(op) {
      87           12 :             Entry::Occupied(mut e) => {
      88           12 :                 let attempts_before_this = {
      89           12 :                     let p = e.get_mut();
      90           12 :                     *p += 1;
      91           12 :                     *p
      92              :                 };
      93              : 
      94              :                 /* BEGIN_HADRON */
      95              :                 // If there are more attempts to fail, fail the request by probability.
      96           12 :                 if (attempts_before_this < self.attempts_to_fail)
      97            0 :                     && (rng.gen_range(0..=100) < self.attempt_failure_probability)
      98              :                 {
      99            0 :                     let error =
     100            0 :                         anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
     101            0 :                     Err(error)
     102              :                 } else {
     103           12 :                     e.remove();
     104           12 :                     Ok(attempts_before_this)
     105              :                 }
     106              :                 /* END_HADRON */
     107              :             }
     108           12 :             Entry::Vacant(e) => {
     109           12 :                 let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
     110           12 :                 e.insert(1);
     111           12 :                 Err(error)
     112              :             }
     113              :         }
     114           24 :     }
     115              : 
     116            0 :     async fn delete_inner(
     117            0 :         &self,
     118            0 :         path: &RemotePath,
     119            0 :         attempt: bool,
     120            0 :         cancel: &CancellationToken,
     121            0 :     ) -> anyhow::Result<()> {
     122            0 :         if attempt {
     123            0 :             self.attempt(RemoteOp::Delete(path.clone()))?;
     124            0 :         }
     125            0 :         self.inner.delete(path, cancel).await
     126            0 :     }
     127              : }
     128              : 
// Type parameter for the `GenericRemoteStorage` held inside `UnreliableWrapper`.
// We never construct this, so the type is not important; it only has to implement
// RemoteStorage and must not be UnreliableWrapper (so a wrapper cannot hold another
// wrapper; `UnreliableWrapper::new` panics when asked to wrap one).
type VoidStorage = crate::LocalFs;
     131              : 
     132              : impl RemoteStorage for UnreliableWrapper {
     133            0 :     fn list_streaming(
     134            0 :         &self,
     135            0 :         prefix: Option<&RemotePath>,
     136            0 :         mode: ListingMode,
     137            0 :         max_keys: Option<NonZeroU32>,
     138            0 :         cancel: &CancellationToken,
     139            0 :     ) -> impl Stream<Item = Result<Listing, DownloadError>> + Send {
     140            0 :         async_stream::stream! {
     141              :             self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
     142              :                 .map_err(DownloadError::Other)?;
     143              :             let mut stream = self.inner
     144              :                 .list_streaming(prefix, mode, max_keys, cancel);
     145              :             while let Some(item) = stream.next().await {
     146              :                 yield item;
     147              :             }
     148              :         }
     149            0 :     }
     150            0 :     async fn list(
     151            0 :         &self,
     152            0 :         prefix: Option<&RemotePath>,
     153            0 :         mode: ListingMode,
     154            0 :         max_keys: Option<NonZeroU32>,
     155            0 :         cancel: &CancellationToken,
     156            0 :     ) -> Result<Listing, DownloadError> {
     157            0 :         self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
     158            0 :             .map_err(DownloadError::Other)?;
     159            0 :         self.inner.list(prefix, mode, max_keys, cancel).await
     160            0 :     }
     161              : 
     162            0 :     async fn list_versions(
     163            0 :         &self,
     164            0 :         prefix: Option<&RemotePath>,
     165            0 :         mode: ListingMode,
     166            0 :         max_keys: Option<NonZeroU32>,
     167            0 :         cancel: &CancellationToken,
     168            0 :     ) -> Result<crate::VersionListing, DownloadError> {
     169            0 :         self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
     170            0 :             .map_err(DownloadError::Other)?;
     171            0 :         self.inner
     172            0 :             .list_versions(prefix, mode, max_keys, cancel)
     173            0 :             .await
     174            0 :     }
     175              : 
     176            0 :     async fn head_object(
     177            0 :         &self,
     178            0 :         key: &RemotePath,
     179            0 :         cancel: &CancellationToken,
     180            0 :     ) -> Result<crate::ListingObject, DownloadError> {
     181            0 :         self.attempt(RemoteOp::HeadObject(key.clone()))
     182            0 :             .map_err(DownloadError::Other)?;
     183            0 :         self.inner.head_object(key, cancel).await
     184            0 :     }
     185              : 
     186           24 :     async fn upload(
     187           24 :         &self,
     188           24 :         data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     189           24 :         // S3 PUT request requires the content length to be specified,
     190           24 :         // otherwise it starts to fail with the concurrent connection count increasing.
     191           24 :         data_size_bytes: usize,
     192           24 :         to: &RemotePath,
     193           24 :         metadata: Option<StorageMetadata>,
     194           24 :         cancel: &CancellationToken,
     195           24 :     ) -> anyhow::Result<()> {
     196           24 :         self.attempt(RemoteOp::Upload(to.clone()))?;
     197           12 :         self.inner
     198           12 :             .upload(data, data_size_bytes, to, metadata, cancel)
     199           12 :             .await
     200            0 :     }
     201              : 
     202            0 :     async fn download(
     203            0 :         &self,
     204            0 :         from: &RemotePath,
     205            0 :         opts: &DownloadOpts,
     206            0 :         cancel: &CancellationToken,
     207            0 :     ) -> Result<Download, DownloadError> {
     208              :         // Note: We treat any byte range as an "attempt" of the same operation.
     209              :         // We don't pay attention to the ranges. That's good enough for now.
     210            0 :         self.attempt(RemoteOp::Download(from.clone()))
     211            0 :             .map_err(DownloadError::Other)?;
     212            0 :         self.inner.download(from, opts, cancel).await
     213            0 :     }
     214              : 
     215            0 :     async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
     216            0 :         self.delete_inner(path, true, cancel).await
     217            0 :     }
     218              : 
     219            0 :     async fn delete_objects(
     220            0 :         &self,
     221            0 :         paths: &[RemotePath],
     222            0 :         cancel: &CancellationToken,
     223            0 :     ) -> anyhow::Result<()> {
     224            0 :         self.attempt(RemoteOp::DeleteObjects(paths.to_vec()))?;
     225            0 :         let mut error_counter = 0;
     226            0 :         for path in paths {
     227              :             // Dont record attempt because it was already recorded above
     228            0 :             if (self.delete_inner(path, false, cancel).await).is_err() {
     229            0 :                 error_counter += 1;
     230            0 :             }
     231              :         }
     232            0 :         if error_counter > 0 {
     233            0 :             return Err(anyhow::anyhow!(
     234            0 :                 "failed to delete {} objects",
     235            0 :                 error_counter
     236            0 :             ));
     237            0 :         }
     238            0 :         Ok(())
     239            0 :     }
     240              : 
     241            0 :     fn max_keys_per_delete(&self) -> usize {
     242            0 :         self.inner.max_keys_per_delete()
     243            0 :     }
     244              : 
     245            0 :     async fn copy(
     246            0 :         &self,
     247            0 :         from: &RemotePath,
     248            0 :         to: &RemotePath,
     249            0 :         cancel: &CancellationToken,
     250            0 :     ) -> anyhow::Result<()> {
     251              :         // copy is equivalent to download + upload
     252            0 :         self.attempt(RemoteOp::Download(from.clone()))?;
     253            0 :         self.attempt(RemoteOp::Upload(to.clone()))?;
     254            0 :         self.inner.copy_object(from, to, cancel).await
     255            0 :     }
     256              : 
     257            0 :     async fn time_travel_recover(
     258            0 :         &self,
     259            0 :         prefix: Option<&RemotePath>,
     260            0 :         timestamp: SystemTime,
     261            0 :         done_if_after: SystemTime,
     262            0 :         cancel: &CancellationToken,
     263            0 :         complexity_limit: Option<NonZeroU32>,
     264            0 :     ) -> Result<(), TimeTravelError> {
     265            0 :         self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))
     266            0 :             .map_err(TimeTravelError::Other)?;
     267            0 :         self.inner
     268            0 :             .time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
     269            0 :             .await
     270            0 :     }
     271              : }
        

Generated by: LCOV version 2.1-beta