LCOV - code coverage report
Current view: top level - libs/remote_storage/tests/common - mod.rs (source / functions)
Test:         1e20c4f2b28aa592527961bb32170ebbd2c9172f.info
Test Date:    2025-07-16 12:29:03

                 Coverage   Total   Hit
    Lines:         92.2 %     166   153
    Functions:     85.0 %      40    34

            Line data    Source code
       1              : use std::collections::HashSet;
       2              : use std::ops::ControlFlow;
       3              : use std::path::PathBuf;
       4              : use std::sync::Arc;
       5              : 
       6              : use anyhow::Context;
       7              : use bytes::Bytes;
       8              : use camino::Utf8Path;
       9              : use futures::stream::Stream;
      10              : use once_cell::sync::OnceCell;
      11              : use remote_storage::{Download, GenericRemoteStorage, RemotePath};
      12              : use tokio::task::JoinSet;
      13              : use tokio_util::sync::CancellationToken;
      14              : use tracing::{debug, error, info};
      15              : 
      16              : static LOGGING_DONE: OnceCell<()> = OnceCell::new();
      17              : 
      18          269 : pub(crate) fn upload_stream(
      19          269 :     content: std::borrow::Cow<'static, [u8]>,
      20          269 : ) -> (
      21          269 :     impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
      22          269 :     usize,
      23          269 : ) {
      24              :     use std::borrow::Cow;
      25              : 
      26          269 :     let content = match content {
      27           17 :         Cow::Borrowed(x) => Bytes::from_static(x),
      28          252 :         Cow::Owned(vec) => Bytes::from(vec),
      29              :     };
      30          269 :     wrap_stream(content)
      31          269 : }
      32              : 
      33          287 : pub(crate) fn wrap_stream(
      34          287 :     content: bytes::Bytes,
      35          287 : ) -> (
      36          287 :     impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
      37          287 :     usize,
      38          287 : ) {
      39          287 :     let len = content.len();
      40          287 :     let content = futures::future::ready(Ok(content));
      41              : 
      42          287 :     (futures::stream::once(content), len)
      43          287 : }
      44              : 
      45           25 : pub(crate) async fn download_to_vec(dl: Download) -> anyhow::Result<Vec<u8>> {
      46           25 :     let mut buf = Vec::new();
      47           25 :     tokio::io::copy_buf(
      48           25 :         &mut tokio_util::io::StreamReader::new(dl.download_stream),
      49           25 :         &mut buf,
      50           25 :     )
      51           25 :     .await?;
      52           25 :     Ok(buf)
      53           25 : }
      54              : 
       55              : // Uploads files `folder{j}/blob_{i}.txt`, where `j = i / 7`. See test description for more details.
      56            9 : pub(crate) async fn upload_simple_remote_data(
      57            9 :     client: &Arc<GenericRemoteStorage>,
      58            9 :     upload_tasks_count: usize,
      59            9 : ) -> ControlFlow<HashSet<RemotePath>, HashSet<RemotePath>> {
      60            9 :     info!("Creating {upload_tasks_count} remote files");
      61            9 :     let mut upload_tasks = JoinSet::new();
      62            9 :     let cancel = CancellationToken::new();
      63              : 
      64          189 :     for i in 1..upload_tasks_count + 1 {
      65          189 :         let task_client = Arc::clone(client);
      66          189 :         let cancel = cancel.clone();
      67              : 
      68          189 :         upload_tasks.spawn(async move {
      69          189 :             let blob_path = PathBuf::from(format!("folder{}/blob_{}.txt", i / 7, i));
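                       :             // e.g. i = 1 -> "folder0/blob_1.txt", i = 7 -> "folder1/blob_7.txt"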
      70          189 :             let blob_path = RemotePath::new(
      71          189 :                 Utf8Path::from_path(blob_path.as_path()).expect("must be valid blob path"),
      72              :             )
      73          189 :             .with_context(|| format!("{blob_path:?} to RemotePath conversion"))?;
      74          189 :             debug!("Creating remote item {i} at path {blob_path:?}");
      75              : 
      76          189 :             let (data, len) = upload_stream(format!("remote blob data {i}").into_bytes().into());
      77          189 :             task_client
      78          189 :                 .upload(data, len, &blob_path, None, &cancel)
      79          189 :                 .await?;
      80              : 
      81          189 :             Ok::<_, anyhow::Error>(blob_path)
      82          189 :         });
      83              :     }
      84              : 
      85            9 :     let mut upload_tasks_failed = false;
      86            9 :     let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
      87          198 :     while let Some(task_run_result) = upload_tasks.join_next().await {
      88          189 :         match task_run_result
      89          189 :             .context("task join failed")
      90          189 :             .and_then(|task_result| task_result.context("upload task failed"))
      91              :         {
      92          189 :             Ok(upload_path) => {
      93          189 :                 uploaded_blobs.insert(upload_path);
      94          189 :             }
      95            0 :             Err(e) => {
      96            0 :                 error!("Upload task failed: {e:?}");
      97            0 :                 upload_tasks_failed = true;
      98              :             }
      99              :         }
     100              :     }
     101              : 
     102            9 :     if upload_tasks_failed {
     103            0 :         ControlFlow::Break(uploaded_blobs)
     104              :     } else {
     105            9 :         ControlFlow::Continue(uploaded_blobs)
     106              :     }
     107            9 : }
     108              : 
     109           12 : pub(crate) async fn cleanup(
     110           12 :     client: &Arc<GenericRemoteStorage>,
     111           12 :     objects_to_delete: HashSet<RemotePath>,
     112           12 : ) {
     113           12 :     info!(
     114            0 :         "Removing {} objects from the remote storage during cleanup",
     115            0 :         objects_to_delete.len()
     116              :     );
     117           12 :     let cancel = CancellationToken::new();
     118           12 :     let mut delete_tasks = JoinSet::new();
     119          264 :     for object_to_delete in objects_to_delete {
     120          252 :         let task_client = Arc::clone(client);
     121          252 :         let cancel = cancel.clone();
     122          252 :         delete_tasks.spawn(async move {
     123          252 :             debug!("Deleting remote item at path {object_to_delete:?}");
     124          252 :             task_client
     125          252 :                 .delete(&object_to_delete, &cancel)
     126          252 :                 .await
     127          252 :                 .with_context(|| format!("{object_to_delete:?} removal"))
     128          252 :         });
     129              :     }
     130              : 
     131          264 :     while let Some(task_run_result) = delete_tasks.join_next().await {
     132          252 :         match task_run_result {
     133          252 :             Ok(task_result) => match task_result {
     134          252 :                 Ok(()) => {}
     135            0 :                 Err(e) => error!("Delete task failed: {e:?}"),
     136              :             },
     137            0 :             Err(join_err) => error!("Delete task did not finish correctly: {join_err}"),
     138              :         }
     139              :     }
     140           12 : }
     141              : pub(crate) struct Uploads {
     142              :     pub(crate) prefixes: HashSet<RemotePath>,
     143              :     pub(crate) blobs: HashSet<RemotePath>,
     144              : }
     145              : 
     146            3 : pub(crate) async fn upload_remote_data(
     147            3 :     client: &Arc<GenericRemoteStorage>,
     148            3 :     base_prefix_str: &'static str,
     149            3 :     upload_tasks_count: usize,
     150            3 : ) -> ControlFlow<Uploads, Uploads> {
     151            3 :     info!("Creating {upload_tasks_count} remote files");
     152            3 :     let mut upload_tasks = JoinSet::new();
     153            3 :     let cancel = CancellationToken::new();
     154              : 
     155           63 :     for i in 1..=upload_tasks_count {
     156           63 :         let task_client = Arc::clone(client);
     157           63 :         let cancel = cancel.clone();
     158              : 
     159           63 :         upload_tasks.spawn(async move {
     160           63 :             let prefix = format!("{base_prefix_str}/sub_prefix_{i}/");
     161           63 :             let blob_prefix = RemotePath::new(Utf8Path::new(&prefix))
     162           63 :                 .with_context(|| format!("{prefix:?} to RemotePath conversion"))?;
     163           63 :             let blob_path = blob_prefix.join(Utf8Path::new(&format!("blob_{i}")));
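                       :             // e.g. for i = 3: "{base_prefix_str}/sub_prefix_3/blob_3"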
     164           63 :             debug!("Creating remote item {i} at path {blob_path:?}");
     165              : 
     166           63 :             let (data, data_len) =
     167           63 :                 upload_stream(format!("remote blob data {i}").into_bytes().into());
     168              : 
     169              :             /* BEGIN_HADRON */
     170           63 :             let mut metadata = None;
     171           63 :             if matches!(&*task_client, GenericRemoteStorage::AzureBlob(_)) {
     172           21 :                 let file_path = "/tmp/dbx_upload_tmp_file.txt";
     173              :                 {
     174              :                     // Open the file in append mode
     175           21 :                     let mut file = std::fs::OpenOptions::new()
     176           21 :                         .append(true)
     177           21 :                         .create(true) // Create the file if it doesn't exist
     178           21 :                         .open(file_path)?;
     179              :                     // Append some bytes to the file
     180           21 :                     std::io::Write::write_all(
     181           21 :                         &mut file,
     182           21 :                         &format!("remote blob data {i}").into_bytes(),
     183            0 :                     )?;
     184           21 :                     file.sync_all()?;
     185              :                 }
     186           21 :                 metadata = Some(remote_storage::StorageMetadata::from([(
     187           21 :                     "databricks_azure_put_block",
     188           21 :                     file_path,
     189           21 :                 )]));
     190           42 :             }
     191              :             /* END_HADRON */
     192              : 
     193           63 :             task_client
     194           63 :                 .upload(data, data_len, &blob_path, metadata, &cancel)
     195           63 :                 .await?;
     196              : 
     197              :             // TODO: Check upload is using the put_block upload.
     198              :             // We cannot consume data here since data is moved inside the upload.
     199              :             // let total_bytes = data.fold(0, |acc, chunk| async move {
     200              :             //     acc + chunk.map(|bytes| bytes.len()).unwrap_or(0)
     201              :             // }).await;
     202              :             // assert_eq!(total_bytes, data_len);
     203              : 
     204           63 :             Ok::<_, anyhow::Error>((blob_prefix, blob_path))
     205           63 :         });
     206              :     }
     207              : 
     208            3 :     let mut upload_tasks_failed = false;
     209            3 :     let mut uploaded_prefixes = HashSet::with_capacity(upload_tasks_count);
     210            3 :     let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
     211           66 :     while let Some(task_run_result) = upload_tasks.join_next().await {
     212           63 :         match task_run_result
     213           63 :             .context("task join failed")
     214           63 :             .and_then(|task_result| task_result.context("upload task failed"))
     215              :         {
     216           63 :             Ok((upload_prefix, upload_path)) => {
     217           63 :                 uploaded_prefixes.insert(upload_prefix);
     218           63 :                 uploaded_blobs.insert(upload_path);
     219           63 :             }
     220            0 :             Err(e) => {
     221            0 :                 error!("Upload task failed: {e:?}");
     222            0 :                 upload_tasks_failed = true;
     223              :             }
     224              :         }
     225              :     }
     226              : 
     227            3 :     let uploads = Uploads {
     228            3 :         prefixes: uploaded_prefixes,
     229            3 :         blobs: uploaded_blobs,
     230            3 :     };
     231            3 :     if upload_tasks_failed {
     232            0 :         ControlFlow::Break(uploads)
     233              :     } else {
     234            3 :         ControlFlow::Continue(uploads)
     235              :     }
     236            3 : }
     237              : 
     238           69 : pub(crate) fn ensure_logging_ready() {
     239           69 :     LOGGING_DONE.get_or_init(|| {
     240           69 :         utils::logging::init(
     241           69 :             utils::logging::LogFormat::Test,
     242           69 :             utils::logging::TracingErrorLayerEnablement::Disabled,
     243           69 :             utils::logging::Output::Stdout,
     244              :         )
     245           69 :         .expect("logging init failed");
     246           69 :     });
     247           69 : }
        
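The helpers above are consumed by the remote_storage integration tests. As a rough illustration of how they compose, the sketch below wires ensure_logging_ready, upload_simple_remote_data and cleanup together. It is not part of the covered file: make_test_client() is a hypothetical constructor standing in for whatever the test harness uses to build a GenericRemoteStorage against a test bucket, and only the three helpers named above come from this module.

    use std::ops::ControlFlow;
    use std::sync::Arc;

    // Hypothetical smoke test composing the helpers from this module.
    async fn smoke_upload_and_cleanup() -> anyhow::Result<()> {
        ensure_logging_ready();
        // Assumed helper: builds a GenericRemoteStorage pointing at a test bucket.
        let client = Arc::new(make_test_client()?);

        match upload_simple_remote_data(&client, 20).await {
            ControlFlow::Continue(blobs) => {
                // Every upload succeeded; delete the uploaded objects again.
                cleanup(&client, blobs).await;
                Ok(())
            }
            ControlFlow::Break(partial) => {
                // Some uploads failed; still remove whatever reached the bucket.
                cleanup(&client, partial).await;
                anyhow::bail!("some uploads failed")
            }
        }
    }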

Generated by: LCOV version 2.1-beta