use std::collections::HashSet;
use std::ops::ControlFlow;
use std::path::PathBuf;
use std::sync::Arc;

use anyhow::Context;
use bytes::Bytes;
use camino::Utf8Path;
use futures::stream::Stream;
use once_cell::sync::OnceCell;
use remote_storage::{Download, GenericRemoteStorage, RemotePath};
use tokio::task::JoinSet;
use tracing::{debug, error, info};

/// Set once by `ensure_logging_ready` so logging is initialized exactly once
/// per test binary.
static LOGGING_DONE: OnceCell<()> = OnceCell::new();

/// Converts borrowed or owned bytes into the `(stream, length)` pair that
/// `GenericRemoteStorage::upload` expects.
pub(crate) fn upload_stream(
    content: std::borrow::Cow<'static, [u8]>,
) -> (
    impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
    usize,
) {
    use std::borrow::Cow;

    let content = match content {
        // Borrowed data must be 'static, so it can be wrapped without copying.
        Cow::Borrowed(x) => Bytes::from_static(x),
        Cow::Owned(vec) => Bytes::from(vec),
    };
    wrap_stream(content)
}

/// Wraps an in-memory buffer into a single-item byte stream, returning the
/// stream together with its total length.
pub(crate) fn wrap_stream(
    content: bytes::Bytes,
) -> (
    impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
    usize,
) {
    let len = content.len();
    let content = futures::future::ready(Ok(content));

    (futures::stream::once(content), len)
}
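
// A minimal usage sketch (not part of the original code): the pair returned by
// `wrap_stream`/`upload_stream` feeds straight into `upload`. The `client` and
// `path` bindings below are hypothetical placeholders for a test's storage
// handle and target path.
//
//     let (stream, len) = wrap_stream(Bytes::from_static(b"hello"));
//     client.upload(stream, len, &path, None).await?;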

/// Buffers an entire `Download` stream into memory. Fine for tests, where
/// payloads are small.
pub(crate) async fn download_to_vec(dl: Download) -> anyhow::Result<Vec<u8>> {
    let mut buf = Vec::new();
    tokio::io::copy_buf(
        &mut tokio_util::io::StreamReader::new(dl.download_stream),
        &mut buf,
    )
    .await?;
    Ok(buf)
}
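
// Sketch (hypothetical): given a `Download` handed back by the storage client
// earlier in a test, collect the whole body into memory and compare it.
//
//     let body = download_to_vec(dl).await?;
//     assert_eq!(body, b"remote blob data 1");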

/// Uploads `upload_tasks_count` files named `folder{i / 7}/blob_{i}.txt`; see
/// the test description for more details. Returns `Continue` with all uploaded
/// paths on success, or `Break` with the paths that did make it if any task
/// failed, so the caller can still clean up.
pub(crate) async fn upload_simple_remote_data(
    client: &Arc<GenericRemoteStorage>,
    upload_tasks_count: usize,
) -> ControlFlow<HashSet<RemotePath>, HashSet<RemotePath>> {
    info!("Creating {upload_tasks_count} remote files");
    let mut upload_tasks = JoinSet::new();
    for i in 1..=upload_tasks_count {
        let task_client = Arc::clone(client);
        upload_tasks.spawn(async move {
            // Group blobs into folders of seven apiece.
            let blob_path = PathBuf::from(format!("folder{}/blob_{}.txt", i / 7, i));
            let blob_path = RemotePath::new(
                Utf8Path::from_path(blob_path.as_path()).expect("must be valid blob path"),
            )
            .with_context(|| format!("{blob_path:?} to RemotePath conversion"))?;
            debug!("Creating remote item {i} at path {blob_path:?}");

            let (data, len) = upload_stream(format!("remote blob data {i}").into_bytes().into());
            task_client.upload(data, len, &blob_path, None).await?;

            Ok::<_, anyhow::Error>(blob_path)
        });
    }

    let mut upload_tasks_failed = false;
    let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
    while let Some(task_run_result) = upload_tasks.join_next().await {
        match task_run_result
            .context("task join failed")
            .and_then(|task_result| task_result.context("upload task failed"))
        {
            Ok(upload_path) => {
                uploaded_blobs.insert(upload_path);
            }
            Err(e) => {
                error!("Upload task failed: {e:?}");
                upload_tasks_failed = true;
            }
        }
    }

    if upload_tasks_failed {
        ControlFlow::Break(uploaded_blobs)
    } else {
        ControlFlow::Continue(uploaded_blobs)
    }
}
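
// Sketch of how a test might consume the `ControlFlow` result (hypothetical
// test body; `client` is the test's storage handle). `Break` still carries the
// blobs that uploaded successfully, so they can be deleted before failing.
//
//     match upload_simple_remote_data(&client, 5).await {
//         ControlFlow::Continue(blobs) => { /* run the actual test against `blobs` */ }
//         ControlFlow::Break(blobs) => {
//             cleanup(&client, blobs).await;
//             panic!("some uploads failed");
//         }
//     }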

/// Concurrently deletes the given objects. Failures are logged rather than
/// propagated, since cleanup runs on both the success and failure paths.
pub(crate) async fn cleanup(
    client: &Arc<GenericRemoteStorage>,
    objects_to_delete: HashSet<RemotePath>,
) {
    info!(
        "Removing {} objects from the remote storage during cleanup",
        objects_to_delete.len()
    );
    let mut delete_tasks = JoinSet::new();
    for object_to_delete in objects_to_delete {
        let task_client = Arc::clone(client);
        delete_tasks.spawn(async move {
            debug!("Deleting remote item at path {object_to_delete:?}");
            task_client
                .delete(&object_to_delete)
                .await
                .with_context(|| format!("{object_to_delete:?} removal"))
        });
    }

    while let Some(task_run_result) = delete_tasks.join_next().await {
        match task_run_result {
            Ok(task_result) => match task_result {
                Ok(()) => {}
                Err(e) => error!("Delete task failed: {e:?}"),
            },
            Err(join_err) => error!("Delete task did not finish correctly: {join_err}"),
        }
    }
}

/// Paths created by `upload_remote_data`: the per-task prefixes and the blob
/// paths under them.
pub(crate) struct Uploads {
    pub(crate) prefixes: HashSet<RemotePath>,
    pub(crate) blobs: HashSet<RemotePath>,
}

/// Uploads `upload_tasks_count` blobs under `{base_prefix_str}/sub_prefix_{i}/`.
/// Like `upload_simple_remote_data`, returns `Break` with whatever was uploaded
/// if any task failed, and `Continue` with everything otherwise.
pub(crate) async fn upload_remote_data(
    client: &Arc<GenericRemoteStorage>,
    base_prefix_str: &'static str,
    upload_tasks_count: usize,
) -> ControlFlow<Uploads, Uploads> {
    info!("Creating {upload_tasks_count} remote files");
    let mut upload_tasks = JoinSet::new();
    for i in 1..=upload_tasks_count {
        let task_client = Arc::clone(client);
        upload_tasks.spawn(async move {
            let prefix = format!("{base_prefix_str}/sub_prefix_{i}/");
            let blob_prefix = RemotePath::new(Utf8Path::new(&prefix))
                .with_context(|| format!("{prefix:?} to RemotePath conversion"))?;
            let blob_path = blob_prefix.join(Utf8Path::new(&format!("blob_{i}")));
            debug!("Creating remote item {i} at path {blob_path:?}");

            let (data, data_len) =
                upload_stream(format!("remote blob data {i}").into_bytes().into());
            task_client.upload(data, data_len, &blob_path, None).await?;

            Ok::<_, anyhow::Error>((blob_prefix, blob_path))
        });
    }

    let mut upload_tasks_failed = false;
    let mut uploaded_prefixes = HashSet::with_capacity(upload_tasks_count);
    let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
    while let Some(task_run_result) = upload_tasks.join_next().await {
        match task_run_result
            .context("task join failed")
            .and_then(|task_result| task_result.context("upload task failed"))
        {
            Ok((upload_prefix, upload_path)) => {
                uploaded_prefixes.insert(upload_prefix);
                uploaded_blobs.insert(upload_path);
            }
            Err(e) => {
                error!("Upload task failed: {e:?}");
                upload_tasks_failed = true;
            }
        }
    }

    let uploads = Uploads {
        prefixes: uploaded_prefixes,
        blobs: uploaded_blobs,
    };
    if upload_tasks_failed {
        ControlFlow::Break(uploads)
    } else {
        ControlFlow::Continue(uploads)
    }
}
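
// Sketch of typical call-site handling (hypothetical test body): unwrap the
// `Uploads` on success, clean up and fail otherwise. Each task creates its own
// `sub_prefix_{i}`, so the number of prefixes matches the task count.
//
//     let uploads = match upload_remote_data(&client, "test_prefix", 5).await {
//         ControlFlow::Continue(u) => u,
//         ControlFlow::Break(u) => {
//             cleanup(&client, u.blobs).await;
//             panic!("some uploads failed");
//         }
//     };
//     assert_eq!(uploads.prefixes.len(), 5);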

/// Initializes test logging exactly once per process, no matter how many tests
/// call it.
pub(crate) fn ensure_logging_ready() {
    LOGGING_DONE.get_or_init(|| {
        utils::logging::init(
            utils::logging::LogFormat::Test,
            utils::logging::TracingErrorLayerEnablement::Disabled,
            utils::logging::Output::Stdout,
        )
        .expect("logging init failed");
    });
}
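
// Typical use at the top of each test (hypothetical; a tokio test runtime is
// assumed, since the helpers above are async):
//
//     #[tokio::test]
//     async fn upload_download_works() -> anyhow::Result<()> {
//         ensure_logging_ready();
//         // ... exercise the storage client ...
//         Ok(())
//     }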