LCOV - differential code coverage report
Current view: top level - libs/remote_storage/tests/common - mod.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 6.8 % 146 10 136 10
Current Date: 2024-01-09 02:06:09 Functions: 6.7 % 60 4 56 4
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : use std::collections::HashSet;
       2                 : use std::ops::ControlFlow;
       3                 : use std::path::PathBuf;
       4                 : use std::sync::Arc;
       5                 : 
       6                 : use anyhow::Context;
       7                 : use bytes::Bytes;
       8                 : use camino::Utf8Path;
       9                 : use futures::stream::Stream;
      10                 : use once_cell::sync::OnceCell;
      11                 : use remote_storage::{Download, GenericRemoteStorage, RemotePath};
      12                 : use tokio::task::JoinSet;
      13                 : use tracing::{debug, error, info};
      14                 : 
                           /// One-shot guard for logging setup: `ensure_logging_ready` runs its
                           /// initializer through this cell's `get_or_init`.
      15                 : static LOGGING_DONE: OnceCell<()> = OnceCell::new();
      16                 : 
                           /// Turns `content` into a `Bytes` buffer and hands it to `wrap_stream`,
                           /// returning that function's `(stream, length)` pair unchanged.
                           ///
                           /// A borrowed `'static` slice is converted with `Bytes::from_static`; an
                           /// owned `Vec<u8>` is converted with `Bytes::from`.
      17 UBC           0 : pub(crate) fn upload_stream(
      18               0 :     content: std::borrow::Cow<'static, [u8]>,
      19               0 : ) -> (
      20               0 :     impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
      21               0 :     usize,
      22               0 : ) {
      23                 :     use std::borrow::Cow;
      24                 : 
      25               0 :     let content = match content {
      26               0 :         Cow::Borrowed(x) => Bytes::from_static(x),
      27               0 :         Cow::Owned(vec) => Bytes::from(vec),
      28                 :     };
      29               0 :     wrap_stream(content)
      30               0 : }
      31                 : 
                           /// Wraps `content` in a stream built by `futures::stream::once` over an
                           /// already-ready `Ok(content)` future.
                           ///
                           /// Returns that stream together with `content.len()`.
      32               0 : pub(crate) fn wrap_stream(
      33               0 :     content: bytes::Bytes,
      34               0 : ) -> (
      35               0 :     impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
      36               0 :     usize,
      37               0 : ) {
                               // Read the length first: `content` is moved into the future below.
      38               0 :     let len = content.len();
      39               0 :     let content = futures::future::ready(Ok(content));
      40               0 : 
      41               0 :     (futures::stream::once(content), len)
      42               0 : }
      43                 : 
                           /// Collects the whole of `dl.download_stream` into a new `Vec<u8>`.
                           ///
                           /// The stream is adapted with `tokio_util::io::StreamReader` and copied
                           /// with `tokio::io::copy_buf`; an error from the copy is returned via `?`.
      44               0 : pub(crate) async fn download_to_vec(dl: Download) -> anyhow::Result<Vec<u8>> {
      45               0 :     let mut buf = Vec::new();
      46               0 :     tokio::io::copy_buf(
      47               0 :         &mut tokio_util::io::StreamReader::new(dl.download_stream),
      48               0 :         &mut buf,
      49               0 :     )
      50               0 :     .await?;
      51               0 :     Ok(buf)
      52               0 : }
      53                 : 
      54                 : /// Uploads files `folder{i / 7}/blob_{i}.txt` for `i` in `1..=upload_tasks_count`.
                           /// See test description for more details.
                           ///
                           /// One task per file is spawned on a `JoinSet`; each uploads the bytes of
                           /// `remote blob data {i}` through `client`.
                           ///
                           /// Returns `ControlFlow::Continue` with the uploaded paths when every task
                           /// succeeded. If any task failed (join error or upload error) the failure is
                           /// logged with `error!` and `ControlFlow::Break` is returned, carrying the
                           /// paths of the tasks that did succeed.
      55               0 : pub(crate) async fn upload_simple_remote_data(
      56               0 :     client: &Arc<GenericRemoteStorage>,
      57               0 :     upload_tasks_count: usize,
      58               0 : ) -> ControlFlow<HashSet<RemotePath>, HashSet<RemotePath>> {
      59               0 :     info!("Creating {upload_tasks_count} remote files");
      60               0 :     let mut upload_tasks = JoinSet::new();
      61               0 :     for i in 1..upload_tasks_count + 1 {
      62               0 :         let task_client = Arc::clone(client);
      63               0 :         upload_tasks.spawn(async move {
                                       // Integer division: indices 1..=6 land in `folder0`, then seven
                                       // consecutive indices share each following folder.
      64               0 :             let blob_path = PathBuf::from(format!("folder{}/blob_{}.txt", i / 7, i));
      65               0 :             let blob_path = RemotePath::new(
      66               0 :                 Utf8Path::from_path(blob_path.as_path()).expect("must be valid blob path"),
      67               0 :             )
      68               0 :             .with_context(|| format!("{blob_path:?} to RemotePath conversion"))?;
      69               0 :             debug!("Creating remote item {i} at path {blob_path:?}");
      70                 : 
      71               0 :             let (data, len) = upload_stream(format!("remote blob data {i}").into_bytes().into());
      72               0 :             task_client.upload(data, len, &blob_path, None).await?;
      73                 : 
      74               0 :             Ok::<_, anyhow::Error>(blob_path)
      75               0 :         });
      76               0 :     }
      77                 : 
      78               0 :     let mut upload_tasks_failed = false;
      79               0 :     let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
      80               0 :     while let Some(task_run_result) = upload_tasks.join_next().await {
                                   // Fold a join failure and the task's own error into one `anyhow` result.
      81               0 :         match task_run_result
      82               0 :             .context("task join failed")
      83               0 :             .and_then(|task_result| task_result.context("upload task failed"))
      84                 :         {
      85               0 :             Ok(upload_path) => {
      86               0 :                 uploaded_blobs.insert(upload_path);
      87               0 :             }
      88               0 :             Err(e) => {
      89               0 :                 error!("Upload task failed: {e:?}");
      90               0 :                 upload_tasks_failed = true;
      91                 :             }
      92                 :         }
      93                 :     }
      94                 : 
                               // `Break` still carries the successful uploads — presumably so the caller
                               // can clean them up; confirm against the tests that use this helper.
      95               0 :     if upload_tasks_failed {
      96               0 :         ControlFlow::Break(uploaded_blobs)
      97                 :     } else {
      98               0 :         ControlFlow::Continue(uploaded_blobs)
      99                 :     }
     100               0 : }
     101                 : 
                           /// Deletes every path in `objects_to_delete` through `client`, spawning one
                           /// delete task per object on a `JoinSet` and waiting for all of them.
                           ///
                           /// Nothing is returned: a failed delete or a task that did not join cleanly
                           /// is only logged with `error!`.
     102               0 : pub(crate) async fn cleanup(
     103               0 :     client: &Arc<GenericRemoteStorage>,
     104               0 :     objects_to_delete: HashSet<RemotePath>,
     105               0 : ) {
     106               0 :     info!(
     107               0 :         "Removing {} objects from the remote storage during cleanup",
     108               0 :         objects_to_delete.len()
     109               0 :     );
     110               0 :     let mut delete_tasks = JoinSet::new();
     111               0 :     for object_to_delete in objects_to_delete {
     112               0 :         let task_client = Arc::clone(client);
     113               0 :         delete_tasks.spawn(async move {
     114               0 :             debug!("Deleting remote item at path {object_to_delete:?}");
     115               0 :             task_client
     116               0 :                 .delete(&object_to_delete)
     117               0 :                 .await
     118               0 :                 .with_context(|| format!("{object_to_delete:?} removal"))
     119               0 :         });
     120               0 :     }
     121                 : 
                               // Outer `Err` is a join failure, inner `Err` is the delete call failing.
     122               0 :     while let Some(task_run_result) = delete_tasks.join_next().await {
     123               0 :         match task_run_result {
     124               0 :             Ok(task_result) => match task_result {
     125               0 :                 Ok(()) => {}
     126               0 :                 Err(e) => error!("Delete task failed: {e:?}"),
     127                 :             },
     128               0 :             Err(join_err) => error!("Delete task did not finish correctly: {join_err}"),
     129                 :         }
     130                 :     }
     131               0 : }
     132                 : pub(crate) struct Uploads {
                               /// `{base_prefix_str}/sub_prefix_{i}/` paths collected by `upload_remote_data`
                               /// from upload tasks that finished successfully.
     133                 :     pub(crate) prefixes: HashSet<RemotePath>,
                               /// Blob paths (`blob_{i}` joined onto the matching prefix) from the same tasks.
     134                 :     pub(crate) blobs: HashSet<RemotePath>,
     135                 : }
     136                 : 
                           /// Uploads one blob per `i` in `1..=upload_tasks_count` to
                           /// `{base_prefix_str}/sub_prefix_{i}/blob_{i}`, each from its own task on a
                           /// `JoinSet` and each containing the bytes of `remote blob data {i}`.
                           ///
                           /// The returned `Uploads` holds the prefixes and blob paths of the tasks
                           /// that succeeded. It is wrapped in `ControlFlow::Continue` when every task
                           /// succeeded and in `ControlFlow::Break` when any task failed (join error or
                           /// upload error); failures are logged with `error!`.
     137               0 : pub(crate) async fn upload_remote_data(
     138               0 :     client: &Arc<GenericRemoteStorage>,
     139               0 :     base_prefix_str: &'static str,
     140               0 :     upload_tasks_count: usize,
     141               0 : ) -> ControlFlow<Uploads, Uploads> {
     142               0 :     info!("Creating {upload_tasks_count} remote files");
     143               0 :     let mut upload_tasks = JoinSet::new();
     144               0 :     for i in 1..upload_tasks_count + 1 {
     145               0 :         let task_client = Arc::clone(client);
     146               0 :         upload_tasks.spawn(async move {
     147               0 :             let prefix = format!("{base_prefix_str}/sub_prefix_{i}/");
     148               0 :             let blob_prefix = RemotePath::new(Utf8Path::new(&prefix))
     149               0 :                 .with_context(|| format!("{prefix:?} to RemotePath conversion"))?;
     150               0 :             let blob_path = blob_prefix.join(Utf8Path::new(&format!("blob_{i}")));
     151               0 :             debug!("Creating remote item {i} at path {blob_path:?}");
     152                 : 
     153               0 :             let (data, data_len) =
     154               0 :                 upload_stream(format!("remote blob data {i}").into_bytes().into());
     155               0 :             task_client.upload(data, data_len, &blob_path, None).await?;
     156                 : 
     157               0 :             Ok::<_, anyhow::Error>((blob_prefix, blob_path))
     158               0 :         });
     159               0 :     }
     160                 : 
     161               0 :     let mut upload_tasks_failed = false;
     162               0 :     let mut uploaded_prefixes = HashSet::with_capacity(upload_tasks_count);
     163               0 :     let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
     164               0 :     while let Some(task_run_result) = upload_tasks.join_next().await {
                                   // Fold a join failure and the task's own error into one `anyhow` result.
     165               0 :         match task_run_result
     166               0 :             .context("task join failed")
     167               0 :             .and_then(|task_result| task_result.context("upload task failed"))
     168                 :         {
     169               0 :             Ok((upload_prefix, upload_path)) => {
     170               0 :                 uploaded_prefixes.insert(upload_prefix);
     171               0 :                 uploaded_blobs.insert(upload_path);
     172               0 :             }
     173               0 :             Err(e) => {
     174               0 :                 error!("Upload task failed: {e:?}");
     175               0 :                 upload_tasks_failed = true;
     176                 :             }
     177                 :         }
     178                 :     }
     179                 : 
     180               0 :     let uploads = Uploads {
     181               0 :         prefixes: uploaded_prefixes,
     182               0 :         blobs: uploaded_blobs,
     183               0 :     };
     184               0 :     if upload_tasks_failed {
     185               0 :         ControlFlow::Break(uploads)
     186                 :     } else {
     187               0 :         ControlFlow::Continue(uploads)
     188                 :     }
     189               0 : }
     190                 : 
                           /// Sets up logging via `utils::logging::init` (test format, tracing-error
                           /// layer disabled, output to stdout), routed through `LOGGING_DONE` so the
                           /// initializer is not re-run once the cell is set.
                           ///
                           /// # Panics
                           ///
                           /// Panics with "logging init failed" if `utils::logging::init` returns an error.
     191 CBC          10 : pub(crate) fn ensure_logging_ready() {
     192              10 :     LOGGING_DONE.get_or_init(|| {
     193              10 :         utils::logging::init(
     194              10 :             utils::logging::LogFormat::Test,
     195              10 :             utils::logging::TracingErrorLayerEnablement::Disabled,
     196              10 :             utils::logging::Output::Stdout,
     197              10 :         )
     198              10 :         .expect("logging init failed");
     199              10 :     });
     200              10 : }
        

Generated by: LCOV version 2.1-beta