Line data Source code
1 : //! This module provides a wrapper around a real RemoteStorage implementation that
2 : //! causes the first N attempts at each upload or download operation to fail. For
3 : //! testing purposes.
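//!
//! A minimal construction sketch (illustrative only, not taken from this crate's tests;
//! `storage` is assumed to be an existing non-unreliable `GenericRemoteStorage` value):
//!
//! ```ignore
//! // Fail the first attempt of every operation; with a failure probability of 100,
//! // the retry of each operation then succeeds.
//! let unreliable = UnreliableWrapper::new(storage, 1, 100);
//! ```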
4 : use rand::Rng;
5 : use std::cmp;
6 : use std::collections::HashMap;
7 : use std::collections::hash_map::Entry;
8 : use std::num::NonZeroU32;
9 : use std::sync::{Arc, Mutex};
10 : use std::time::SystemTime;
11 :
12 : use bytes::Bytes;
13 : use futures::StreamExt;
14 : use futures::stream::Stream;
15 : use tokio_util::sync::CancellationToken;
16 :
17 : use crate::{
18 : Download, DownloadError, DownloadOpts, GenericRemoteStorage, Listing, ListingMode, RemotePath,
19 : RemoteStorage, StorageMetadata, TimeTravelError,
20 : };
21 :
22 : pub struct UnreliableWrapper {
23 : inner: GenericRemoteStorage<Arc<VoidStorage>>,
24 :
25 : // This many attempts of each operation will fail; after that, we let it succeed.
26 : attempts_to_fail: u64,
27 :
28 : // Tracks how many failed attempts of each operation have been made.
29 : attempts: Mutex<HashMap<RemoteOp, u64>>,
30 :
31 : /* BEGIN_HADRON */
32 : // This is the probability of failure for each operation, in the range [0, 100].
33 : // It defaults to 100, which means that every operation will fail.
34 : // Operations fail with this probability for up to attempts_to_fail attempts.
35 : attempt_failure_probability: u64,
36 : /* END_HADRON */
37 : }
38 :
39 : /// Used to identify retries of each distinct operation.
40 : #[derive(Debug, Hash, Eq, PartialEq)]
41 : enum RemoteOp {
42 : ListPrefixes(Option<RemotePath>),
43 : HeadObject(RemotePath),
44 : Upload(RemotePath),
45 : Download(RemotePath),
46 : Delete(RemotePath),
47 : DeleteObjects(Vec<RemotePath>),
48 : TimeTravelRecover(Option<RemotePath>),
49 : }
50 :
51 : impl UnreliableWrapper {
52 2 : pub fn new(
53 2 : inner: crate::GenericRemoteStorage,
54 2 : attempts_to_fail: u64,
55 2 : attempt_failure_probability: u64,
56 2 : ) -> Self {
57 2 : assert!(attempts_to_fail > 0);
58 2 : let inner = match inner {
59 0 : GenericRemoteStorage::AwsS3(s) => GenericRemoteStorage::AwsS3(s),
60 0 : GenericRemoteStorage::AzureBlob(s) => GenericRemoteStorage::AzureBlob(s),
61 2 : GenericRemoteStorage::LocalFs(s) => GenericRemoteStorage::LocalFs(s),
62 : // We could also make this a no-op, i.e. extract the inner storage of the passed generic remote storage
63 0 : GenericRemoteStorage::Unreliable(_s) => {
64 0 : panic!("Can't wrap unreliable wrapper unreliably")
65 : }
66 : };
67 2 : let actual_attempt_failure_probability = cmp::min(attempt_failure_probability, 100);
68 2 : UnreliableWrapper {
69 2 : inner,
70 2 : attempts_to_fail,
71 2 : attempt_failure_probability: actual_attempt_failure_probability,
72 2 : attempts: Mutex::new(HashMap::new()),
73 2 : }
74 2 : }
75 :
76 : ///
77 : /// Common functionality for all operations.
78 : ///
79 : /// On the first attempts of this operation, return an error. After 'attempts_to_fail'
80 : /// attempts, let the operation go ahead, and clear the counter.
81 : ///
82 24 : fn attempt(&self, op: RemoteOp) -> anyhow::Result<u64> {
83 24 : let mut attempts = self.attempts.lock().unwrap();
84 24 : let mut rng = rand::thread_rng();
85 :
86 24 : match attempts.entry(op) {
87 12 : Entry::Occupied(mut e) => {
88 12 : let attempts_before_this = {
89 12 : let p = e.get_mut();
90 12 : *p += 1;
91 12 : *p
92 : };
93 :
94 : /* BEGIN_HADRON */
95 : // If more attempts remain to be failed, fail the request with the configured probability.
96 12 : if (attempts_before_this < self.attempts_to_fail)
97 0 : && (rng.gen_range(0..=100) < self.attempt_failure_probability)
98 : {
99 0 : let error =
100 0 : anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
101 0 : Err(error)
102 : } else {
103 12 : e.remove();
104 12 : Ok(attempts_before_this)
105 : }
106 : /* END_HADRON */
107 : }
108 12 : Entry::Vacant(e) => {
109 12 : let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
110 12 : e.insert(1);
111 12 : Err(error)
112 : }
113 : }
114 24 : }
115 :
116 0 : async fn delete_inner(
117 0 : &self,
118 0 : path: &RemotePath,
119 0 : attempt: bool,
120 0 : cancel: &CancellationToken,
121 0 : ) -> anyhow::Result<()> {
122 0 : if attempt {
123 0 : self.attempt(RemoteOp::Delete(path.clone()))?;
124 0 : }
125 0 : self.inner.delete(path, cancel).await
126 0 : }
127 : }
128 :
129 : // We never construct this, so the concrete type is not important; it just must not be UnreliableWrapper and must impl RemoteStorage.
130 : type VoidStorage = crate::LocalFs;
131 :
132 : impl RemoteStorage for UnreliableWrapper {
133 0 : fn list_streaming(
134 0 : &self,
135 0 : prefix: Option<&RemotePath>,
136 0 : mode: ListingMode,
137 0 : max_keys: Option<NonZeroU32>,
138 0 : cancel: &CancellationToken,
139 0 : ) -> impl Stream<Item = Result<Listing, DownloadError>> + Send {
140 0 : async_stream::stream! {
141 : self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
142 : .map_err(DownloadError::Other)?;
143 : let mut stream = self.inner
144 : .list_streaming(prefix, mode, max_keys, cancel);
145 : while let Some(item) = stream.next().await {
146 : yield item;
147 : }
148 : }
149 0 : }
150 0 : async fn list(
151 0 : &self,
152 0 : prefix: Option<&RemotePath>,
153 0 : mode: ListingMode,
154 0 : max_keys: Option<NonZeroU32>,
155 0 : cancel: &CancellationToken,
156 0 : ) -> Result<Listing, DownloadError> {
157 0 : self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
158 0 : .map_err(DownloadError::Other)?;
159 0 : self.inner.list(prefix, mode, max_keys, cancel).await
160 0 : }
161 :
162 0 : async fn list_versions(
163 0 : &self,
164 0 : prefix: Option<&RemotePath>,
165 0 : mode: ListingMode,
166 0 : max_keys: Option<NonZeroU32>,
167 0 : cancel: &CancellationToken,
168 0 : ) -> Result<crate::VersionListing, DownloadError> {
169 0 : self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
170 0 : .map_err(DownloadError::Other)?;
171 0 : self.inner
172 0 : .list_versions(prefix, mode, max_keys, cancel)
173 0 : .await
174 0 : }
175 :
176 0 : async fn head_object(
177 0 : &self,
178 0 : key: &RemotePath,
179 0 : cancel: &CancellationToken,
180 0 : ) -> Result<crate::ListingObject, DownloadError> {
181 0 : self.attempt(RemoteOp::HeadObject(key.clone()))
182 0 : .map_err(DownloadError::Other)?;
183 0 : self.inner.head_object(key, cancel).await
184 0 : }
185 :
186 24 : async fn upload(
187 24 : &self,
188 24 : data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
189 24 : // S3 PUT requests require the content length to be specified;
190 24 : // otherwise they start to fail as the concurrent connection count increases.
191 24 : data_size_bytes: usize,
192 24 : to: &RemotePath,
193 24 : metadata: Option<StorageMetadata>,
194 24 : cancel: &CancellationToken,
195 24 : ) -> anyhow::Result<()> {
196 24 : self.attempt(RemoteOp::Upload(to.clone()))?;
197 12 : self.inner
198 12 : .upload(data, data_size_bytes, to, metadata, cancel)
199 12 : .await
200 0 : }
201 :
202 0 : async fn download(
203 0 : &self,
204 0 : from: &RemotePath,
205 0 : opts: &DownloadOpts,
206 0 : cancel: &CancellationToken,
207 0 : ) -> Result<Download, DownloadError> {
208 : // Note: We treat any byte range as an "attempt" of the same operation.
209 : // We don't pay attention to the ranges. That's good enough for now.
210 0 : self.attempt(RemoteOp::Download(from.clone()))
211 0 : .map_err(DownloadError::Other)?;
212 0 : self.inner.download(from, opts, cancel).await
213 0 : }
214 :
215 0 : async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
216 0 : self.delete_inner(path, true, cancel).await
217 0 : }
218 :
219 0 : async fn delete_objects(
220 0 : &self,
221 0 : paths: &[RemotePath],
222 0 : cancel: &CancellationToken,
223 0 : ) -> anyhow::Result<()> {
224 0 : self.attempt(RemoteOp::DeleteObjects(paths.to_vec()))?;
225 0 : let mut error_counter = 0;
226 0 : for path in paths {
227 : // Don't record the attempt; it was already recorded above.
228 0 : if (self.delete_inner(path, false, cancel).await).is_err() {
229 0 : error_counter += 1;
230 0 : }
231 : }
232 0 : if error_counter > 0 {
233 0 : return Err(anyhow::anyhow!(
234 0 : "failed to delete {} objects",
235 0 : error_counter
236 0 : ));
237 0 : }
238 0 : Ok(())
239 0 : }
240 :
241 0 : fn max_keys_per_delete(&self) -> usize {
242 0 : self.inner.max_keys_per_delete()
243 0 : }
244 :
245 0 : async fn copy(
246 0 : &self,
247 0 : from: &RemotePath,
248 0 : to: &RemotePath,
249 0 : cancel: &CancellationToken,
250 0 : ) -> anyhow::Result<()> {
251 : // copy is equivalent to download + upload
252 0 : self.attempt(RemoteOp::Download(from.clone()))?;
253 0 : self.attempt(RemoteOp::Upload(to.clone()))?;
254 0 : self.inner.copy_object(from, to, cancel).await
255 0 : }
256 :
257 0 : async fn time_travel_recover(
258 0 : &self,
259 0 : prefix: Option<&RemotePath>,
260 0 : timestamp: SystemTime,
261 0 : done_if_after: SystemTime,
262 0 : cancel: &CancellationToken,
263 0 : complexity_limit: Option<NonZeroU32>,
264 0 : ) -> Result<(), TimeTravelError> {
265 0 : self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))
266 0 : .map_err(TimeTravelError::Other)?;
267 0 : self.inner
268 0 : .time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
269 0 : .await
270 0 : }
271 : }
|