use camino::{Utf8Path, Utf8PathBuf};
use tokio::task::JoinSet;
use walkdir::WalkDir;

use super::s3_uri::S3Uri;

use tracing::{info, warn};

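/// Cap on the number of `upload_file` tasks kept in flight at any one time.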
const MAX_PARALLEL_UPLOADS: usize = 10;

/// Recursively upload all files under `local` to the S3 location `remote`.
pub(crate) async fn upload_dir_recursive(
    s3_client: &aws_sdk_s3::Client,
    local: &Utf8Path,
    remote: &S3Uri,
) -> anyhow::Result<()> {
    // Recursively scan the directory
    let mut dirwalker = WalkDir::new(local)
        .into_iter()
        .map(|entry| {
            let entry = entry?;
            let file_type = entry.file_type();
            let path = <&Utf8Path>::try_from(entry.path())?.to_path_buf();
            Ok((file_type, path))
        })
        .filter_map(|e: anyhow::Result<(std::fs::FileType, Utf8PathBuf)>| {
            match e {
                Ok((file_type, path)) if file_type.is_file() => Some(Ok(path)),
                Ok((file_type, _path)) if file_type.is_dir() => {
                    // The WalkDir iterator will recurse into directories, but we don't want
                    // to do anything with directories as such. There's no concept of uploading
                    // an empty directory to S3.
                    None
                }
                Ok((file_type, path)) if file_type.is_symlink() => {
                    // huh, didn't expect a symlink. Can't upload that to S3. Warn and skip.
                    warn!("cannot upload symlink ({})", path);
                    None
                }
                Ok((_file_type, path)) => {
                    // should not happen
                    warn!("directory entry has unexpected type ({})", path);
                    None
                }
                Err(e) => Some(Err(e)),
            }
        });

    // Spawn upload tasks for each file, keeping MAX_PARALLEL_UPLOADS active in
    // parallel.
    let mut joinset = JoinSet::new();
    loop {
        // Could we upload more?
        while joinset.len() < MAX_PARALLEL_UPLOADS {
            if let Some(full_local_path) = dirwalker.next() {
                let full_local_path = full_local_path?;
                let relative_local_path = full_local_path
                    .strip_prefix(local)
                    .expect("all paths start from the walkdir root");
                let remote_path = remote.append(relative_local_path.as_str());
                info!(
                    "starting upload of {} to {}",
                    &full_local_path, &remote_path
                );
                let upload_task = upload_file(s3_client.clone(), full_local_path, remote_path);
                joinset.spawn(upload_task);
            } else {
                info!("draining upload tasks");
                break;
            }
        }

        // Wait for an upload to complete. `res??` propagates both JoinErrors
        // (panicked or cancelled tasks) and errors returned by `upload_file`,
        // so a failed upload aborts the whole directory upload.
        if let Some(res) = joinset.join_next().await {
            res??;
        } else {
            // all done!
            break;
        }
    }
    Ok(())
}

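/// Upload a single local file to the S3 object identified by `remote`.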
pub(crate) async fn upload_file(
    s3_client: aws_sdk_s3::Client,
    local_path: Utf8PathBuf,
    remote: S3Uri,
) -> anyhow::Result<()> {
    use aws_smithy_types::byte_stream::ByteStream;
    let stream = ByteStream::from_path(&local_path).await?;

    let _result = s3_client
        .put_object()
        .bucket(remote.bucket)
        .key(&remote.key)
        .body(stream)
        .send()
        .await?;
    info!("upload of {} to {} finished", &local_path, &remote.key);

    Ok(())
}
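
// A minimal usage sketch (illustrative only, not part of this module). It
// assumes the caller is already inside an async runtime, that `S3Uri`
// implements `FromStr` for "s3://bucket/prefix"-style strings, and that AWS
// credentials are available in the environment; the bucket and local path
// below are hypothetical:
//
//     let config = aws_config::load_from_env().await;
//     let client = aws_sdk_s3::Client::new(&config);
//     let remote: S3Uri = "s3://my-bucket/backups".parse()?;
//     upload_dir_recursive(&client, Utf8Path::new("/tmp/snapshot"), &remote).await?;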