LCOV - differential code coverage report
Current view: top level - s3_scrubber/src - delete_batch_producer.rs (source / functions) Coverage Total Hit UBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 0.0 % 238 0 238
Current Date: 2023-10-19 02:04:12 Functions: 0.0 % 54 0 54
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : mod tenant_batch;
       2                 : mod timeline_batch;
       3                 : 
       4                 : use std::future::Future;
       5                 : use std::str::FromStr;
       6                 : use std::sync::Arc;
       7                 : use std::time::Duration;
       8                 : 
       9                 : use anyhow::Context;
      10                 : use aws_sdk_s3::Client;
      11                 : use either::Either;
      12                 : use tokio::sync::mpsc::UnboundedReceiver;
      13                 : use tokio::sync::Mutex;
      14                 : use tokio::task::{JoinHandle, JoinSet};
      15                 : use tracing::{error, info, info_span, Instrument};
      16                 : 
      17                 : use crate::cloud_admin_api::{BranchData, CloudAdminApiClient, ProjectData};
      18                 : use crate::{list_objects_with_retries, RootTarget, S3Target, TraversingDepth, MAX_RETRIES};
      19                 : use utils::id::{TenantId, TenantTimelineId};
      20                 : 
      21                 : /// Typical tenant to remove contains 1 layer and 1 index_part.json blobs
      22                 : /// Also, there are some non-standard tenants to remove, having more layers.
      23                 : /// delete_objects request allows up to 1000 keys, so be on a safe side and allow most
      24                 : /// batch processing tasks to do 1 delete objects request only.
      25                 : ///
      26                 : /// Every batch item will be additionally S3 LS'ed later, so keep the batch size
      27                 : /// even lower to allow multiple concurrent tasks do the LS requests.
      28                 : const BATCH_SIZE: usize = 100;
      29                 : 
      30                 : pub struct DeleteBatchProducer {
      31                 :     delete_tenants_sender_task: JoinHandle<anyhow::Result<ProcessedS3List<TenantId, ProjectData>>>,
      32                 :     delete_timelines_sender_task:
      33                 :         JoinHandle<anyhow::Result<ProcessedS3List<TenantTimelineId, BranchData>>>,
      34                 :     delete_batch_creator_task: JoinHandle<()>,
      35                 :     delete_batch_receiver: Arc<Mutex<UnboundedReceiver<DeleteBatch>>>,
      36                 : }
      37                 : 
      38                 : pub struct DeleteProducerStats {
      39                 :     pub tenant_stats: ProcessedS3List<TenantId, ProjectData>,
      40                 :     pub timeline_stats: Option<ProcessedS3List<TenantTimelineId, BranchData>>,
      41                 : }
      42                 : 
      43                 : impl DeleteProducerStats {
      44 UBC           0 :     pub fn tenants_checked(&self) -> usize {
      45               0 :         self.tenant_stats.entries_total
      46               0 :     }
      47                 : 
      48               0 :     pub fn active_tenants(&self) -> usize {
      49               0 :         self.tenant_stats.active_entries.len()
      50               0 :     }
      51                 : 
      52               0 :     pub fn timelines_checked(&self) -> usize {
      53               0 :         self.timeline_stats
      54               0 :             .as_ref()
      55               0 :             .map(|stats| stats.entries_total)
      56               0 :             .unwrap_or(0)
      57               0 :     }
      58                 : }
      59                 : 
      60               0 : #[derive(Debug, Default, Clone)]
      61                 : pub struct DeleteBatch {
      62                 :     pub tenants: Vec<TenantId>,
      63                 :     pub timelines: Vec<TenantTimelineId>,
      64                 : }
      65                 : 
      66                 : impl DeleteBatch {
      67               0 :     pub fn merge(&mut self, other: Self) {
      68               0 :         self.tenants.extend(other.tenants);
      69               0 :         self.timelines.extend(other.timelines);
      70               0 :     }
      71                 : 
      72               0 :     pub fn len(&self) -> usize {
      73               0 :         self.tenants.len() + self.timelines.len()
      74               0 :     }
      75                 : 
      76               0 :     pub fn is_empty(&self) -> bool {
      77               0 :         self.len() == 0
      78               0 :     }
      79                 : }
      80                 : 
      81                 : impl DeleteBatchProducer {
      82               0 :     pub fn start(
      83               0 :         admin_client: Arc<CloudAdminApiClient>,
      84               0 :         s3_client: Arc<Client>,
      85               0 :         s3_root_target: RootTarget,
      86               0 :         traversing_depth: TraversingDepth,
      87               0 :     ) -> Self {
      88               0 :         let (delete_elements_sender, mut delete_elements_receiver) =
      89               0 :             tokio::sync::mpsc::unbounded_channel();
      90               0 :         let delete_elements_sender = Arc::new(delete_elements_sender);
      91               0 :         let admin_client = Arc::new(admin_client);
      92               0 : 
      93               0 :         let (projects_to_check_sender, mut projects_to_check_receiver) =
      94               0 :             tokio::sync::mpsc::unbounded_channel();
      95               0 :         let delete_tenants_root_target = s3_root_target.clone();
      96               0 :         let delete_tenants_client = Arc::clone(&s3_client);
      97               0 :         let delete_tenants_admin_client = Arc::clone(&admin_client);
      98               0 :         let delete_sender = Arc::clone(&delete_elements_sender);
      99               0 :         let delete_tenants_sender_task = tokio::spawn(
     100               0 :             async move {
     101               0 :                 tenant_batch::schedule_cleanup_deleted_tenants(
     102               0 :                     &delete_tenants_root_target,
     103               0 :                     &delete_tenants_client,
     104               0 :                     &delete_tenants_admin_client,
     105               0 :                     projects_to_check_sender,
     106               0 :                     delete_sender,
     107               0 :                     traversing_depth,
     108               0 :                 )
     109               0 :                 .await
     110               0 :             }
     111               0 :             .instrument(info_span!("delete_tenants_sender")),
     112                 :         );
     113               0 :         let delete_timelines_sender_task = tokio::spawn(async move {
     114               0 :             timeline_batch::schedule_cleanup_deleted_timelines(
     115               0 :                 &s3_root_target,
     116               0 :                 &s3_client,
     117               0 :                 &admin_client,
     118               0 :                 &mut projects_to_check_receiver,
     119               0 :                 delete_elements_sender,
     120               0 :             )
     121               0 :             .in_current_span()
     122               0 :             .await
     123               0 :         });
     124               0 : 
     125               0 :         let (delete_batch_sender, delete_batch_receiver) = tokio::sync::mpsc::unbounded_channel();
     126               0 :         let delete_batch_creator_task = tokio::spawn(
     127               0 :             async move {
     128               0 :                 'outer: loop {
     129               0 :                     let mut delete_batch = DeleteBatch::default();
     130               0 :                     while delete_batch.len() < BATCH_SIZE {
     131               0 :                         match delete_elements_receiver.recv().await {
     132               0 :                             Some(new_task) => match new_task {
     133               0 :                                 Either::Left(tenant_id) => delete_batch.tenants.push(tenant_id),
     134               0 :                                 Either::Right(timeline_id) => {
     135               0 :                                     delete_batch.timelines.push(timeline_id)
     136               0 :                                 }
     137               0 :                             },
     138               0 :                             None => {
     139               0 :                                 info!("Task finished: sender dropped");
     140               0 :                                 delete_batch_sender.send(delete_batch).ok();
     141               0 :                                 break 'outer;
     142               0 :                             }
     143               0 :                         }
     144               0 :                     }
     145               0 : 
     146               0 :                     if !delete_batch.is_empty() {
     147               0 :                         delete_batch_sender.send(delete_batch).ok();
     148               0 :                     }
     149               0 :                 }
     150               0 :             }
     151               0 :             .instrument(info_span!("delete batch creator")),
     152                 :         );
     153                 : 
     154               0 :         Self {
     155               0 :             delete_tenants_sender_task,
     156               0 :             delete_timelines_sender_task,
     157               0 :             delete_batch_creator_task,
     158               0 :             delete_batch_receiver: Arc::new(Mutex::new(delete_batch_receiver)),
     159               0 :         }
     160               0 :     }
     161                 : 
     162               0 :     pub fn subscribe(&self) -> Arc<Mutex<UnboundedReceiver<DeleteBatch>>> {
     163               0 :         self.delete_batch_receiver.clone()
     164               0 :     }
     165                 : 
     166               0 :     pub async fn join(self) -> anyhow::Result<DeleteProducerStats> {
     167               0 :         let (delete_tenants_task_result, delete_timelines_task_result, batch_task_result) = tokio::join!(
     168               0 :             self.delete_tenants_sender_task,
     169               0 :             self.delete_timelines_sender_task,
     170               0 :             self.delete_batch_creator_task,
     171               0 :         );
     172                 : 
     173               0 :         let tenant_stats = match delete_tenants_task_result {
     174               0 :             Ok(Ok(stats)) => stats,
     175               0 :             Ok(Err(tenant_deletion_error)) => return Err(tenant_deletion_error),
     176               0 :             Err(join_error) => {
     177               0 :                 anyhow::bail!("Failed to join the delete tenant producing task: {join_error}")
     178                 :             }
     179                 :         };
     180                 : 
     181               0 :         let timeline_stats = match delete_timelines_task_result {
     182               0 :             Ok(Ok(stats)) => Some(stats),
     183               0 :             Ok(Err(timeline_deletion_error)) => return Err(timeline_deletion_error),
     184               0 :             Err(join_error) => {
     185               0 :                 anyhow::bail!("Failed to join the delete timeline producing task: {join_error}")
     186                 :             }
     187                 :         };
     188                 : 
     189               0 :         match batch_task_result {
     190               0 :             Ok(()) => (),
     191               0 :             Err(join_error) => anyhow::bail!("Failed to join the batch forming task: {join_error}"),
     192                 :         };
     193                 : 
     194               0 :         Ok(DeleteProducerStats {
     195               0 :             tenant_stats,
     196               0 :             timeline_stats,
     197               0 :         })
     198               0 :     }
     199                 : }
     200                 : 
     201                 : pub struct ProcessedS3List<I, A> {
     202                 :     pub entries_total: usize,
     203                 :     pub entries_to_delete: Vec<I>,
     204                 :     pub active_entries: Vec<A>,
     205                 : }
     206                 : 
     207                 : impl<I, A> Default for ProcessedS3List<I, A> {
     208               0 :     fn default() -> Self {
     209               0 :         Self {
     210               0 :             entries_total: 0,
     211               0 :             entries_to_delete: Vec::new(),
     212               0 :             active_entries: Vec::new(),
     213               0 :         }
     214               0 :     }
     215                 : }
     216                 : 
     217                 : impl<I, A> ProcessedS3List<I, A> {
     218               0 :     fn merge(&mut self, other: Self) {
     219               0 :         self.entries_total += other.entries_total;
     220               0 :         self.entries_to_delete.extend(other.entries_to_delete);
     221               0 :         self.active_entries.extend(other.active_entries);
     222               0 :     }
     223                 : 
     224               0 :     fn change_ids<NewI>(self, transform: impl Fn(I) -> NewI) -> ProcessedS3List<NewI, A> {
     225               0 :         ProcessedS3List {
     226               0 :             entries_total: self.entries_total,
     227               0 :             entries_to_delete: self.entries_to_delete.into_iter().map(transform).collect(),
     228               0 :             active_entries: self.active_entries,
     229               0 :         }
     230               0 :     }
     231                 : }
     232                 : 
     233               0 : async fn process_s3_target_recursively<F, Fut, I, E, A>(
     234               0 :     s3_client: &Client,
     235               0 :     target: &S3Target,
     236               0 :     find_active_and_deleted_entries: F,
     237               0 : ) -> anyhow::Result<ProcessedS3List<I, A>>
     238               0 : where
     239               0 :     I: FromStr<Err = E> + Send + Sync,
     240               0 :     E: Send + Sync + std::error::Error + 'static,
     241               0 :     F: FnOnce(Vec<I>) -> Fut + Clone,
     242               0 :     Fut: Future<Output = anyhow::Result<ProcessedS3List<I, A>>>,
     243               0 : {
     244               0 :     let mut continuation_token = None;
     245               0 :     let mut total_entries = ProcessedS3List::default();
     246                 : 
     247                 :     loop {
     248               0 :         let fetch_response =
     249               0 :             list_objects_with_retries(s3_client, target, continuation_token.clone()).await?;
     250                 : 
     251               0 :         let new_entry_ids = fetch_response
     252               0 :             .common_prefixes()
     253               0 :             .unwrap_or_default()
     254               0 :             .iter()
     255               0 :             .filter_map(|prefix| prefix.prefix())
     256               0 :             .filter_map(|prefix| -> Option<&str> {
     257               0 :                 prefix
     258               0 :                     .strip_prefix(&target.prefix_in_bucket)?
     259               0 :                     .strip_suffix('/')
     260               0 :             })
     261               0 :             .map(|entry_id_str| {
     262               0 :                 entry_id_str
     263               0 :                     .parse()
     264               0 :                     .with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
     265               0 :             })
     266               0 :             .collect::<anyhow::Result<Vec<I>>>()
     267               0 :             .context("list and parse bucket's entry ids")?;
     268                 : 
     269               0 :         total_entries.merge(
     270               0 :             (find_active_and_deleted_entries.clone())(new_entry_ids)
     271               0 :                 .await
     272               0 :                 .context("filter active and deleted entry ids")?,
     273                 :         );
     274                 : 
     275               0 :         match fetch_response.next_continuation_token {
     276               0 :             Some(new_token) => continuation_token = Some(new_token),
     277               0 :             None => break,
     278               0 :         }
     279               0 :     }
     280               0 : 
     281               0 :     Ok(total_entries)
     282               0 : }
     283                 : 
     284                 : enum FetchResult<A> {
     285                 :     Found(A),
     286                 :     Deleted,
     287                 :     Absent,
     288                 : }
     289                 : 
     290               0 : async fn split_to_active_and_deleted_entries<I, A, F, Fut>(
     291               0 :     new_entry_ids: Vec<I>,
     292               0 :     find_active_entry: F,
     293               0 : ) -> anyhow::Result<ProcessedS3List<I, A>>
     294               0 : where
     295               0 :     I: std::fmt::Display + Send + Sync + 'static + Copy,
     296               0 :     A: Send + 'static,
     297               0 :     F: FnOnce(I) -> Fut + Send + Sync + 'static + Clone,
     298               0 :     Fut: Future<Output = anyhow::Result<FetchResult<A>>> + Send,
     299               0 : {
     300               0 :     let entries_total = new_entry_ids.len();
     301               0 :     let mut check_tasks = JoinSet::new();
     302               0 :     let mut active_entries = Vec::with_capacity(entries_total);
     303               0 :     let mut entries_to_delete = Vec::with_capacity(entries_total);
     304                 : 
     305               0 :     for new_entry_id in new_entry_ids {
     306               0 :         let check_closure = find_active_entry.clone();
     307               0 :         check_tasks.spawn(
     308               0 :             async move {
     309               0 :                 (
     310               0 :                     new_entry_id,
     311               0 :                     async {
     312               0 :                         for _ in 0..MAX_RETRIES {
     313               0 :                             let closure_clone = check_closure.clone();
     314               0 :                             match closure_clone(new_entry_id).await {
     315               0 :                                 Ok(active_entry) => return Ok(active_entry),
     316               0 :                                 Err(e) => {
     317               0 :                                     error!("find active entry admin API call failed: {e}");
     318               0 :                                     tokio::time::sleep(Duration::from_secs(1)).await;
     319                 :                                 }
     320                 :                             }
     321                 :                         }
     322                 : 
     323               0 :                         anyhow::bail!("Failed to check entry {new_entry_id} {MAX_RETRIES} times")
     324               0 :                     }
     325               0 :                     .await,
     326                 :                 )
     327               0 :             }
     328               0 :             .instrument(info_span!("filter_active_entries")),
     329                 :         );
     330                 :     }
     331                 : 
     332               0 :     while let Some(task_result) = check_tasks.join_next().await {
     333               0 :         let (entry_id, entry_data_fetch_result) = task_result.context("task join")?;
     334               0 :         match entry_data_fetch_result.context("entry data fetch")? {
     335               0 :             FetchResult::Found(active_entry) => {
     336               0 :                 info!("Entry {entry_id} is alive, cannot delete");
     337               0 :                 active_entries.push(active_entry);
     338                 :             }
     339                 :             FetchResult::Deleted => {
     340               0 :                 info!("Entry {entry_id} deleted in the admin data, can safely delete");
     341               0 :                 entries_to_delete.push(entry_id);
     342                 :             }
     343                 :             FetchResult::Absent => {
     344               0 :                 info!("Entry {entry_id} absent in the admin data, can safely delete");
     345               0 :                 entries_to_delete.push(entry_id);
     346                 :             }
     347                 :         }
     348                 :     }
     349               0 :     Ok(ProcessedS3List {
     350               0 :         entries_total,
     351               0 :         entries_to_delete,
     352               0 :         active_entries,
     353               0 :     })
     354               0 : }
        

Generated by: LCOV version 2.1-beta