Line data Source code
1 : //! This module provides a wrapper around a real RemoteStorage implementation that
2 : //! causes the first N attempts at each upload or download operation to fail. For
3 : //! testing purposes.
4 : use bytes::Bytes;
5 : use futures::stream::Stream;
6 : use std::collections::HashMap;
7 : use std::num::NonZeroU32;
8 : use std::sync::Mutex;
9 : use std::time::SystemTime;
10 : use std::{collections::hash_map::Entry, sync::Arc};
11 : use tokio_util::sync::CancellationToken;
12 :
13 : use crate::{
14 : Download, DownloadError, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorage,
15 : StorageMetadata, TimeTravelError,
16 : };
17 :
18 : pub struct UnreliableWrapper {
19 : inner: GenericRemoteStorage<Arc<VoidStorage>>,
20 :
21 : // This many attempts of each operation will fail, then we let it succeed.
22 : attempts_to_fail: u64,
23 :
24 : // Tracks how many failed attempts of each operation has been made.
25 : attempts: Mutex<HashMap<RemoteOp, u64>>,
26 : }
27 :
28 : /// Used to identify retries of different unique operation.
29 6869 : #[derive(Debug, Hash, Eq, PartialEq)]
30 : enum RemoteOp {
31 : ListPrefixes(Option<RemotePath>),
32 : Upload(RemotePath),
33 : Download(RemotePath),
34 : Delete(RemotePath),
35 : DeleteObjects(Vec<RemotePath>),
36 : TimeTravelRecover(Option<RemotePath>),
37 : }
38 :
39 : impl UnreliableWrapper {
40 67 : pub fn new(inner: crate::GenericRemoteStorage, attempts_to_fail: u64) -> Self {
41 67 : assert!(attempts_to_fail > 0);
42 67 : let inner = match inner {
43 52 : GenericRemoteStorage::AwsS3(s) => GenericRemoteStorage::AwsS3(s),
44 0 : GenericRemoteStorage::AzureBlob(s) => GenericRemoteStorage::AzureBlob(s),
45 15 : GenericRemoteStorage::LocalFs(s) => GenericRemoteStorage::LocalFs(s),
46 : // We could also make this a no-op, as in, extract the inner of the passed generic remote storage
47 0 : GenericRemoteStorage::Unreliable(_s) => {
48 0 : panic!("Can't wrap unreliable wrapper unreliably")
49 : }
50 : };
51 67 : UnreliableWrapper {
52 67 : inner,
53 67 : attempts_to_fail,
54 67 : attempts: Mutex::new(HashMap::new()),
55 67 : }
56 67 : }
57 :
58 : ///
59 : /// Common functionality for all operations.
60 : ///
61 : /// On the first attempts of this operation, return an error. After 'attempts_to_fail'
62 : /// attempts, let the operation go ahead, and clear the counter.
63 : ///
64 6094 : fn attempt(&self, op: RemoteOp) -> anyhow::Result<u64> {
65 6094 : let mut attempts = self.attempts.lock().unwrap();
66 6094 :
67 6094 : match attempts.entry(op) {
68 3024 : Entry::Occupied(mut e) => {
69 3024 : let attempts_before_this = {
70 3024 : let p = e.get_mut();
71 3024 : *p += 1;
72 3024 : *p
73 3024 : };
74 3024 :
75 3024 : if attempts_before_this >= self.attempts_to_fail {
76 : // let it succeed
77 3024 : e.remove();
78 3024 : Ok(attempts_before_this)
79 : } else {
80 0 : let error =
81 0 : anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
82 0 : Err(error)
83 : }
84 : }
85 3070 : Entry::Vacant(e) => {
86 3070 : let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
87 3070 : e.insert(1);
88 3070 : Err(error)
89 : }
90 : }
91 6094 : }
92 :
93 2011 : async fn delete_inner(&self, path: &RemotePath, attempt: bool) -> anyhow::Result<()> {
94 2011 : if attempt {
95 68 : self.attempt(RemoteOp::Delete(path.clone()))?;
96 1943 : }
97 8123 : self.inner.delete(path).await
98 2011 : }
99 : }
100 :
101 : // We never construct this, so the type is not important, just has to not be UnreliableWrapper and impl RemoteStorage.
102 : type VoidStorage = crate::LocalFs;
103 :
104 : impl RemoteStorage for UnreliableWrapper {
105 0 : async fn list_prefixes(
106 0 : &self,
107 0 : prefix: Option<&RemotePath>,
108 0 : ) -> Result<Vec<RemotePath>, DownloadError> {
109 0 : self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
110 0 : .map_err(DownloadError::Other)?;
111 0 : self.inner.list_prefixes(prefix).await
112 0 : }
113 :
114 128 : async fn list_files(
115 128 : &self,
116 128 : folder: Option<&RemotePath>,
117 128 : max_keys: Option<NonZeroU32>,
118 128 : ) -> Result<Vec<RemotePath>, DownloadError> {
119 128 : self.attempt(RemoteOp::ListPrefixes(folder.cloned()))
120 128 : .map_err(DownloadError::Other)?;
121 262 : self.inner.list_files(folder, max_keys).await
122 128 : }
123 :
124 126 : async fn list(
125 126 : &self,
126 126 : prefix: Option<&RemotePath>,
127 126 : mode: ListingMode,
128 126 : max_keys: Option<NonZeroU32>,
129 126 : ) -> Result<Listing, DownloadError> {
130 126 : self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
131 126 : .map_err(DownloadError::Other)?;
132 224 : self.inner.list(prefix, mode, max_keys).await
133 126 : }
134 :
135 5382 : async fn upload(
136 5382 : &self,
137 5382 : data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
138 5382 : // S3 PUT request requires the content length to be specified,
139 5382 : // otherwise it starts to fail with the concurrent connection count increasing.
140 5382 : data_size_bytes: usize,
141 5382 : to: &RemotePath,
142 5382 : metadata: Option<StorageMetadata>,
143 5382 : ) -> anyhow::Result<()> {
144 5382 : self.attempt(RemoteOp::Upload(to.clone()))?;
145 20183 : self.inner.upload(data, data_size_bytes, to, metadata).await
146 5382 : }
147 :
148 162 : async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
149 162 : self.attempt(RemoteOp::Download(from.clone()))
150 162 : .map_err(DownloadError::Other)?;
151 184 : self.inner.download(from).await
152 162 : }
153 :
154 0 : async fn download_byte_range(
155 0 : &self,
156 0 : from: &RemotePath,
157 0 : start_inclusive: u64,
158 0 : end_exclusive: Option<u64>,
159 0 : ) -> Result<Download, DownloadError> {
160 0 : // Note: We treat any download_byte_range as an "attempt" of the same
161 0 : // operation. We don't pay attention to the ranges. That's good enough
162 0 : // for now.
163 0 : self.attempt(RemoteOp::Download(from.clone()))
164 0 : .map_err(DownloadError::Other)?;
165 0 : self.inner
166 0 : .download_byte_range(from, start_inclusive, end_exclusive)
167 0 : .await
168 0 : }
169 :
170 68 : async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
171 142 : self.delete_inner(path, true).await
172 68 : }
173 :
174 228 : async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
175 228 : self.attempt(RemoteOp::DeleteObjects(paths.to_vec()))?;
176 114 : let mut error_counter = 0;
177 2057 : for path in paths {
178 : // Dont record attempt because it was already recorded above
179 7981 : if (self.delete_inner(path, false).await).is_err() {
180 0 : error_counter += 1;
181 1943 : }
182 : }
183 114 : if error_counter > 0 {
184 0 : return Err(anyhow::anyhow!(
185 0 : "failed to delete {} objects",
186 0 : error_counter
187 0 : ));
188 114 : }
189 114 : Ok(())
190 228 : }
191 :
192 0 : async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
193 0 : // copy is equivalent to download + upload
194 0 : self.attempt(RemoteOp::Download(from.clone()))?;
195 0 : self.attempt(RemoteOp::Upload(to.clone()))?;
196 0 : self.inner.copy_object(from, to).await
197 0 : }
198 :
199 0 : async fn time_travel_recover(
200 0 : &self,
201 0 : prefix: Option<&RemotePath>,
202 0 : timestamp: SystemTime,
203 0 : done_if_after: SystemTime,
204 0 : cancel: &CancellationToken,
205 0 : ) -> Result<(), TimeTravelError> {
206 0 : self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))
207 0 : .map_err(TimeTravelError::Other)?;
208 0 : self.inner
209 0 : .time_travel_recover(prefix, timestamp, done_if_after, cancel)
210 0 : .await
211 0 : }
212 : }
|