LCOV - code coverage report
Current view: top level - libs/remote_storage/src - simulate_failures.rs (source / functions) Coverage Total Hit
Test: fc67f8dc6087a0b4f4f0bcd74f6e1dc25fab8cf3.info Lines: 25.9 % 170 44
Test Date: 2024-09-24 13:57:57 Functions: 12.5 % 32 4

            Line data    Source code
       1              : //! This module provides a wrapper around a real RemoteStorage implementation that
       2              : //! causes the first N attempts at each upload or download operation to fail. For
       3              : //! testing purposes.
       4              : use bytes::Bytes;
       5              : use futures::stream::Stream;
       6              : use futures::StreamExt;
       7              : use std::collections::HashMap;
       8              : use std::num::NonZeroU32;
       9              : use std::sync::Mutex;
      10              : use std::time::SystemTime;
      11              : use std::{collections::hash_map::Entry, sync::Arc};
      12              : use tokio_util::sync::CancellationToken;
      13              : 
      14              : use crate::{
      15              :     Download, DownloadError, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorage,
      16              :     StorageMetadata, TimeTravelError,
      17              : };
      18              : 
      19              : pub struct UnreliableWrapper {
      20              :     inner: GenericRemoteStorage<Arc<VoidStorage>>,
      21              : 
      22              :     // This many attempts of each operation will fail, then we let it succeed.
      23              :     attempts_to_fail: u64,
      24              : 
      25              :     // Tracks how many failed attempts of each operation have been made.
      26              :     attempts: Mutex<HashMap<RemoteOp, u64>>,
      27              : }
      28              : 
      29              : /// Used to identify retries of different unique operations.
      30              : #[derive(Debug, Hash, Eq, PartialEq)]
      31              : enum RemoteOp {
      32              :     ListPrefixes(Option<RemotePath>),
      33              :     HeadObject(RemotePath),
      34              :     Upload(RemotePath),
      35              :     Download(RemotePath),
      36              :     Delete(RemotePath),
      37              :     DeleteObjects(Vec<RemotePath>),
      38              :     TimeTravelRecover(Option<RemotePath>),
      39              : }
      40              : 
      41              : impl UnreliableWrapper {
      42            2 :     pub fn new(inner: crate::GenericRemoteStorage, attempts_to_fail: u64) -> Self {
      43            2 :         assert!(attempts_to_fail > 0);
      44            2 :         let inner = match inner {
      45            0 :             GenericRemoteStorage::AwsS3(s) => GenericRemoteStorage::AwsS3(s),
      46            0 :             GenericRemoteStorage::AzureBlob(s) => GenericRemoteStorage::AzureBlob(s),
      47            2 :             GenericRemoteStorage::LocalFs(s) => GenericRemoteStorage::LocalFs(s),
      48              :             // We could also make this a no-op, as in, extract the inner of the passed generic remote storage
      49            0 :             GenericRemoteStorage::Unreliable(_s) => {
      50            0 :                 panic!("Can't wrap unreliable wrapper unreliably")
      51              :             }
      52              :         };
      53            2 :         UnreliableWrapper {
      54            2 :             inner,
      55            2 :             attempts_to_fail,
      56            2 :             attempts: Mutex::new(HashMap::new()),
      57            2 :         }
      58            2 :     }
      59              : 
      60              :     ///
      61              :     /// Common functionality for all operations.
      62              :     ///
      63              :     /// On the first attempts of this operation, return an error. After 'attempts_to_fail'
      64              :     /// attempts, let the operation go ahead, and clear the counter.
      65              :     ///
      66           24 :     fn attempt(&self, op: RemoteOp) -> anyhow::Result<u64> {
      67           24 :         let mut attempts = self.attempts.lock().unwrap();
      68           24 : 
      69           24 :         match attempts.entry(op) {
      70           12 :             Entry::Occupied(mut e) => {
      71           12 :                 let attempts_before_this = {
      72           12 :                     let p = e.get_mut();
      73           12 :                     *p += 1;
      74           12 :                     *p
      75           12 :                 };
      76           12 : 
      77           12 :                 if attempts_before_this >= self.attempts_to_fail {
      78              :                     // let it succeed
      79           12 :                     e.remove();
      80           12 :                     Ok(attempts_before_this)
      81              :                 } else {
      82            0 :                     let error =
      83            0 :                         anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
      84            0 :                     Err(error)
      85              :                 }
      86              :             }
      87           12 :             Entry::Vacant(e) => {
      88           12 :                 let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
      89           12 :                 e.insert(1);
      90           12 :                 Err(error)
      91              :             }
      92              :         }
      93           24 :     }
      94              : 
      95            0 :     async fn delete_inner(
      96            0 :         &self,
      97            0 :         path: &RemotePath,
      98            0 :         attempt: bool,
      99            0 :         cancel: &CancellationToken,
     100            0 :     ) -> anyhow::Result<()> {
     101            0 :         if attempt {
     102            0 :             self.attempt(RemoteOp::Delete(path.clone()))?;
     103            0 :         }
     104            0 :         self.inner.delete(path, cancel).await
     105            0 :     }
     106              : }
     107              : 
     108              : // We never construct this, so the type is not important, just has to not be UnreliableWrapper and impl RemoteStorage.
     109              : type VoidStorage = crate::LocalFs;
     110              : 
     111              : impl RemoteStorage for UnreliableWrapper {
     112            0 :     fn list_streaming(
     113            0 :         &self,
     114            0 :         prefix: Option<&RemotePath>,
     115            0 :         mode: ListingMode,
     116            0 :         max_keys: Option<NonZeroU32>,
     117            0 :         cancel: &CancellationToken,
     118            0 :     ) -> impl Stream<Item = Result<Listing, DownloadError>> + Send {
     119            0 :         async_stream::stream! {
     120            0 :             self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
     121            0 :                 .map_err(DownloadError::Other)?;
     122            0 :             let mut stream = self.inner
     123            0 :                 .list_streaming(prefix, mode, max_keys, cancel);
     124            0 :             while let Some(item) = stream.next().await {
     125            0 :                 yield item;
     126            0 :             }
     127            0 :         }
     128            0 :     }
     129            0 :     async fn list(
     130            0 :         &self,
     131            0 :         prefix: Option<&RemotePath>,
     132            0 :         mode: ListingMode,
     133            0 :         max_keys: Option<NonZeroU32>,
     134            0 :         cancel: &CancellationToken,
     135            0 :     ) -> Result<Listing, DownloadError> {
     136            0 :         self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
     137            0 :             .map_err(DownloadError::Other)?;
     138            0 :         self.inner.list(prefix, mode, max_keys, cancel).await
     139            0 :     }
     140              : 
     141            0 :     async fn head_object(
     142            0 :         &self,
     143            0 :         key: &RemotePath,
     144            0 :         cancel: &CancellationToken,
     145            0 :     ) -> Result<crate::ListingObject, DownloadError> {
     146            0 :         self.attempt(RemoteOp::HeadObject(key.clone()))
     147            0 :             .map_err(DownloadError::Other)?;
     148            0 :         self.inner.head_object(key, cancel).await
     149            0 :     }
     150              : 
     151           24 :     async fn upload(
     152           24 :         &self,
     153           24 :         data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     154           24 :         // S3 PUT request requires the content length to be specified,
     155           24 :         // otherwise it starts to fail with the concurrent connection count increasing.
     156           24 :         data_size_bytes: usize,
     157           24 :         to: &RemotePath,
     158           24 :         metadata: Option<StorageMetadata>,
     159           24 :         cancel: &CancellationToken,
     160           24 :     ) -> anyhow::Result<()> {
     161           24 :         self.attempt(RemoteOp::Upload(to.clone()))?;
     162           12 :         self.inner
     163           12 :             .upload(data, data_size_bytes, to, metadata, cancel)
     164           38 :             .await
     165           24 :     }
     166              : 
     167            0 :     async fn download(
     168            0 :         &self,
     169            0 :         from: &RemotePath,
     170            0 :         cancel: &CancellationToken,
     171            0 :     ) -> Result<Download, DownloadError> {
     172            0 :         self.attempt(RemoteOp::Download(from.clone()))
     173            0 :             .map_err(DownloadError::Other)?;
     174            0 :         self.inner.download(from, cancel).await
     175            0 :     }
     176              : 
     177            0 :     async fn download_byte_range(
     178            0 :         &self,
     179            0 :         from: &RemotePath,
     180            0 :         start_inclusive: u64,
     181            0 :         end_exclusive: Option<u64>,
     182            0 :         cancel: &CancellationToken,
     183            0 :     ) -> Result<Download, DownloadError> {
     184            0 :         // Note: We treat any download_byte_range as an "attempt" of the same
     185            0 :         // operation. We don't pay attention to the ranges. That's good enough
     186            0 :         // for now.
     187            0 :         self.attempt(RemoteOp::Download(from.clone()))
     188            0 :             .map_err(DownloadError::Other)?;
     189            0 :         self.inner
     190            0 :             .download_byte_range(from, start_inclusive, end_exclusive, cancel)
     191            0 :             .await
     192            0 :     }
     193              : 
     194            0 :     async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
     195            0 :         self.delete_inner(path, true, cancel).await
     196            0 :     }
     197              : 
     198            0 :     async fn delete_objects<'a>(
     199            0 :         &self,
     200            0 :         paths: &'a [RemotePath],
     201            0 :         cancel: &CancellationToken,
     202            0 :     ) -> anyhow::Result<()> {
     203            0 :         self.attempt(RemoteOp::DeleteObjects(paths.to_vec()))?;
     204            0 :         let mut error_counter = 0;
     205            0 :         for path in paths {
     206              :             // Don't record an attempt because it was already recorded above
     207            0 :             if (self.delete_inner(path, false, cancel).await).is_err() {
     208            0 :                 error_counter += 1;
     209            0 :             }
     210              :         }
     211            0 :         if error_counter > 0 {
     212            0 :             return Err(anyhow::anyhow!(
     213            0 :                 "failed to delete {} objects",
     214            0 :                 error_counter
     215            0 :             ));
     216            0 :         }
     217            0 :         Ok(())
     218            0 :     }
     219              : 
     220            0 :     async fn copy(
     221            0 :         &self,
     222            0 :         from: &RemotePath,
     223            0 :         to: &RemotePath,
     224            0 :         cancel: &CancellationToken,
     225            0 :     ) -> anyhow::Result<()> {
     226            0 :         // copy is equivalent to download + upload
     227            0 :         self.attempt(RemoteOp::Download(from.clone()))?;
     228            0 :         self.attempt(RemoteOp::Upload(to.clone()))?;
     229            0 :         self.inner.copy_object(from, to, cancel).await
     230            0 :     }
     231              : 
     232            0 :     async fn time_travel_recover(
     233            0 :         &self,
     234            0 :         prefix: Option<&RemotePath>,
     235            0 :         timestamp: SystemTime,
     236            0 :         done_if_after: SystemTime,
     237            0 :         cancel: &CancellationToken,
     238            0 :     ) -> Result<(), TimeTravelError> {
     239            0 :         self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))
     240            0 :             .map_err(TimeTravelError::Other)?;
     241            0 :         self.inner
     242            0 :             .time_travel_recover(prefix, timestamp, done_if_after, cancel)
     243            0 :             .await
     244            0 :     }
     245              : }
        

Generated by: LCOV version 2.1-beta