LCOV - code coverage report
Current view: top level - compute_tools/src/bin/fast_import - aws_s3_sync.rs (source / functions) Coverage Total Hit
Test: 07bee600374ccd486c69370d0972d9035964fe68.info Lines: 0.0 % 73 0
Test Date: 2025-02-20 13:11:02 Functions: 0.0 % 6 0

            Line data    Source code
       1              : use camino::{Utf8Path, Utf8PathBuf};
       2              : use tokio::task::JoinSet;
       3              : use walkdir::WalkDir;
       4              : 
       5              : use super::s3_uri::S3Uri;
       6              : 
       7              : use tracing::{info, warn};
       8              : 
       /// Upper bound on the number of upload tasks that `upload_dir_recursive` keeps
       /// in its `JoinSet` at any one time.
       const MAX_PARALLEL_UPLOADS: usize = 10;
      10              : 
      11              : /// Upload all files from 'local' to 'remote'
      12            0 : pub(crate) async fn upload_dir_recursive(
      13            0 :     s3_client: &aws_sdk_s3::Client,
      14            0 :     local: &Utf8Path,
      15            0 :     remote: &S3Uri,
      16            0 : ) -> anyhow::Result<()> {
      17            0 :     // Recursively scan directory
      18            0 :     let mut dirwalker = WalkDir::new(local)
      19            0 :         .into_iter()
      20            0 :         .map(|entry| {
      21            0 :             let entry = entry?;
      22            0 :             let file_type = entry.file_type();
      23            0 :             let path = <&Utf8Path>::try_from(entry.path())?.to_path_buf();
      24            0 :             Ok((file_type, path))
      25            0 :         })
      26            0 :         .filter_map(|e: anyhow::Result<(std::fs::FileType, Utf8PathBuf)>| {
      27            0 :             match e {
      28            0 :                 Ok((file_type, path)) if file_type.is_file() => Some(Ok(path)),
      29            0 :                 Ok((file_type, _path)) if file_type.is_dir() => {
      30            0 :                     // The WalkDir iterator will recurse into directories, but we don't want
      31            0 :                     // to do anything with directories as such. There's no concept of uploading
      32            0 :                     // an empty directory to S3.
      33            0 :                     None
      34              :                 }
      35            0 :                 Ok((file_type, path)) if file_type.is_symlink() => {
      36            0 :                     // huh, didn't expect a symlink. Can't upload that to S3. Warn and skip.
      37            0 :                     warn!("cannot upload symlink ({})", path);
      38            0 :                     None
      39              :                 }
      40            0 :                 Ok((_file_type, path)) => {
      41            0 :                     // should not happen
      42            0 :                     warn!("directory entry has unexpected type ({})", path);
      43            0 :                     None
      44              :                 }
      45            0 :                 Err(e) => Some(Err(e)),
      46              :             }
      47            0 :         });
      48            0 : 
      49            0 :     // Spawn upload tasks for each file, keeping MAX_PARALLEL_UPLOADS active in
      50            0 :     // parallel.
      51            0 :     let mut joinset = JoinSet::new();
      52              :     loop {
      53              :         // Could we upload more?
      54            0 :         while joinset.len() < MAX_PARALLEL_UPLOADS {
      55            0 :             if let Some(full_local_path) = dirwalker.next() {
      56            0 :                 let full_local_path = full_local_path?;
      57            0 :                 let relative_local_path = full_local_path
      58            0 :                     .strip_prefix(local)
      59            0 :                     .expect("all paths start from the walkdir root");
      60            0 :                 let remote_path = remote.append(relative_local_path.as_str());
      61            0 :                 info!(
      62            0 :                     "starting upload of {} to {}",
      63            0 :                     &full_local_path, &remote_path
      64              :                 );
      65            0 :                 let upload_task = upload_file(s3_client.clone(), full_local_path, remote_path);
      66            0 :                 joinset.spawn(upload_task);
      67              :             } else {
      68            0 :                 info!("draining upload tasks");
      69            0 :                 break;
      70              :             }
      71              :         }
      72              : 
      73              :         // Wait for an upload to complete
      74            0 :         if let Some(res) = joinset.join_next().await {
      75            0 :             let _ = res?;
      76              :         } else {
      77              :             // all done!
      78            0 :             break;
      79            0 :         }
      80            0 :     }
      81            0 :     Ok(())
      82            0 : }
      83              : 
      84            0 : pub(crate) async fn upload_file(
      85            0 :     s3_client: aws_sdk_s3::Client,
      86            0 :     local_path: Utf8PathBuf,
      87            0 :     remote: S3Uri,
      88            0 : ) -> anyhow::Result<()> {
      89              :     use aws_smithy_types::byte_stream::ByteStream;
      90            0 :     let stream = ByteStream::from_path(&local_path).await?;
      91              : 
      92            0 :     let _result = s3_client
      93            0 :         .put_object()
      94            0 :         .bucket(remote.bucket)
      95            0 :         .key(&remote.key)
      96            0 :         .body(stream)
      97            0 :         .send()
      98            0 :         .await?;
      99            0 :     info!("upload of {} to {} finished", &local_path, &remote.key);
     100              : 
     101            0 :     Ok(())
     102            0 : }
        

Generated by: LCOV version 2.1-beta