use std::collections::HashSet;
use std::ops::ControlFlow;
use std::path::PathBuf;
use std::sync::Arc;

use anyhow::Context;
use bytes::Bytes;
use camino::Utf8Path;
use futures::stream::Stream;
use once_cell::sync::OnceCell;
use remote_storage::{Download, GenericRemoteStorage, RemotePath};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};

static LOGGING_DONE: OnceCell<()> = OnceCell::new();
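
/// Converts borrowed or owned bytes into a one-chunk upload stream, returning
/// the stream together with its total length in bytes.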
pub(crate) fn upload_stream(
    content: std::borrow::Cow<'static, [u8]>,
) -> (
    impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
    usize,
) {
    use std::borrow::Cow;

    let content = match content {
        Cow::Borrowed(x) => Bytes::from_static(x),
        Cow::Owned(vec) => Bytes::from(vec),
    };
    wrap_stream(content)
}
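
/// Wraps an in-memory [`Bytes`] payload in a one-shot stream, returning the
/// stream together with its length so callers can pass both to `upload`.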
pub(crate) fn wrap_stream(
    content: bytes::Bytes,
) -> (
    impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
    usize,
) {
    let len = content.len();
    let content = futures::future::ready(Ok(content));

    (futures::stream::once(content), len)
}
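
/// Reads an entire [`Download`] stream into memory and returns it as a `Vec<u8>`.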
pub(crate) async fn download_to_vec(dl: Download) -> anyhow::Result<Vec<u8>> {
    let mut buf = Vec::new();
    tokio::io::copy_buf(
        &mut tokio_util::io::StreamReader::new(dl.download_stream),
        &mut buf,
    )
    .await?;
    Ok(buf)
}
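
// A minimal sanity sketch, not part of the original suite: it assumes the
// tokio and futures dev-dependencies already used by these tests. The helpers
// above emit the payload as a single chunk, so the reported length must match
// the number of bytes the stream yields.
#[cfg(test)]
mod stream_helper_sketch {
    use super::*;
    use futures::StreamExt;

    #[tokio::test]
    async fn wrapped_stream_len_matches_content() {
        let (stream, len) = wrap_stream(Bytes::from_static(b"hello"));
        let chunks: Vec<std::io::Result<Bytes>> = stream.collect().await;
        assert_eq!(chunks.len(), 1, "helpers emit exactly one chunk");
        assert_eq!(chunks[0].as_ref().unwrap().len(), len);
    }
}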

// Uploads files `folder{i/7}/blob_{i}.txt`. See test description for more details.
pub(crate) async fn upload_simple_remote_data(
    client: &Arc<GenericRemoteStorage>,
    upload_tasks_count: usize,
) -> ControlFlow<HashSet<RemotePath>, HashSet<RemotePath>> {
    info!("Creating {upload_tasks_count} remote files");
    let mut upload_tasks = JoinSet::new();
    let cancel = CancellationToken::new();

    for i in 1..=upload_tasks_count {
        let task_client = Arc::clone(client);
        let cancel = cancel.clone();

        upload_tasks.spawn(async move {
            let blob_path = PathBuf::from(format!("folder{}/blob_{}.txt", i / 7, i));
            let blob_path = RemotePath::new(
                Utf8Path::from_path(blob_path.as_path()).expect("must be valid blob path"),
            )
            .with_context(|| format!("{blob_path:?} to RemotePath conversion"))?;
            debug!("Creating remote item {i} at path {blob_path:?}");

            let (data, len) = upload_stream(format!("remote blob data {i}").into_bytes().into());
            task_client
                .upload(data, len, &blob_path, None, &cancel)
                .await?;

            Ok::<_, anyhow::Error>(blob_path)
        });
    }

    let mut upload_tasks_failed = false;
    let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
    while let Some(task_run_result) = upload_tasks.join_next().await {
        match task_run_result
            .context("task join failed")
            .and_then(|task_result| task_result.context("upload task failed"))
        {
            Ok(upload_path) => {
                uploaded_blobs.insert(upload_path);
            }
            Err(e) => {
                error!("Upload task failed: {e:?}");
                upload_tasks_failed = true;
            }
        }
    }

    if upload_tasks_failed {
        ControlFlow::Break(uploaded_blobs)
    } else {
        ControlFlow::Continue(uploaded_blobs)
    }
}
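
// A hedged usage sketch, not part of the original module: one way a test
// could consume the ControlFlow result, cleaning up partially uploaded blobs
// on Break. The `client` is assumed to come from the surrounding test harness,
// and `upload_or_cleanup_sketch` is a hypothetical helper name.
#[allow(dead_code)]
pub(crate) async fn upload_or_cleanup_sketch(
    client: &Arc<GenericRemoteStorage>,
) -> anyhow::Result<HashSet<RemotePath>> {
    match upload_simple_remote_data(client, 5).await {
        // Every upload task succeeded; hand the full blob set to the caller.
        ControlFlow::Continue(blobs) => Ok(blobs),
        // Some task failed; remove whatever did reach remote storage first.
        ControlFlow::Break(partial) => {
            cleanup(client, partial).await;
            anyhow::bail!("some upload tasks failed")
        }
    }
}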
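
/// Deletes the given objects from remote storage in parallel tasks, logging
/// failures rather than propagating them.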
pub(crate) async fn cleanup(
    client: &Arc<GenericRemoteStorage>,
    objects_to_delete: HashSet<RemotePath>,
) {
    info!(
        "Removing {} objects from the remote storage during cleanup",
        objects_to_delete.len()
    );
    let cancel = CancellationToken::new();
    let mut delete_tasks = JoinSet::new();
    for object_to_delete in objects_to_delete {
        let task_client = Arc::clone(client);
        let cancel = cancel.clone();
        delete_tasks.spawn(async move {
            debug!("Deleting remote item at path {object_to_delete:?}");
            task_client
                .delete(&object_to_delete, &cancel)
                .await
                .with_context(|| format!("{object_to_delete:?} removal"))
        });
    }

    while let Some(task_run_result) = delete_tasks.join_next().await {
        match task_run_result {
            Ok(task_result) => match task_result {
                Ok(()) => {}
                Err(e) => error!("Delete task failed: {e:?}"),
            },
            Err(join_err) => error!("Delete task did not finish correctly: {join_err}"),
        }
    }
}
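
/// The prefixes and blob paths created by [`upload_remote_data`].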
pub(crate) struct Uploads {
    pub(crate) prefixes: HashSet<RemotePath>,
    pub(crate) blobs: HashSet<RemotePath>,
}
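
/// Uploads blobs named `blob_{i}` under `{base_prefix_str}/sub_prefix_{i}/`,
/// one task per blob. Returns `Continue` with all created prefixes and paths,
/// or `Break` with the partial set if any task failed.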
pub(crate) async fn upload_remote_data(
    client: &Arc<GenericRemoteStorage>,
    base_prefix_str: &'static str,
    upload_tasks_count: usize,
) -> ControlFlow<Uploads, Uploads> {
    info!("Creating {upload_tasks_count} remote files");
    let mut upload_tasks = JoinSet::new();
    let cancel = CancellationToken::new();

    for i in 1..=upload_tasks_count {
        let task_client = Arc::clone(client);
        let cancel = cancel.clone();

        upload_tasks.spawn(async move {
            let prefix = format!("{base_prefix_str}/sub_prefix_{i}/");
            let blob_prefix = RemotePath::new(Utf8Path::new(&prefix))
                .with_context(|| format!("{prefix:?} to RemotePath conversion"))?;
            let blob_path = blob_prefix.join(Utf8Path::new(&format!("blob_{i}")));
            debug!("Creating remote item {i} at path {blob_path:?}");

            let (data, data_len) =
                upload_stream(format!("remote blob data {i}").into_bytes().into());

            /* BEGIN_HADRON */
            let mut metadata = None;
            if matches!(&*task_client, GenericRemoteStorage::AzureBlob(_)) {
                let file_path = "/tmp/dbx_upload_tmp_file.txt";
                {
                    use std::io::Write;

                    // Open the file in append mode, creating it if it doesn't exist.
                    let mut file = std::fs::OpenOptions::new()
                        .append(true)
                        .create(true)
                        .open(file_path)?;
                    // Append the blob payload and flush it to disk.
                    file.write_all(format!("remote blob data {i}").as_bytes())?;
                    file.sync_all()?;
                }
                metadata = Some(remote_storage::StorageMetadata::from([(
                    "databricks_azure_put_block",
                    file_path,
                )]));
            }
            /* END_HADRON */

            task_client
                .upload(data, data_len, &blob_path, metadata, &cancel)
                .await?;

            // TODO: Check upload is using the put_block upload.
            // We cannot consume data here since data is moved inside the upload.
            // let total_bytes = data.fold(0, |acc, chunk| async move {
            //     acc + chunk.map(|bytes| bytes.len()).unwrap_or(0)
            // }).await;
            // assert_eq!(total_bytes, data_len);

            Ok::<_, anyhow::Error>((blob_prefix, blob_path))
        });
    }

    let mut upload_tasks_failed = false;
    let mut uploaded_prefixes = HashSet::with_capacity(upload_tasks_count);
    let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
    while let Some(task_run_result) = upload_tasks.join_next().await {
        match task_run_result
            .context("task join failed")
            .and_then(|task_result| task_result.context("upload task failed"))
        {
            Ok((upload_prefix, upload_path)) => {
                uploaded_prefixes.insert(upload_prefix);
                uploaded_blobs.insert(upload_path);
            }
            Err(e) => {
                error!("Upload task failed: {e:?}");
                upload_tasks_failed = true;
            }
        }
    }

    let uploads = Uploads {
        prefixes: uploaded_prefixes,
        blobs: uploaded_blobs,
    };
    if upload_tasks_failed {
        ControlFlow::Break(uploads)
    } else {
        ControlFlow::Continue(uploads)
    }
}
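
/// Initializes test logging once per process; subsequent calls are no-ops.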
pub(crate) fn ensure_logging_ready() {
    LOGGING_DONE.get_or_init(|| {
        utils::logging::init(
            utils::logging::LogFormat::Test,
            utils::logging::TracingErrorLayerEnablement::Disabled,
            utils::logging::Output::Stdout,
        )
        .expect("logging init failed");
    });
}