TLA Line data Source code
1 : //! This module provides a wrapper around a real RemoteStorage implementation that
2 : //! causes the first N attempts at each upload or download operation to fail. For
3 : //! testing purposes.
4 : use bytes::Bytes;
5 : use futures::stream::Stream;
6 : use std::collections::hash_map::Entry;
7 : use std::collections::HashMap;
8 : use std::sync::Mutex;
9 :
10 : use crate::{
11 : Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, StorageMetadata,
12 : };
13 :
14 : pub struct UnreliableWrapper {
15 : inner: crate::GenericRemoteStorage,
16 :
17 : // This many attempts of each operation will fail, then we let it succeed.
18 : attempts_to_fail: u64,
19 :
20 : // Tracks how many failed attempts of each operation has been made.
21 : attempts: Mutex<HashMap<RemoteOp, u64>>,
22 : }
23 :
24 : /// Used to identify retries of different unique operation.
25 CBC 7027 : #[derive(Debug, Hash, Eq, PartialEq)]
26 : enum RemoteOp {
27 : ListPrefixes(Option<RemotePath>),
28 : Upload(RemotePath),
29 : Download(RemotePath),
30 : Delete(RemotePath),
31 : DeleteObjects(Vec<RemotePath>),
32 : }
33 :
34 : impl UnreliableWrapper {
35 65 : pub fn new(inner: crate::GenericRemoteStorage, attempts_to_fail: u64) -> Self {
36 65 : assert!(attempts_to_fail > 0);
37 65 : UnreliableWrapper {
38 65 : inner,
39 65 : attempts_to_fail,
40 65 : attempts: Mutex::new(HashMap::new()),
41 65 : }
42 65 : }
43 :
44 : ///
45 : /// Common functionality for all operations.
46 : ///
47 : /// On the first attempts of this operation, return an error. After 'attempts_to_fail'
48 : /// attempts, let the operation go ahead, and clear the counter.
49 : ///
50 5738 : fn attempt(&self, op: RemoteOp) -> Result<u64, DownloadError> {
51 5738 : let mut attempts = self.attempts.lock().unwrap();
52 5738 :
53 5738 : match attempts.entry(op) {
54 2734 : Entry::Occupied(mut e) => {
55 2734 : let attempts_before_this = {
56 2734 : let p = e.get_mut();
57 2734 : *p += 1;
58 2734 : *p
59 2734 : };
60 2734 :
61 2734 : if attempts_before_this >= self.attempts_to_fail {
62 : // let it succeed
63 2734 : e.remove();
64 2734 : Ok(attempts_before_this)
65 : } else {
66 UBC 0 : let error =
67 0 : anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
68 0 : Err(DownloadError::Other(error))
69 : }
70 : }
71 CBC 3004 : Entry::Vacant(e) => {
72 3004 : let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
73 3004 : e.insert(1);
74 3004 : Err(DownloadError::Other(error))
75 : }
76 : }
77 5738 : }
78 :
79 1913 : async fn delete_inner(&self, path: &RemotePath, attempt: bool) -> anyhow::Result<()> {
80 1913 : if attempt {
81 68 : self.attempt(RemoteOp::Delete(path.clone()))?;
82 1845 : }
83 7672 : self.inner.delete(path).await
84 1913 : }
85 : }
86 :
87 : #[async_trait::async_trait]
88 : impl RemoteStorage for UnreliableWrapper {
89 UBC 0 : async fn list_prefixes(
90 0 : &self,
91 0 : prefix: Option<&RemotePath>,
92 0 : ) -> Result<Vec<RemotePath>, DownloadError> {
93 0 : self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))?;
94 0 : self.inner.list_prefixes(prefix).await
95 0 : }
96 :
97 CBC 146 : async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
98 146 : self.attempt(RemoteOp::ListPrefixes(folder.cloned()))?;
99 295 : self.inner.list_files(folder).await
100 292 : }
101 :
102 44 : async fn list(
103 44 : &self,
104 44 : prefix: Option<&RemotePath>,
105 44 : mode: ListingMode,
106 44 : ) -> Result<Listing, DownloadError> {
107 44 : self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))?;
108 87 : self.inner.list(prefix, mode).await
109 88 : }
110 :
111 5010 : async fn upload(
112 5010 : &self,
113 5010 : data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
114 5010 : // S3 PUT request requires the content length to be specified,
115 5010 : // otherwise it starts to fail with the concurrent connection count increasing.
116 5010 : data_size_bytes: usize,
117 5010 : to: &RemotePath,
118 5010 : metadata: Option<StorageMetadata>,
119 5010 : ) -> anyhow::Result<()> {
120 5010 : self.attempt(RemoteOp::Upload(to.clone()))?;
121 17966 : self.inner.upload(data, data_size_bytes, to, metadata).await
122 10020 : }
123 :
124 244 : async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
125 244 : self.attempt(RemoteOp::Download(from.clone()))?;
126 327 : self.inner.download(from).await
127 488 : }
128 :
129 UBC 0 : async fn download_byte_range(
130 0 : &self,
131 0 : from: &RemotePath,
132 0 : start_inclusive: u64,
133 0 : end_exclusive: Option<u64>,
134 0 : ) -> Result<Download, DownloadError> {
135 : // Note: We treat any download_byte_range as an "attempt" of the same
136 : // operation. We don't pay attention to the ranges. That's good enough
137 : // for now.
138 0 : self.attempt(RemoteOp::Download(from.clone()))?;
139 0 : self.inner
140 0 : .download_byte_range(from, start_inclusive, end_exclusive)
141 0 : .await
142 0 : }
143 :
144 CBC 68 : async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
145 140 : self.delete_inner(path, true).await
146 136 : }
147 :
148 226 : async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
149 226 : self.attempt(RemoteOp::DeleteObjects(paths.to_vec()))?;
150 113 : let mut error_counter = 0;
151 1958 : for path in paths {
152 : // Dont record attempt because it was already recorded above
153 7532 : if (self.delete_inner(path, false).await).is_err() {
154 UBC 0 : error_counter += 1;
155 CBC 1845 : }
156 : }
157 113 : if error_counter > 0 {
158 UBC 0 : return Err(anyhow::anyhow!(
159 0 : "failed to delete {} objects",
160 0 : error_counter
161 0 : ));
162 CBC 113 : }
163 113 : Ok(())
164 452 : }
165 :
166 UBC 0 : async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
167 : // copy is equivalent to download + upload
168 0 : self.attempt(RemoteOp::Download(from.clone()))?;
169 0 : self.attempt(RemoteOp::Upload(to.clone()))?;
170 0 : self.inner.copy_object(from, to).await
171 0 : }
172 : }
|