//! This module provides a wrapper around a real RemoteStorage implementation that
//! causes the first N attempts at each upload or download operation to fail. For
//! testing purposes.
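//!
//! A minimal usage sketch (illustrative only: `storage` stands in for an already
//! configured `GenericRemoteStorage`, which is assumed to exist elsewhere, and the
//! failure count of 1 is just an example):
//!
//! ```ignore
//! // Fail the first attempt of each remote operation, then let retries through.
//! let unreliable = UnreliableWrapper::new(storage, 1);
//! ```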
use bytes::Bytes;
use futures::stream::Stream;
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::SystemTime;
use std::{collections::hash_map::Entry, sync::Arc};
use tokio_util::sync::CancellationToken;

use crate::{
    Download, DownloadError, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorage,
    StorageMetadata, TimeTravelError,
};

pub struct UnreliableWrapper {
    inner: GenericRemoteStorage<Arc<VoidStorage>>,

    // This many attempts of each operation will fail, then we let it succeed.
    attempts_to_fail: u64,

    // Tracks how many failed attempts of each operation have been made.
    attempts: Mutex<HashMap<RemoteOp, u64>>,
}

/// Used to identify retries of distinct operations.
#[derive(Debug, Hash, Eq, PartialEq)]
enum RemoteOp {
    ListPrefixes(Option<RemotePath>),
    Upload(RemotePath),
    Download(RemotePath),
    Delete(RemotePath),
    DeleteObjects(Vec<RemotePath>),
    TimeTravelRecover(Option<RemotePath>),
}

impl UnreliableWrapper {
    pub fn new(inner: crate::GenericRemoteStorage, attempts_to_fail: u64) -> Self {
        assert!(attempts_to_fail > 0);
        let inner = match inner {
            GenericRemoteStorage::AwsS3(s) => GenericRemoteStorage::AwsS3(s),
            GenericRemoteStorage::AzureBlob(s) => GenericRemoteStorage::AzureBlob(s),
            GenericRemoteStorage::LocalFs(s) => GenericRemoteStorage::LocalFs(s),
            // We could also make this a no-op, as in, extract the inner of the passed generic remote storage
            GenericRemoteStorage::Unreliable(_s) => {
                panic!("Can't wrap unreliable wrapper unreliably")
            }
        };
        UnreliableWrapper {
            inner,
            attempts_to_fail,
            attempts: Mutex::new(HashMap::new()),
        }
    }

    ///
    /// Common functionality for all operations.
    ///
    /// The first attempts of an operation fail with an error; after 'attempts_to_fail'
    /// attempts, the operation is allowed to go ahead and the counter is cleared.
    /// (The test module at the bottom of this file sketches this counting scheme in
    /// isolation.)
    ///
    fn attempt(&self, op: RemoteOp) -> Result<u64, DownloadError> {
        let mut attempts = self.attempts.lock().unwrap();

        match attempts.entry(op) {
            Entry::Occupied(mut e) => {
                let attempts_before_this = {
                    let p = e.get_mut();
                    *p += 1;
                    *p
                };

                if attempts_before_this >= self.attempts_to_fail {
                    // let it succeed
                    e.remove();
                    Ok(attempts_before_this)
                } else {
                    let error =
                        anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
                    Err(DownloadError::Other(error))
                }
            }
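            // First attempt of this operation: record it and simulate a failure.
            // (`new` asserts `attempts_to_fail > 0`, so the first attempt never succeeds.)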
            Entry::Vacant(e) => {
                let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
                e.insert(1);
                Err(DownloadError::Other(error))
            }
        }
    }

    async fn delete_inner(&self, path: &RemotePath, attempt: bool) -> anyhow::Result<()> {
        if attempt {
            self.attempt(RemoteOp::Delete(path.clone()))?;
        }
        self.inner.delete(path).await
    }
}

// We never construct this, so the type is not important; it just has to not be
// UnreliableWrapper and to implement RemoteStorage.
type VoidStorage = crate::LocalFs;

impl RemoteStorage for UnreliableWrapper {
    async fn list_prefixes(
        &self,
        prefix: Option<&RemotePath>,
    ) -> Result<Vec<RemotePath>, DownloadError> {
        self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))?;
        self.inner.list_prefixes(prefix).await
    }

    async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
        self.attempt(RemoteOp::ListPrefixes(folder.cloned()))?;
        self.inner.list_files(folder).await
    }

    async fn list(
        &self,
        prefix: Option<&RemotePath>,
        mode: ListingMode,
    ) -> Result<Listing, DownloadError> {
        self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))?;
        self.inner.list(prefix, mode).await
    }

    async fn upload(
        &self,
        data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
        // S3 PUT requests require the content length to be specified up front;
        // without it, uploads start failing as the concurrent connection count grows.
        data_size_bytes: usize,
        to: &RemotePath,
        metadata: Option<StorageMetadata>,
    ) -> anyhow::Result<()> {
        self.attempt(RemoteOp::Upload(to.clone()))?;
        self.inner.upload(data, data_size_bytes, to, metadata).await
    }

    async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
        self.attempt(RemoteOp::Download(from.clone()))?;
        self.inner.download(from).await
    }

    async fn download_byte_range(
        &self,
        from: &RemotePath,
        start_inclusive: u64,
        end_exclusive: Option<u64>,
    ) -> Result<Download, DownloadError> {
        // Note: We treat any download_byte_range as an "attempt" of the same
        // operation. We don't pay attention to the ranges. That's good enough
        // for now.
        self.attempt(RemoteOp::Download(from.clone()))?;
        self.inner
            .download_byte_range(from, start_inclusive, end_exclusive)
            .await
    }

    async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
        self.delete_inner(path, true).await
    }

    async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
        self.attempt(RemoteOp::DeleteObjects(paths.to_vec()))?;
        let mut error_counter = 0;
        for path in paths {
            // Don't record this as an attempt; it was already recorded above.
            if (self.delete_inner(path, false).await).is_err() {
                error_counter += 1;
            }
        }
        if error_counter > 0 {
            return Err(anyhow::anyhow!(
                "failed to delete {} objects",
                error_counter
            ));
        }
        Ok(())
    }

    async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
        // copy is equivalent to download + upload
        self.attempt(RemoteOp::Download(from.clone()))?;
        self.attempt(RemoteOp::Upload(to.clone()))?;
        self.inner.copy_object(from, to).await
    }

    async fn time_travel_recover(
        &self,
        prefix: Option<&RemotePath>,
        timestamp: SystemTime,
        done_if_after: SystemTime,
        cancel: &CancellationToken,
    ) -> Result<(), TimeTravelError> {
        self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))
            .map_err(|e| TimeTravelError::Other(anyhow::Error::new(e)))?;
        self.inner
            .time_travel_recover(prefix, timestamp, done_if_after, cancel)
            .await
    }
}
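
// A self-contained sketch (added for illustration, not part of the original
// interface) of the attempt-counting scheme used by `UnreliableWrapper::attempt`.
// It uses plain `String` keys instead of `RemoteOp` so it runs without
// constructing any storage backend; `simulated_attempt` mirrors, but is not,
// the method above.
#[cfg(test)]
mod tests {
    use std::collections::hash_map::Entry;
    use std::collections::HashMap;

    fn simulated_attempt(
        attempts: &mut HashMap<String, u64>,
        op: &str,
        attempts_to_fail: u64,
    ) -> Result<u64, String> {
        match attempts.entry(op.to_string()) {
            Entry::Occupied(mut e) => {
                let seen = {
                    let p = e.get_mut();
                    *p += 1;
                    *p
                };
                if seen >= attempts_to_fail {
                    // Let the operation through and clear the counter, so a later
                    // retry sequence for the same operation starts failing again.
                    e.remove();
                    Ok(seen)
                } else {
                    Err(format!("simulated failure of remote operation {op}"))
                }
            }
            // The first attempt always fails, mirroring `attempt` above.
            Entry::Vacant(e) => {
                e.insert(1);
                Err(format!("simulated failure of remote operation {op}"))
            }
        }
    }

    #[test]
    fn first_attempt_fails_then_succeeds() {
        let mut attempts = HashMap::new();
        // With `attempts_to_fail == 1`, the first call fails and the retry succeeds.
        assert!(simulated_attempt(&mut attempts, "upload some/path", 1).is_err());
        assert!(simulated_attempt(&mut attempts, "upload some/path", 1).is_ok());
        // The counter was cleared on success, so the next call fails again.
        assert!(simulated_attempt(&mut attempts, "upload some/path", 1).is_err());
    }
}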