TLA Line data Source code
1 : //! This module provides a wrapper around a real RemoteStorage implementation that
2 : //! causes the first N attempts at each upload or download operation to fail. For
3 : //! testing purposes.
4 : use std::collections::hash_map::Entry;
5 : use std::collections::HashMap;
6 : use std::sync::Mutex;
7 :
8 : use crate::{Download, DownloadError, RemotePath, RemoteStorage, StorageMetadata};
9 :
10 : pub struct UnreliableWrapper {
11 : inner: crate::GenericRemoteStorage,
12 :
13 : // This many attempts of each operation will fail, then we let it succeed.
14 : attempts_to_fail: u64,
15 :
16 : // Tracks how many failed attempts of each operation has been made.
17 : attempts: Mutex<HashMap<RemoteOp, u64>>,
18 : }
19 :
20 : /// Used to identify retries of different unique operation.
21 CBC 7421 : #[derive(Debug, Hash, Eq, PartialEq)]
22 : enum RemoteOp {
23 : ListPrefixes(Option<RemotePath>),
24 : Upload(RemotePath),
25 : Download(RemotePath),
26 : Delete(RemotePath),
27 : DeleteObjects(Vec<RemotePath>),
28 : }
29 :
30 : impl UnreliableWrapper {
31 65 : pub fn new(inner: crate::GenericRemoteStorage, attempts_to_fail: u64) -> Self {
32 65 : assert!(attempts_to_fail > 0);
33 65 : UnreliableWrapper {
34 65 : inner,
35 65 : attempts_to_fail,
36 65 : attempts: Mutex::new(HashMap::new()),
37 65 : }
38 65 : }
39 :
40 : ///
41 : /// Common functionality for all operations.
42 : ///
43 : /// On the first attempts of this operation, return an error. After 'attempts_to_fail'
44 : /// attempts, let the operation go ahead, and clear the counter.
45 : ///
46 6127 : fn attempt(&self, op: RemoteOp) -> Result<u64, DownloadError> {
47 6127 : let mut attempts = self.attempts.lock().unwrap();
48 6127 :
49 6127 : match attempts.entry(op) {
50 2996 : Entry::Occupied(mut e) => {
51 2996 : let attempts_before_this = {
52 2996 : let p = e.get_mut();
53 2996 : *p += 1;
54 2996 : *p
55 2996 : };
56 2996 :
57 2996 : if attempts_before_this >= self.attempts_to_fail {
58 : // let it succeed
59 2996 : e.remove();
60 2996 : Ok(attempts_before_this)
61 : } else {
62 UBC 0 : let error =
63 0 : anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
64 0 : Err(DownloadError::Other(error))
65 : }
66 : }
67 CBC 3131 : Entry::Vacant(e) => {
68 3131 : let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
69 3131 : e.insert(1);
70 3131 : Err(DownloadError::Other(error))
71 : }
72 : }
73 6127 : }
74 :
75 1973 : async fn delete_inner(&self, path: &RemotePath, attempt: bool) -> anyhow::Result<()> {
76 1973 : if attempt {
77 72 : self.attempt(RemoteOp::Delete(path.clone()))?;
78 1901 : }
79 7777 : self.inner.delete(path).await
80 1973 : }
81 : }
82 :
83 : #[async_trait::async_trait]
84 : impl RemoteStorage for UnreliableWrapper {
85 18 : async fn list_prefixes(
86 18 : &self,
87 18 : prefix: Option<&RemotePath>,
88 18 : ) -> Result<Vec<RemotePath>, DownloadError> {
89 18 : self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))?;
90 27 : self.inner.list_prefixes(prefix).await
91 36 : }
92 :
93 134 : async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
94 134 : self.attempt(RemoteOp::ListPrefixes(folder.cloned()))?;
95 272 : self.inner.list_files(folder).await
96 268 : }
97 :
98 5335 : async fn upload(
99 5335 : &self,
100 5335 : data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
101 5335 : // S3 PUT request requires the content length to be specified,
102 5335 : // otherwise it starts to fail with the concurrent connection count increasing.
103 5335 : data_size_bytes: usize,
104 5335 : to: &RemotePath,
105 5335 : metadata: Option<StorageMetadata>,
106 5335 : ) -> anyhow::Result<()> {
107 5335 : self.attempt(RemoteOp::Upload(to.clone()))?;
108 43379 : self.inner.upload(data, data_size_bytes, to, metadata).await
109 10670 : }
110 :
111 286 : async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
112 286 : self.attempt(RemoteOp::Download(from.clone()))?;
113 430 : self.inner.download(from).await
114 572 : }
115 :
116 UBC 0 : async fn download_byte_range(
117 0 : &self,
118 0 : from: &RemotePath,
119 0 : start_inclusive: u64,
120 0 : end_exclusive: Option<u64>,
121 0 : ) -> Result<Download, DownloadError> {
122 : // Note: We treat any download_byte_range as an "attempt" of the same
123 : // operation. We don't pay attention to the ranges. That's good enough
124 : // for now.
125 0 : self.attempt(RemoteOp::Download(from.clone()))?;
126 0 : self.inner
127 0 : .download_byte_range(from, start_inclusive, end_exclusive)
128 0 : .await
129 0 : }
130 :
131 CBC 72 : async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
132 146 : self.delete_inner(path, true).await
133 144 : }
134 :
135 282 : async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
136 282 : self.attempt(RemoteOp::DeleteObjects(paths.to_vec()))?;
137 141 : let mut error_counter = 0;
138 2042 : for path in paths {
139 : // Dont record attempt because it was already recorded above
140 7631 : if (self.delete_inner(path, false).await).is_err() {
141 UBC 0 : error_counter += 1;
142 CBC 1901 : }
143 : }
144 141 : if error_counter > 0 {
145 UBC 0 : return Err(anyhow::anyhow!(
146 0 : "failed to delete {} objects",
147 0 : error_counter
148 0 : ));
149 CBC 141 : }
150 141 : Ok(())
151 564 : }
152 : }
|