//! This module provides a wrapper around a real RemoteStorage implementation that
//! causes the first N attempts at each upload or download operation to fail, for
//! testing purposes.
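//!
//! A minimal usage sketch (`inner` stands for any real `GenericRemoteStorage`,
//! e.g. a `LocalFs` instance; its construction is omitted here):
//!
//! ```ignore
//! // Fail the first attempt of each distinct operation, then let retries succeed.
//! let storage = UnreliableWrapper::new(inner, 1);
//! ```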
use bytes::Bytes;
use futures::stream::Stream;
use futures::StreamExt;
use std::collections::HashMap;
use std::num::NonZeroU32;
use std::sync::Mutex;
use std::time::SystemTime;
use std::{collections::hash_map::Entry, sync::Arc};
use tokio_util::sync::CancellationToken;

use crate::{
    Download, DownloadError, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorage,
    StorageMetadata, TimeTravelError,
};

pub struct UnreliableWrapper {
    inner: GenericRemoteStorage<Arc<VoidStorage>>,

    // This many attempts of each operation will fail, then we let it succeed.
    attempts_to_fail: u64,

    // Tracks how many failed attempts of each operation have been made.
    attempts: Mutex<HashMap<RemoteOp, u64>>,
}

/// Used to identify retries of distinct operations.
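/// Two calls count as retries of the same operation only when they produce the
/// same variant with the same path(s), e.g. two uploads to the same `RemotePath`.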
#[derive(Debug, Hash, Eq, PartialEq)]
enum RemoteOp {
    ListPrefixes(Option<RemotePath>),
    Upload(RemotePath),
    Download(RemotePath),
    Delete(RemotePath),
    DeleteObjects(Vec<RemotePath>),
    TimeTravelRecover(Option<RemotePath>),
}

impl UnreliableWrapper {
    pub fn new(inner: crate::GenericRemoteStorage, attempts_to_fail: u64) -> Self {
        assert!(attempts_to_fail > 0);
        let inner = match inner {
            GenericRemoteStorage::AwsS3(s) => GenericRemoteStorage::AwsS3(s),
            GenericRemoteStorage::AzureBlob(s) => GenericRemoteStorage::AzureBlob(s),
            GenericRemoteStorage::LocalFs(s) => GenericRemoteStorage::LocalFs(s),
            // We could also make this a no-op, i.e. extract the inner storage of the
            // passed generic remote storage.
            GenericRemoteStorage::Unreliable(_s) => {
                panic!("Can't wrap unreliable wrapper unreliably")
            }
        };
        UnreliableWrapper {
            inner,
            attempts_to_fail,
            attempts: Mutex::new(HashMap::new()),
        }
    }

    ///
    /// Common functionality for all operations.
    ///
    /// Fail the first attempts of this operation; after 'attempts_to_fail' attempts,
    /// let the operation go ahead and clear the counter.
    ///
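    ///
    /// For example, with `attempts_to_fail == 1`, the first call for a given
    /// `RemoteOp` returns an error, the retry succeeds (clearing the counter),
    /// and a later repeat of the same operation fails once again.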
    fn attempt(&self, op: RemoteOp) -> anyhow::Result<u64> {
        let mut attempts = self.attempts.lock().unwrap();

        match attempts.entry(op) {
            Entry::Occupied(mut e) => {
                let attempts_before_this = {
                    let p = e.get_mut();
                    *p += 1;
                    *p
                };

                if attempts_before_this >= self.attempts_to_fail {
                    // let it succeed
                    e.remove();
                    Ok(attempts_before_this)
                } else {
                    let error =
                        anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
                    Err(error)
                }
            }
            Entry::Vacant(e) => {
                let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
                e.insert(1);
                Err(error)
            }
        }
    }

    async fn delete_inner(
        &self,
        path: &RemotePath,
        attempt: bool,
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        if attempt {
            self.attempt(RemoteOp::Delete(path.clone()))?;
        }
        self.inner.delete(path, cancel).await
    }
}

// We never construct this, so the concrete type is not important; it just has to be
// something other than UnreliableWrapper that implements RemoteStorage.
type VoidStorage = crate::LocalFs;

impl RemoteStorage for UnreliableWrapper {
    fn list_streaming(
        &self,
        prefix: Option<&RemotePath>,
        mode: ListingMode,
        max_keys: Option<NonZeroU32>,
        cancel: &CancellationToken,
    ) -> impl Stream<Item = Result<Listing, DownloadError>> + Send {
        async_stream::stream! {
            self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
                .map_err(DownloadError::Other)?;
            let mut stream = self.inner
                .list_streaming(prefix, mode, max_keys, cancel);
            while let Some(item) = stream.next().await {
                yield item;
            }
        }
    }

    async fn list(
        &self,
        prefix: Option<&RemotePath>,
        mode: ListingMode,
        max_keys: Option<NonZeroU32>,
        cancel: &CancellationToken,
    ) -> Result<Listing, DownloadError> {
        self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
            .map_err(DownloadError::Other)?;
        self.inner.list(prefix, mode, max_keys, cancel).await
    }

    async fn upload(
        &self,
        data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
        // S3 PUT requests require the content length to be specified up front;
        // otherwise they start to fail as the number of concurrent connections grows.
        data_size_bytes: usize,
        to: &RemotePath,
        metadata: Option<StorageMetadata>,
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        self.attempt(RemoteOp::Upload(to.clone()))?;
        self.inner
            .upload(data, data_size_bytes, to, metadata, cancel)
            .await
    }

    async fn download(
        &self,
        from: &RemotePath,
        cancel: &CancellationToken,
    ) -> Result<Download, DownloadError> {
        self.attempt(RemoteOp::Download(from.clone()))
            .map_err(DownloadError::Other)?;
        self.inner.download(from, cancel).await
    }

    async fn download_byte_range(
        &self,
        from: &RemotePath,
        start_inclusive: u64,
        end_exclusive: Option<u64>,
        cancel: &CancellationToken,
    ) -> Result<Download, DownloadError> {
        // Note: We treat any download_byte_range as an "attempt" of the same
        // operation. We don't pay attention to the ranges. That's good enough
        // for now.
        self.attempt(RemoteOp::Download(from.clone()))
            .map_err(DownloadError::Other)?;
        self.inner
            .download_byte_range(from, start_inclusive, end_exclusive, cancel)
            .await
    }

    async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
        self.delete_inner(path, true, cancel).await
    }

    async fn delete_objects<'a>(
        &self,
        paths: &'a [RemotePath],
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        self.attempt(RemoteOp::DeleteObjects(paths.to_vec()))?;
        let mut error_counter = 0;
        for path in paths {
            // Don't record this as a separate attempt; it was already recorded above.
            if (self.delete_inner(path, false, cancel).await).is_err() {
                error_counter += 1;
            }
        }
        if error_counter > 0 {
            return Err(anyhow::anyhow!(
                "failed to delete {} objects",
                error_counter
            ));
        }
        Ok(())
    }

    async fn copy(
        &self,
        from: &RemotePath,
        to: &RemotePath,
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        // copy is equivalent to download + upload
        self.attempt(RemoteOp::Download(from.clone()))?;
        self.attempt(RemoteOp::Upload(to.clone()))?;
        self.inner.copy_object(from, to, cancel).await
    }

    async fn time_travel_recover(
        &self,
        prefix: Option<&RemotePath>,
        timestamp: SystemTime,
        done_if_after: SystemTime,
        cancel: &CancellationToken,
    ) -> Result<(), TimeTravelError> {
        self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))
            .map_err(TimeTravelError::Other)?;
        self.inner
            .time_travel_recover(prefix, timestamp, done_if_after, cancel)
            .await
    }
}