LCOV - differential code coverage report
Current view: top level - s3_scrubber/src/delete_batch_producer - timeline_batch.rs (source / functions) Coverage Total Hit UBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 0.0 % 76 0 76
Current Date: 2023-10-19 02:04:12 Functions: 0.0 % 11 0 11
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : use std::sync::Arc;
       2                 : 
       3                 : use anyhow::Context;
       4                 : use aws_sdk_s3::Client;
       5                 : use either::Either;
       6                 : use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
       7                 : use tracing::{info, info_span, Instrument};
       8                 : 
       9                 : use crate::cloud_admin_api::{BranchData, CloudAdminApiClient, ProjectData};
      10                 : use crate::delete_batch_producer::{FetchResult, ProcessedS3List};
      11                 : use crate::RootTarget;
      12                 : use utils::id::{TenantId, TenantTimelineId};
      13                 : 
      14 UBC           0 : pub async fn schedule_cleanup_deleted_timelines(
      15               0 :     s3_root_target: &RootTarget,
      16               0 :     s3_client: &Arc<Client>,
      17               0 :     admin_client: &Arc<CloudAdminApiClient>,
      18               0 :     projects_to_check_receiver: &mut UnboundedReceiver<ProjectData>,
      19               0 :     delete_elements_sender: Arc<UnboundedSender<Either<TenantId, TenantTimelineId>>>,
      20               0 : ) -> anyhow::Result<ProcessedS3List<TenantTimelineId, BranchData>> {
      21               0 :     info!(
      22               0 :         "Starting to list the bucket from root {}",
      23               0 :         s3_root_target.bucket_name()
      24               0 :     );
      25               0 :     s3_client
      26               0 :         .head_bucket()
      27               0 :         .bucket(s3_root_target.bucket_name())
      28               0 :         .send()
      29               0 :         .await
      30               0 :         .with_context(|| format!("bucket {} was not found", s3_root_target.bucket_name()))?;
      31                 : 
      32               0 :     let mut timeline_stats = ProcessedS3List::default();
      33               0 :     while let Some(project_to_check) = projects_to_check_receiver.recv().await {
      34               0 :         let check_client = Arc::clone(admin_client);
      35               0 : 
      36               0 :         let check_s3_client = Arc::clone(s3_client);
      37               0 : 
      38               0 :         let check_delete_sender = Arc::clone(&delete_elements_sender);
      39               0 : 
      40               0 :         let check_root = s3_root_target.clone();
      41                 : 
      42               0 :         let new_stats = async move {
      43               0 :             let tenant_id_to_check = project_to_check.tenant;
      44               0 :             let check_target = check_root.timelines_root(&tenant_id_to_check);
      45               0 :             let stats = super::process_s3_target_recursively(
      46               0 :                 &check_s3_client,
      47               0 :                 &check_target,
      48               0 :                 |s3_timelines| async move {
      49               0 :                     let another_client = check_client.clone();
      50               0 :                     super::split_to_active_and_deleted_entries(
      51               0 :                         s3_timelines,
      52               0 :                         move |timeline_id| async move {
      53               0 :                             let console_branch = another_client
      54               0 :                                 .find_timeline_branch(timeline_id)
      55               0 :                                 .await
      56               0 :                                 .map_err(|e| {
      57               0 :                                     anyhow::anyhow!(
      58               0 :                                         "Timeline {timeline_id} branch admin check: {e}"
      59               0 :                                     )
      60               0 :                                 })?;
      61                 : 
      62               0 :                             let id = TenantTimelineId::new(tenant_id_to_check, timeline_id);
      63               0 :                             Ok(match console_branch {
      64               0 :                                 Some(console_branch) => {
      65               0 :                                     if console_branch.deleted {
      66               0 :                                         check_delete_sender.send(Either::Right(id)).ok();
      67               0 :                                         FetchResult::Deleted
      68                 :                                     } else {
      69               0 :                                         FetchResult::Found(console_branch)
      70                 :                                     }
      71                 :                                 }
      72                 :                                 None => {
      73               0 :                                     check_delete_sender.send(Either::Right(id)).ok();
      74               0 :                                     FetchResult::Absent
      75                 :                                 }
      76                 :                             })
      77               0 :                         },
      78               0 :                     )
      79               0 :                     .await
      80               0 :                 },
      81               0 :             )
      82               0 :             .await
      83               0 :             .with_context(|| format!("tenant {tenant_id_to_check} timeline batch processing"))?
      84               0 :             .change_ids(|timeline_id| TenantTimelineId::new(tenant_id_to_check, timeline_id));
      85               0 : 
      86               0 :             Ok::<_, anyhow::Error>(stats)
      87               0 :         }
      88               0 :         .instrument(info_span!("delete_timelines_sender", tenant = %project_to_check.tenant))
      89               0 :         .await?;
      90                 : 
      91               0 :         timeline_stats.merge(new_stats);
      92                 :     }
      93                 : 
      94               0 :     info!(
      95               0 :         "Among {} timelines, found {} timelines to delete and {} active ones",
      96               0 :         timeline_stats.entries_total,
      97               0 :         timeline_stats.entries_to_delete.len(),
      98               0 :         timeline_stats.active_entries.len(),
      99               0 :     );
     100                 : 
     101               0 :     Ok(timeline_stats)
     102               0 : }
        

Generated by: LCOV version 2.1-beta