Line data Source code
1 : //! This module provides a wrapper around a real RemoteStorage implementation that
2 : //! causes the first N attempts at each remote operation (upload, download, listing,
3 : //! deletion, ...) to fail. For testing purposes.
4 : use std::collections::HashMap;
5 : use std::collections::hash_map::Entry;
6 : use std::num::NonZeroU32;
7 : use std::sync::{Arc, Mutex};
8 : use std::time::SystemTime;
9 :
10 : use bytes::Bytes;
11 : use futures::StreamExt;
12 : use futures::stream::Stream;
13 : use tokio_util::sync::CancellationToken;
14 :
15 : use crate::{
16 : Download, DownloadError, DownloadOpts, GenericRemoteStorage, Listing, ListingMode, RemotePath,
17 : RemoteStorage, StorageMetadata, TimeTravelError,
18 : };
19 :
20 : pub struct UnreliableWrapper {
21 : inner: GenericRemoteStorage<Arc<VoidStorage>>,
22 :
23 : // This many attempts of each operation will fail, then we let it succeed.
24 : attempts_to_fail: u64,
25 :
26 : // Tracks how many failed attempts of each operation has been made.
27 : attempts: Mutex<HashMap<RemoteOp, u64>>,
28 : }
29 :
30 : /// Used to identify retries of different unique operation.
31 : #[derive(Debug, Hash, Eq, PartialEq)]
32 : enum RemoteOp {
33 : ListPrefixes(Option<RemotePath>),
34 : HeadObject(RemotePath),
35 : Upload(RemotePath),
36 : Download(RemotePath),
37 : Delete(RemotePath),
38 : DeleteObjects(Vec<RemotePath>),
39 : TimeTravelRecover(Option<RemotePath>),
40 : }
41 :
42 : impl UnreliableWrapper {
43 2 : pub fn new(inner: crate::GenericRemoteStorage, attempts_to_fail: u64) -> Self {
44 2 : assert!(attempts_to_fail > 0);
45 2 : let inner = match inner {
46 0 : GenericRemoteStorage::AwsS3(s) => GenericRemoteStorage::AwsS3(s),
47 0 : GenericRemoteStorage::AzureBlob(s) => GenericRemoteStorage::AzureBlob(s),
48 2 : GenericRemoteStorage::LocalFs(s) => GenericRemoteStorage::LocalFs(s),
49 : // We could also make this a no-op, as in, extract the inner of the passed generic remote storage
50 0 : GenericRemoteStorage::Unreliable(_s) => {
51 0 : panic!("Can't wrap unreliable wrapper unreliably")
52 : }
53 : };
54 2 : UnreliableWrapper {
55 2 : inner,
56 2 : attempts_to_fail,
57 2 : attempts: Mutex::new(HashMap::new()),
58 2 : }
59 2 : }
60 :
61 : ///
62 : /// Common functionality for all operations.
63 : ///
64 : /// On the first attempts of this operation, return an error. After 'attempts_to_fail'
65 : /// attempts, let the operation go ahead, and clear the counter.
66 : ///
67 24 : fn attempt(&self, op: RemoteOp) -> anyhow::Result<u64> {
68 24 : let mut attempts = self.attempts.lock().unwrap();
69 24 :
70 24 : match attempts.entry(op) {
71 12 : Entry::Occupied(mut e) => {
72 12 : let attempts_before_this = {
73 12 : let p = e.get_mut();
74 12 : *p += 1;
75 12 : *p
76 12 : };
77 12 :
78 12 : if attempts_before_this >= self.attempts_to_fail {
79 : // let it succeed
80 12 : e.remove();
81 12 : Ok(attempts_before_this)
82 : } else {
83 0 : let error =
84 0 : anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
85 0 : Err(error)
86 : }
87 : }
88 12 : Entry::Vacant(e) => {
89 12 : let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
90 12 : e.insert(1);
91 12 : Err(error)
92 : }
93 : }
94 24 : }
95 :
96 0 : async fn delete_inner(
97 0 : &self,
98 0 : path: &RemotePath,
99 0 : attempt: bool,
100 0 : cancel: &CancellationToken,
101 0 : ) -> anyhow::Result<()> {
102 0 : if attempt {
103 0 : self.attempt(RemoteOp::Delete(path.clone()))?;
104 0 : }
105 0 : self.inner.delete(path, cancel).await
106 0 : }
107 : }
108 :
109 : // We never construct this, so the type is not important, just has to not be UnreliableWrapper and impl RemoteStorage.
110 : type VoidStorage = crate::LocalFs;
111 :
112 : impl RemoteStorage for UnreliableWrapper {
113 0 : fn list_streaming(
114 0 : &self,
115 0 : prefix: Option<&RemotePath>,
116 0 : mode: ListingMode,
117 0 : max_keys: Option<NonZeroU32>,
118 0 : cancel: &CancellationToken,
119 0 : ) -> impl Stream<Item = Result<Listing, DownloadError>> + Send {
120 0 : async_stream::stream! {
121 0 : self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
122 0 : .map_err(DownloadError::Other)?;
123 0 : let mut stream = self.inner
124 0 : .list_streaming(prefix, mode, max_keys, cancel);
125 0 : while let Some(item) = stream.next().await {
126 0 : yield item;
127 0 : }
128 0 : }
129 0 : }
130 0 : async fn list(
131 0 : &self,
132 0 : prefix: Option<&RemotePath>,
133 0 : mode: ListingMode,
134 0 : max_keys: Option<NonZeroU32>,
135 0 : cancel: &CancellationToken,
136 0 : ) -> Result<Listing, DownloadError> {
137 0 : self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
138 0 : .map_err(DownloadError::Other)?;
139 0 : self.inner.list(prefix, mode, max_keys, cancel).await
140 0 : }
141 :
142 0 : async fn list_versions(
143 0 : &self,
144 0 : prefix: Option<&RemotePath>,
145 0 : mode: ListingMode,
146 0 : max_keys: Option<NonZeroU32>,
147 0 : cancel: &CancellationToken,
148 0 : ) -> Result<crate::VersionListing, DownloadError> {
149 0 : self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
150 0 : .map_err(DownloadError::Other)?;
151 0 : self.inner
152 0 : .list_versions(prefix, mode, max_keys, cancel)
153 0 : .await
154 0 : }
155 :
156 0 : async fn head_object(
157 0 : &self,
158 0 : key: &RemotePath,
159 0 : cancel: &CancellationToken,
160 0 : ) -> Result<crate::ListingObject, DownloadError> {
161 0 : self.attempt(RemoteOp::HeadObject(key.clone()))
162 0 : .map_err(DownloadError::Other)?;
163 0 : self.inner.head_object(key, cancel).await
164 0 : }
165 :
166 24 : async fn upload(
167 24 : &self,
168 24 : data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
169 24 : // S3 PUT request requires the content length to be specified,
170 24 : // otherwise it starts to fail with the concurrent connection count increasing.
171 24 : data_size_bytes: usize,
172 24 : to: &RemotePath,
173 24 : metadata: Option<StorageMetadata>,
174 24 : cancel: &CancellationToken,
175 24 : ) -> anyhow::Result<()> {
176 24 : self.attempt(RemoteOp::Upload(to.clone()))?;
177 12 : self.inner
178 12 : .upload(data, data_size_bytes, to, metadata, cancel)
179 12 : .await
180 0 : }
181 :
182 0 : async fn download(
183 0 : &self,
184 0 : from: &RemotePath,
185 0 : opts: &DownloadOpts,
186 0 : cancel: &CancellationToken,
187 0 : ) -> Result<Download, DownloadError> {
188 0 : // Note: We treat any byte range as an "attempt" of the same operation.
189 0 : // We don't pay attention to the ranges. That's good enough for now.
190 0 : self.attempt(RemoteOp::Download(from.clone()))
191 0 : .map_err(DownloadError::Other)?;
192 0 : self.inner.download(from, opts, cancel).await
193 0 : }
194 :
195 0 : async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
196 0 : self.delete_inner(path, true, cancel).await
197 0 : }
198 :
199 0 : async fn delete_objects(
200 0 : &self,
201 0 : paths: &[RemotePath],
202 0 : cancel: &CancellationToken,
203 0 : ) -> anyhow::Result<()> {
204 0 : self.attempt(RemoteOp::DeleteObjects(paths.to_vec()))?;
205 0 : let mut error_counter = 0;
206 0 : for path in paths {
207 : // Dont record attempt because it was already recorded above
208 0 : if (self.delete_inner(path, false, cancel).await).is_err() {
209 0 : error_counter += 1;
210 0 : }
211 : }
212 0 : if error_counter > 0 {
213 0 : return Err(anyhow::anyhow!(
214 0 : "failed to delete {} objects",
215 0 : error_counter
216 0 : ));
217 0 : }
218 0 : Ok(())
219 0 : }
220 :
221 0 : fn max_keys_per_delete(&self) -> usize {
222 0 : self.inner.max_keys_per_delete()
223 0 : }
224 :
225 0 : async fn copy(
226 0 : &self,
227 0 : from: &RemotePath,
228 0 : to: &RemotePath,
229 0 : cancel: &CancellationToken,
230 0 : ) -> anyhow::Result<()> {
231 0 : // copy is equivalent to download + upload
232 0 : self.attempt(RemoteOp::Download(from.clone()))?;
233 0 : self.attempt(RemoteOp::Upload(to.clone()))?;
234 0 : self.inner.copy_object(from, to, cancel).await
235 0 : }
236 :
237 0 : async fn time_travel_recover(
238 0 : &self,
239 0 : prefix: Option<&RemotePath>,
240 0 : timestamp: SystemTime,
241 0 : done_if_after: SystemTime,
242 0 : cancel: &CancellationToken,
243 0 : ) -> Result<(), TimeTravelError> {
244 0 : self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))
245 0 : .map_err(TimeTravelError::Other)?;
246 0 : self.inner
247 0 : .time_travel_recover(prefix, timestamp, done_if_after, cancel)
248 0 : .await
249 0 : }
250 : }
|