LCOV - code coverage report
Current view: top level - s3_scrubber/src - delete_batch_producer.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 0.0 % 237 0
Test Date: 2023-09-06 10:18:01 Functions: 0.0 % 54 0

            Line data    Source code
       1              : mod tenant_batch;
       2              : mod timeline_batch;
       3              : 
       4              : use std::future::Future;
       5              : use std::str::FromStr;
       6              : use std::sync::Arc;
       7              : use std::time::Duration;
       8              : 
       9              : use anyhow::Context;
      10              : use aws_sdk_s3::Client;
      11              : use either::Either;
      12              : use tokio::sync::mpsc::UnboundedReceiver;
      13              : use tokio::sync::Mutex;
      14              : use tokio::task::{JoinHandle, JoinSet};
      15              : use tracing::{error, info, info_span, Instrument};
      16              : 
      17              : use crate::cloud_admin_api::{BranchData, CloudAdminApiClient, ProjectData};
      18              : use crate::{list_objects_with_retries, RootTarget, S3Target, TraversingDepth, MAX_RETRIES};
      19              : use utils::id::{TenantId, TenantTimelineId};
      20              : 
      21              : /// A typical tenant to remove contains 1 layer blob and 1 index_part.json blob.
      22              : /// There are also some non-standard tenants to remove that have more layers.
      23              : /// A delete_objects request allows up to 1000 keys, so stay on the safe side and let most
      24              : /// batch processing tasks issue only 1 delete_objects request.
      25              : ///
      26              : /// Every batch item will additionally be S3 LS'ed later, so keep the batch size
      27              : /// even lower to allow multiple concurrent tasks to do the LS requests.
      28              : const BATCH_SIZE: usize = 100;
      29              : 
      30              : pub struct DeleteBatchProducer {
      31              :     delete_tenants_sender_task: JoinHandle<anyhow::Result<ProcessedS3List<TenantId, ProjectData>>>,
      32              :     delete_timelines_sender_task:
      33              :         JoinHandle<anyhow::Result<ProcessedS3List<TenantTimelineId, BranchData>>>,
      34              :     delete_batch_creator_task: JoinHandle<()>,
      35              :     delete_batch_receiver: Arc<Mutex<UnboundedReceiver<DeleteBatch>>>,
      36              : }
      37              : 
      38              : pub struct DeleteProducerStats {
      39              :     pub tenant_stats: ProcessedS3List<TenantId, ProjectData>,
      40              :     pub timeline_stats: Option<ProcessedS3List<TenantTimelineId, BranchData>>,
      41              : }
      42              : 
      43              : impl DeleteProducerStats {
      44            0 :     pub fn tenants_checked(&self) -> usize {
      45            0 :         self.tenant_stats.entries_total
      46            0 :     }
      47              : 
      48            0 :     pub fn active_tenants(&self) -> usize {
      49            0 :         self.tenant_stats.active_entries.len()
      50            0 :     }
      51              : 
      52            0 :     pub fn timelines_checked(&self) -> usize {
      53            0 :         self.timeline_stats
      54            0 :             .as_ref()
      55            0 :             .map(|stats| stats.entries_total)
      56            0 :             .unwrap_or(0)
      57            0 :     }
      58              : }
      59              : 
      60            0 : #[derive(Debug, Default, Clone)]
      61              : pub struct DeleteBatch {
      62              :     pub tenants: Vec<TenantId>,
      63              :     pub timelines: Vec<TenantTimelineId>,
      64              : }
      65              : 
      66              : impl DeleteBatch {
      67            0 :     pub fn merge(&mut self, other: Self) {
      68            0 :         self.tenants.extend(other.tenants);
      69            0 :         self.timelines.extend(other.timelines);
      70            0 :     }
      71              : 
      72            0 :     pub fn len(&self) -> usize {
      73            0 :         self.tenants.len() + self.timelines.len()
      74            0 :     }
      75              : 
      76            0 :     pub fn is_empty(&self) -> bool {
      77            0 :         self.len() == 0
      78            0 :     }
      79              : }
      80              : 
      81              : impl DeleteBatchProducer {
      82            0 :     pub fn start(
      83            0 :         admin_client: Arc<CloudAdminApiClient>,
      84            0 :         s3_client: Arc<Client>,
      85            0 :         s3_root_target: RootTarget,
      86            0 :         traversing_depth: TraversingDepth,
      87            0 :     ) -> Self {
      88            0 :         let (delete_elements_sender, mut delete_elements_receiver) =
      89            0 :             tokio::sync::mpsc::unbounded_channel();
      90            0 :         let delete_elements_sender = Arc::new(delete_elements_sender);
      91            0 :         let admin_client = Arc::new(admin_client);
      92            0 : 
      93            0 :         let (projects_to_check_sender, mut projects_to_check_receiver) =
      94            0 :             tokio::sync::mpsc::unbounded_channel();
      95            0 :         let delete_tenants_root_target = s3_root_target.clone();
      96            0 :         let delete_tenants_client = Arc::clone(&s3_client);
      97            0 :         let delete_tenants_admin_client = Arc::clone(&admin_client);
      98            0 :         let delete_sender = Arc::clone(&delete_elements_sender);
      99            0 :         let delete_tenants_sender_task = tokio::spawn(
     100            0 :             async move {
     101            0 :                 tenant_batch::schedule_cleanup_deleted_tenants(
     102            0 :                     &delete_tenants_root_target,
     103            0 :                     &delete_tenants_client,
     104            0 :                     &delete_tenants_admin_client,
     105            0 :                     projects_to_check_sender,
     106            0 :                     delete_sender,
     107            0 :                     traversing_depth,
     108            0 :                 )
     109            0 :                 .await
     110            0 :             }
     111            0 :             .instrument(info_span!("delete_tenants_sender")),
     112              :         );
     113            0 :         let delete_timelines_sender_task = tokio::spawn(async move {
     114            0 :             timeline_batch::schedule_cleanup_deleted_timelines(
     115            0 :                 &s3_root_target,
     116            0 :                 &s3_client,
     117            0 :                 &admin_client,
     118            0 :                 &mut projects_to_check_receiver,
     119            0 :                 delete_elements_sender,
     120            0 :             )
     121            0 :             .in_current_span()
     122            0 :             .await
     123            0 :         });
     124            0 : 
     125            0 :         let (delete_batch_sender, delete_batch_receiver) = tokio::sync::mpsc::unbounded_channel();
     126            0 :         let delete_batch_creator_task = tokio::spawn(
     127            0 :             async move {
     128            0 :                 'outer: loop {
     129            0 :                     let mut delete_batch = DeleteBatch::default();
     130            0 :                     while delete_batch.len() < BATCH_SIZE {
     131            0 :                         match delete_elements_receiver.recv().await {
     132            0 :                             Some(new_task) => match new_task {
     133            0 :                                 Either::Left(tenant_id) => delete_batch.tenants.push(tenant_id),
     134            0 :                                 Either::Right(timeline_id) => {
     135            0 :                                     delete_batch.timelines.push(timeline_id)
     136            0 :                                 }
     137            0 :                             },
     138            0 :                             None => {
     139            0 :                                 info!("Task finished: sender dropped");
     140            0 :                                 delete_batch_sender.send(delete_batch).ok();
     141            0 :                                 break 'outer;
     142            0 :                             }
     143            0 :                         }
     144            0 :                     }
     145            0 : 
     146            0 :                     if !delete_batch.is_empty() {
     147            0 :                         delete_batch_sender.send(delete_batch).ok();
     148            0 :                     }
     149            0 :                 }
     150            0 :             }
     151            0 :             .instrument(info_span!("delete batch creator")),
     152              :         );
     153              : 
     154            0 :         Self {
     155            0 :             delete_tenants_sender_task,
     156            0 :             delete_timelines_sender_task,
     157            0 :             delete_batch_creator_task,
     158            0 :             delete_batch_receiver: Arc::new(Mutex::new(delete_batch_receiver)),
     159            0 :         }
     160            0 :     }
     161              : 
     162            0 :     pub fn subscribe(&self) -> Arc<Mutex<UnboundedReceiver<DeleteBatch>>> {
     163            0 :         self.delete_batch_receiver.clone()
     164            0 :     }
     165              : 
     166            0 :     pub async fn join(self) -> anyhow::Result<DeleteProducerStats> {
     167            0 :         let (delete_tenants_task_result, delete_timelines_task_result, batch_task_result) = tokio::join!(
     168            0 :             self.delete_tenants_sender_task,
     169            0 :             self.delete_timelines_sender_task,
     170            0 :             self.delete_batch_creator_task,
     171            0 :         );
     172              : 
     173            0 :         let tenant_stats = match delete_tenants_task_result {
     174            0 :             Ok(Ok(stats)) => stats,
     175            0 :             Ok(Err(tenant_deletion_error)) => return Err(tenant_deletion_error),
     176            0 :             Err(join_error) => {
     177            0 :                 anyhow::bail!("Failed to join the delete tenant producing task: {join_error}")
     178              :             }
     179              :         };
     180              : 
     181            0 :         let timeline_stats = match delete_timelines_task_result {
     182            0 :             Ok(Ok(stats)) => Some(stats),
     183            0 :             Ok(Err(timeline_deletion_error)) => return Err(timeline_deletion_error),
     184            0 :             Err(join_error) => {
     185            0 :                 anyhow::bail!("Failed to join the delete timeline producing task: {join_error}")
     186              :             }
     187              :         };
     188              : 
     189            0 :         match batch_task_result {
     190            0 :             Ok(()) => (),
     191            0 :             Err(join_error) => anyhow::bail!("Failed to join the batch forming task: {join_error}"),
     192              :         };
     193              : 
     194            0 :         Ok(DeleteProducerStats {
     195            0 :             tenant_stats,
     196            0 :             timeline_stats,
     197            0 :         })
     198            0 :     }
     199              : }
     200              : 
     201              : pub struct ProcessedS3List<I, A> {
     202              :     pub entries_total: usize,
     203              :     pub entries_to_delete: Vec<I>,
     204              :     pub active_entries: Vec<A>,
     205              : }
     206              : 
     207              : impl<I, A> Default for ProcessedS3List<I, A> {
     208            0 :     fn default() -> Self {
     209            0 :         Self {
     210            0 :             entries_total: 0,
     211            0 :             entries_to_delete: Vec::new(),
     212            0 :             active_entries: Vec::new(),
     213            0 :         }
     214            0 :     }
     215              : }
     216              : 
     217              : impl<I, A> ProcessedS3List<I, A> {
     218            0 :     fn merge(&mut self, other: Self) {
     219            0 :         self.entries_total += other.entries_total;
     220            0 :         self.entries_to_delete.extend(other.entries_to_delete);
     221            0 :         self.active_entries.extend(other.active_entries);
     222            0 :     }
     223              : 
     224            0 :     fn change_ids<NewI>(self, transform: impl Fn(I) -> NewI) -> ProcessedS3List<NewI, A> {
     225            0 :         ProcessedS3List {
     226            0 :             entries_total: self.entries_total,
     227            0 :             entries_to_delete: self.entries_to_delete.into_iter().map(transform).collect(),
     228            0 :             active_entries: self.active_entries,
     229            0 :         }
     230            0 :     }
     231              : }
     232              : 
     233            0 : async fn process_s3_target_recursively<F, Fut, I, E, A>(
     234            0 :     s3_client: &Client,
     235            0 :     target: &S3Target,
     236            0 :     find_active_and_deleted_entries: F,
     237            0 : ) -> anyhow::Result<ProcessedS3List<I, A>>
     238            0 : where
     239            0 :     I: FromStr<Err = E> + Send + Sync,
     240            0 :     E: Send + Sync + std::error::Error + 'static,
     241            0 :     F: FnOnce(Vec<I>) -> Fut + Clone,
     242            0 :     Fut: Future<Output = anyhow::Result<ProcessedS3List<I, A>>>,
     243            0 : {
     244            0 :     let mut continuation_token = None;
     245            0 :     let mut total_entries = ProcessedS3List::default();
     246              : 
     247              :     loop {
     248            0 :         let fetch_response =
     249            0 :             list_objects_with_retries(s3_client, target, continuation_token.clone()).await?;
     250              : 
     251            0 :         let new_entry_ids = fetch_response
     252            0 :             .common_prefixes()
     253            0 :             .unwrap_or_default()
     254            0 :             .iter()
     255            0 :             .filter_map(|prefix| prefix.prefix())
     256            0 :             .filter_map(|prefix| -> Option<&str> {
     257            0 :                 prefix
     258            0 :                     .strip_prefix(&target.prefix_in_bucket)?
     259            0 :                     .strip_suffix('/')
     260            0 :             })
     261            0 :             .map(|entry_id_str| {
     262            0 :                 entry_id_str
     263            0 :                     .parse()
     264            0 :                     .with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
     265            0 :             })
     266            0 :             .collect::<anyhow::Result<Vec<I>>>()
     267            0 :             .context("list and parse bucket's entry ids")?;
     268              : 
     269              :         total_entries.merge(
     270            0 :             (find_active_and_deleted_entries.clone())(new_entry_ids)
     271            0 :                 .await
     272            0 :                 .context("filter active and deleted entry ids")?,
     273              :         );
     274              : 
     275            0 :         match fetch_response.next_continuation_token {
     276            0 :             Some(new_token) => continuation_token = Some(new_token),
     277            0 :             None => break,
     278            0 :         }
     279            0 :     }
     280            0 : 
     281            0 :     Ok(total_entries)
     282            0 : }
     283              : 
     284              : enum FetchResult<A> {
     285              :     Found(A),
     286              :     Deleted,
     287              :     Absent,
     288              : }
     289              : 
     290            0 : async fn split_to_active_and_deleted_entries<I, A, F, Fut>(
     291            0 :     new_entry_ids: Vec<I>,
     292            0 :     find_active_entry: F,
     293            0 : ) -> anyhow::Result<ProcessedS3List<I, A>>
     294            0 : where
     295            0 :     I: std::fmt::Display + Send + Sync + 'static + Copy,
     296            0 :     A: Send + 'static,
     297            0 :     F: FnOnce(I) -> Fut + Send + Sync + 'static + Clone,
     298            0 :     Fut: Future<Output = anyhow::Result<FetchResult<A>>> + Send,
     299            0 : {
     300            0 :     let entries_total = new_entry_ids.len();
     301            0 :     let mut check_tasks = JoinSet::new();
     302            0 :     let mut active_entries = Vec::with_capacity(entries_total);
     303            0 :     let mut entries_to_delete = Vec::with_capacity(entries_total);
     304              : 
     305            0 :     for new_entry_id in new_entry_ids {
     306            0 :         let check_closure = find_active_entry.clone();
     307            0 :         check_tasks.spawn(
     308            0 :             async move {
     309            0 :                 (
     310            0 :                     new_entry_id,
     311            0 :                     async {
     312            0 :                         for _ in 0..MAX_RETRIES {
     313            0 :                             let closure_clone = check_closure.clone();
     314            0 :                             match closure_clone(new_entry_id).await {
     315            0 :                                 Ok(active_entry) => return Ok(active_entry),
     316            0 :                                 Err(e) => {
     317            0 :                                     error!("find active entry admin API call failed: {e}");
     318            0 :                                     tokio::time::sleep(Duration::from_secs(1)).await;
     319              :                                 }
     320              :                             }
     321              :                         }
     322              : 
     323            0 :                         anyhow::bail!("Failed to check entry {new_entry_id} {MAX_RETRIES} times")
     324            0 :                     }
     325            0 :                     .await,
     326              :                 )
     327            0 :             }
     328            0 :             .instrument(info_span!("filter_active_entries")),
     329              :         );
     330              :     }
     331              : 
     332            0 :     while let Some(task_result) = check_tasks.join_next().await {
     333            0 :         let (entry_id, entry_data_fetch_result) = task_result.context("task join")?;
     334            0 :         match entry_data_fetch_result.context("entry data fetch")? {
     335            0 :             FetchResult::Found(active_entry) => {
     336            0 :                 info!("Entry {entry_id} is alive, cannot delete");
     337            0 :                 active_entries.push(active_entry);
     338              :             }
     339              :             FetchResult::Deleted => {
     340            0 :                 info!("Entry {entry_id} deleted in the admin data, can safely delete");
     341            0 :                 entries_to_delete.push(entry_id);
     342              :             }
     343              :             FetchResult::Absent => {
     344            0 :                 info!("Entry {entry_id} absent in the admin data, can safely delete");
     345            0 :                 entries_to_delete.push(entry_id);
     346              :             }
     347              :         }
     348              :     }
     349            0 :     Ok(ProcessedS3List {
     350            0 :         entries_total,
     351            0 :         entries_to_delete,
     352            0 :         active_entries,
     353            0 :     })
     354            0 : }
        

Generated by: LCOV version 2.1-beta