LCOV - differential code coverage report
Current view: top level - s3_scrubber/src - s3_deletion.rs (source / functions) Coverage Total Hit UBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 0.0 % 314 0 314
Current Date: 2023-10-19 02:04:12 Functions: 0.0 % 68 0 68
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : use std::collections::BTreeMap;
       2                 : use std::num::NonZeroUsize;
       3                 : use std::sync::Arc;
       4                 : use std::time::Duration;
       5                 : 
       6                 : use anyhow::Context;
       7                 : use aws_sdk_s3::types::{Delete, ObjectIdentifier};
       8                 : use aws_sdk_s3::Client;
       9                 : use tokio::sync::mpsc::error::TryRecvError;
      10                 : use tokio::sync::mpsc::UnboundedReceiver;
      11                 : use tokio::sync::Mutex;
      12                 : use tokio::task::JoinSet;
      13                 : use tracing::{debug, error, info, info_span, Instrument};
      14                 : 
      15                 : use crate::delete_batch_producer::DeleteBatch;
      16                 : use crate::{list_objects_with_retries, RootTarget, S3Target, TenantId, MAX_RETRIES};
      17                 : use utils::id::TenantTimelineId;
      18                 : 
      19                 : pub struct S3Deleter {
      20                 :     dry_run: bool,
      21                 :     concurrent_tasks_count: NonZeroUsize,
      22                 :     delete_batch_receiver: Arc<Mutex<UnboundedReceiver<DeleteBatch>>>,
      23                 :     s3_client: Arc<Client>,
      24                 :     s3_target: RootTarget,
      25                 : }
      26                 : 
      27                 : impl S3Deleter {
      28 UBC           0 :     pub fn new(
      29               0 :         dry_run: bool,
      30               0 :         concurrent_tasks_count: NonZeroUsize,
      31               0 :         s3_client: Arc<Client>,
      32               0 :         delete_batch_receiver: Arc<Mutex<UnboundedReceiver<DeleteBatch>>>,
      33               0 :         s3_target: RootTarget,
      34               0 :     ) -> Self {
      35               0 :         Self {
      36               0 :             dry_run,
      37               0 :             concurrent_tasks_count,
      38               0 :             delete_batch_receiver,
      39               0 :             s3_client,
      40               0 :             s3_target,
      41               0 :         }
      42               0 :     }
      43                 : 
      44               0 :     pub async fn remove_all(self) -> anyhow::Result<DeletionStats> {
      45               0 :         let mut deletion_tasks = JoinSet::new();
      46               0 :         for id in 0..self.concurrent_tasks_count.get() {
      47               0 :             let closure_client = Arc::clone(&self.s3_client);
      48               0 :             let closure_s3_target = self.s3_target.clone();
      49               0 :             let closure_batch_receiver = Arc::clone(&self.delete_batch_receiver);
      50               0 :             let dry_run = self.dry_run;
      51               0 :             deletion_tasks.spawn(
      52               0 :                 async move {
      53               0 :                     info!("Task started");
      54                 :                     (
      55               0 :                         id,
      56               0 :                         async move {
      57               0 :                             let mut task_stats = DeletionStats::default();
      58                 :                             loop {
      59               0 :                                 let mut guard = closure_batch_receiver.lock().await;
      60               0 :                                 let receiver_result = guard.try_recv();
      61               0 :                                 drop(guard);
      62               0 :                                 match receiver_result {
      63               0 :                                     Ok(batch) => {
      64               0 :                                         let stats = delete_batch(
      65               0 :                                             &closure_client,
      66               0 :                                             &closure_s3_target,
      67               0 :                                             batch,
      68               0 :                                             dry_run,
      69               0 :                                         )
      70               0 :                                         .await
      71               0 :                                         .context("batch deletion")?;
      72               0 :                                         debug!(
      73               0 :                                             "Batch processed, number of objects deleted per tenant in the batch is: {}, per timeline — {}",
      74               0 :                                             stats.deleted_tenant_keys.len(),
      75               0 :                                             stats.deleted_timeline_keys.len(),
      76               0 :                                         );
      77               0 :                                         task_stats.merge(stats);
      78                 :                                     }
      79                 :                                     Err(TryRecvError::Empty) => {
      80               0 :                                         debug!("No tasks yet, waiting");
      81               0 :                                         tokio::time::sleep(Duration::from_secs(1)).await;
      82               0 :                                         continue;
      83                 :                                     }
      84                 :                                     Err(TryRecvError::Disconnected) => {
      85               0 :                                         info!("Task finished: sender dropped");
      86               0 :                                         return Ok(task_stats);
      87                 :                                     }
      88                 :                                 }
      89                 :                             }
      90               0 :                         }
      91               0 :                         .in_current_span()
      92               0 :                         .await,
      93                 :                     )
      94               0 :                 }
      95               0 :                 .instrument(info_span!("deletion_task", %id)),
      96                 :             );
      97                 :         }
      98                 : 
      99               0 :         let mut total_stats = DeletionStats::default();
     100               0 :         while let Some(task_result) = deletion_tasks.join_next().await {
     101               0 :             match task_result {
     102               0 :                 Ok((id, Ok(task_stats))) => {
     103               0 :                     info!("Task {id} completed");
     104               0 :                     total_stats.merge(task_stats);
     105                 :                 }
     106               0 :                 Ok((id, Err(e))) => {
     107               0 :                     error!("Task {id} failed: {e:#}");
     108               0 :                     return Err(e);
     109                 :                 }
     110               0 :                 Err(join_error) => anyhow::bail!("Failed to join on a task: {join_error:?}"),
     111                 :             }
     112                 :         }
     113                 : 
     114               0 :         Ok(total_stats)
     115               0 :     }
     116                 : }
     117                 : 
     118                 : /// S3 delete_objects allows up to 1000 keys to be passed in a single request.
     119                 : /// Yet if you pass too many key requests, apparently S3 could return with OK and
     120                 : /// actually delete nothing, so keep the number lower.
     121                 : const MAX_ITEMS_TO_DELETE: usize = 200;
     122                 : 
     123               0 : #[derive(Debug, Default)]
     124                 : pub struct DeletionStats {
     125                 :     pub deleted_tenant_keys: BTreeMap<TenantId, usize>,
     126                 :     pub deleted_timeline_keys: BTreeMap<TenantTimelineId, usize>,
     127                 : }
     128                 : 
     129                 : impl DeletionStats {
     130               0 :     fn merge(&mut self, other: Self) {
     131               0 :         self.deleted_tenant_keys.extend(other.deleted_tenant_keys);
     132               0 :         self.deleted_timeline_keys
     133               0 :             .extend(other.deleted_timeline_keys);
     134               0 :     }
     135                 : }
     136                 : 
     137               0 : async fn delete_batch(
     138               0 :     s3_client: &Client,
     139               0 :     s3_target: &RootTarget,
     140               0 :     batch: DeleteBatch,
     141               0 :     dry_run: bool,
     142               0 : ) -> anyhow::Result<DeletionStats> {
     143               0 :     let (deleted_tenant_keys, deleted_timeline_keys) = tokio::join!(
     144               0 :         delete_tenants_batch(batch.tenants, s3_target, s3_client, dry_run),
     145               0 :         delete_timelines_batch(batch.timelines, s3_target, s3_client, dry_run),
     146               0 :     );
     147                 : 
     148                 :     Ok(DeletionStats {
     149               0 :         deleted_tenant_keys: deleted_tenant_keys.context("tenant batch deletion")?,
     150               0 :         deleted_timeline_keys: deleted_timeline_keys.context("timeline batch deletion")?,
     151                 :     })
     152               0 : }
     153                 : 
     154               0 : async fn delete_tenants_batch(
     155               0 :     batched_tenants: Vec<TenantId>,
     156               0 :     s3_target: &RootTarget,
     157               0 :     s3_client: &Client,
     158               0 :     dry_run: bool,
     159               0 : ) -> Result<BTreeMap<TenantId, usize>, anyhow::Error> {
     160               0 :     info!("Deleting tenants batch of size {}", batched_tenants.len());
     161               0 :     info!("Tenant ids to remove: {batched_tenants:?}");
     162               0 :     let deleted_keys = delete_elements(
     163               0 :         &batched_tenants,
     164               0 :         s3_target,
     165               0 :         s3_client,
     166               0 :         dry_run,
     167               0 :         |root_target, tenant_to_delete| root_target.tenant_root(&tenant_to_delete),
     168               0 :     )
     169               0 :     .await?;
     170                 : 
     171               0 :     if !dry_run {
     172               0 :         let mut last_err = None;
     173               0 :         for _ in 0..MAX_RETRIES {
     174               0 :             match ensure_tenant_batch_deleted(s3_client, s3_target, &batched_tenants).await {
     175                 :                 Ok(()) => {
     176               0 :                     last_err = None;
     177               0 :                     break;
     178                 :                 }
     179               0 :                 Err(e) => {
     180               0 :                     error!("Failed to ensure the tenant batch is deleted: {e}");
     181               0 :                     last_err = Some(e);
     182                 :                 }
     183                 :             }
     184                 :         }
     185                 : 
     186               0 :         if let Some(e) = last_err {
     187               0 :             anyhow::bail!(
     188               0 :                 "Failed to ensure that tenant batch is deleted {MAX_RETRIES} times: {e:?}"
     189               0 :             );
     190               0 :         }
     191               0 :     }
     192                 : 
     193               0 :     Ok(deleted_keys)
     194               0 : }
     195                 : 
     196               0 : async fn delete_timelines_batch(
     197               0 :     batched_timelines: Vec<TenantTimelineId>,
     198               0 :     s3_target: &RootTarget,
     199               0 :     s3_client: &Client,
     200               0 :     dry_run: bool,
     201               0 : ) -> Result<BTreeMap<TenantTimelineId, usize>, anyhow::Error> {
     202               0 :     info!(
     203               0 :         "Deleting timelines batch of size {}",
     204               0 :         batched_timelines.len()
     205               0 :     );
     206               0 :     info!(
     207               0 :         "Timeline ids to remove: {:?}",
     208               0 :         batched_timelines
     209               0 :             .iter()
     210               0 :             .map(|id| id.to_string())
     211               0 :             .collect::<Vec<_>>()
     212               0 :     );
     213               0 :     let deleted_keys = delete_elements(
     214               0 :         &batched_timelines,
     215               0 :         s3_target,
     216               0 :         s3_client,
     217               0 :         dry_run,
     218               0 :         |root_target, timeline_to_delete| root_target.timeline_root(&timeline_to_delete),
     219               0 :     )
     220               0 :     .await?;
     221                 : 
     222               0 :     if !dry_run {
     223               0 :         let mut last_err = None;
     224               0 :         for _ in 0..MAX_RETRIES {
     225               0 :             match ensure_timeline_batch_deleted(s3_client, s3_target, &batched_timelines).await {
     226                 :                 Ok(()) => {
     227               0 :                     last_err = None;
     228               0 :                     break;
     229                 :                 }
     230               0 :                 Err(e) => {
     231               0 :                     error!("Failed to ensure the timelines batch is deleted: {e}");
     232               0 :                     last_err = Some(e);
     233                 :                 }
     234                 :             }
     235                 :         }
     236                 : 
     237               0 :         if let Some(e) = last_err {
     238               0 :             anyhow::bail!(
     239               0 :                 "Failed to ensure that timeline batch is deleted {MAX_RETRIES} times: {e:?}"
     240               0 :             );
     241               0 :         }
     242               0 :     }
     243               0 :     Ok(deleted_keys)
     244               0 : }
     245                 : 
     246               0 : async fn delete_elements<I>(
     247               0 :     batched_ids: &Vec<I>,
     248               0 :     s3_target: &RootTarget,
     249               0 :     s3_client: &Client,
     250               0 :     dry_run: bool,
     251               0 :     target_producer: impl Fn(&RootTarget, I) -> S3Target,
     252               0 : ) -> Result<BTreeMap<I, usize>, anyhow::Error>
     253               0 : where
     254               0 :     I: Ord + PartialOrd + Copy,
     255               0 : {
     256               0 :     let mut deleted_keys = BTreeMap::new();
     257               0 :     let mut object_ids_to_delete = Vec::with_capacity(MAX_ITEMS_TO_DELETE);
     258               0 :     for &id_to_delete in batched_ids {
     259               0 :         let mut continuation_token = None;
     260               0 :         let mut subtargets = vec![target_producer(s3_target, id_to_delete)];
     261               0 :         while let Some(current_target) = subtargets.pop() {
     262                 :             loop {
     263               0 :                 let fetch_response = list_objects_with_retries(
     264               0 :                     s3_client,
     265               0 :                     &current_target,
     266               0 :                     continuation_token.clone(),
     267               0 :                 )
     268               0 :                 .await?;
     269                 : 
     270               0 :                 for object_id in fetch_response
     271               0 :                     .contents()
     272               0 :                     .unwrap_or_default()
     273               0 :                     .iter()
     274               0 :                     .filter_map(|object| object.key())
     275               0 :                     .map(|key| ObjectIdentifier::builder().key(key).build())
     276                 :                 {
     277               0 :                     if object_ids_to_delete.len() >= MAX_ITEMS_TO_DELETE {
     278               0 :                         let object_ids_for_request = std::mem::replace(
     279               0 :                             &mut object_ids_to_delete,
     280               0 :                             Vec::with_capacity(MAX_ITEMS_TO_DELETE),
     281               0 :                         );
     282               0 :                         send_delete_request(
     283               0 :                             s3_client,
     284               0 :                             s3_target.bucket_name(),
     285               0 :                             object_ids_for_request,
     286               0 :                             dry_run,
     287               0 :                         )
     288               0 :                         .await
     289               0 :                         .context("object ids deletion")?;
     290               0 :                     }
     291                 : 
     292               0 :                     object_ids_to_delete.push(object_id);
     293               0 :                     *deleted_keys.entry(id_to_delete).or_default() += 1;
     294                 :                 }
     295                 : 
     296               0 :                 subtargets.extend(
     297               0 :                     fetch_response
     298               0 :                         .common_prefixes()
     299               0 :                         .unwrap_or_default()
     300               0 :                         .iter()
     301               0 :                         .filter_map(|common_prefix| common_prefix.prefix())
     302               0 :                         .map(|prefix| {
     303               0 :                             let mut new_target = current_target.clone();
     304               0 :                             new_target.prefix_in_bucket = prefix.to_string();
     305               0 :                             new_target
     306               0 :                         }),
     307               0 :                 );
     308               0 : 
     309               0 :                 match fetch_response.next_continuation_token {
     310               0 :                     Some(new_token) => continuation_token = Some(new_token),
     311               0 :                     None => break,
     312                 :                 }
     313                 :             }
     314                 :         }
     315                 :     }
     316               0 :     if !object_ids_to_delete.is_empty() {
     317               0 :         info!("Removing last objects of the batch");
     318               0 :         send_delete_request(
     319               0 :             s3_client,
     320               0 :             s3_target.bucket_name(),
     321               0 :             object_ids_to_delete,
     322               0 :             dry_run,
     323               0 :         )
     324               0 :         .await
     325               0 :         .context("Last object ids deletion")?;
     326               0 :     }
     327               0 :     Ok(deleted_keys)
     328               0 : }
     329                 : 
     330               0 : pub async fn send_delete_request(
     331               0 :     s3_client: &Client,
     332               0 :     bucket_name: &str,
     333               0 :     ids: Vec<ObjectIdentifier>,
     334               0 :     dry_run: bool,
     335               0 : ) -> anyhow::Result<()> {
     336               0 :     info!("Removing {} object ids from S3", ids.len());
     337               0 :     info!("Object ids to remove: {ids:?}");
     338               0 :     let delete_request = s3_client
     339               0 :         .delete_objects()
     340               0 :         .bucket(bucket_name)
     341               0 :         .delete(Delete::builder().set_objects(Some(ids)).build());
     342               0 :     if dry_run {
     343               0 :         info!("Dry run, skipping the actual removal");
     344               0 :         Ok(())
     345                 :     } else {
     346               0 :         let original_request = delete_request.clone();
     347                 : 
     348               0 :         for _ in 0..MAX_RETRIES {
     349               0 :             match delete_request
     350               0 :                 .clone()
     351               0 :                 .send()
     352               0 :                 .await
     353               0 :                 .context("delete request processing")
     354                 :             {
     355               0 :                 Ok(delete_response) => {
     356               0 :                     info!("Delete response: {delete_response:?}");
     357               0 :                     match delete_response.errors() {
     358               0 :                         Some(delete_errors) => {
     359               0 :                             error!("Delete request returned errors: {delete_errors:?}");
     360               0 :                             tokio::time::sleep(Duration::from_secs(1)).await;
     361                 :                         }
     362                 :                         None => {
     363               0 :                             info!("Successfully removed an object batch from S3");
     364               0 :                             return Ok(());
     365                 :                         }
     366                 :                     }
     367                 :                 }
     368               0 :                 Err(e) => {
     369               0 :                     error!("Failed to send a delete request: {e:#}");
     370               0 :                     tokio::time::sleep(Duration::from_secs(1)).await;
     371                 :                 }
     372                 :             }
     373                 :         }
     374                 : 
     375               0 :         error!("Failed to do deletion, request: {original_request:?}");
     376               0 :         anyhow::bail!("Failed to run deletion request {MAX_RETRIES} times");
     377                 :     }
     378               0 : }
     379                 : 
     380               0 : async fn ensure_tenant_batch_deleted(
     381               0 :     s3_client: &Client,
     382               0 :     s3_target: &RootTarget,
     383               0 :     batch: &[TenantId],
     384               0 : ) -> anyhow::Result<()> {
     385               0 :     let mut not_deleted_tenants = Vec::with_capacity(batch.len());
     386                 : 
     387               0 :     for &tenant_id in batch {
     388               0 :         let fetch_response =
     389               0 :             list_objects_with_retries(s3_client, &s3_target.tenant_root(&tenant_id), None).await?;
     390                 : 
     391               0 :         if fetch_response.is_truncated()
     392               0 :             || fetch_response.contents().is_some()
     393               0 :             || fetch_response.common_prefixes().is_some()
     394                 :         {
     395               0 :             error!(
     396               0 :                 "Tenant {tenant_id} should be deleted, but its list response is {fetch_response:?}"
     397               0 :             );
     398               0 :             not_deleted_tenants.push(tenant_id);
     399               0 :         }
     400                 :     }
     401                 : 
     402               0 :     anyhow::ensure!(
     403               0 :         not_deleted_tenants.is_empty(),
     404               0 :         "Failed to delete all tenants in a batch. Tenants {not_deleted_tenants:?} should be deleted."
     405                 :     );
     406               0 :     Ok(())
     407               0 : }
     408                 : 
     409               0 : async fn ensure_timeline_batch_deleted(
     410               0 :     s3_client: &Client,
     411               0 :     s3_target: &RootTarget,
     412               0 :     batch: &[TenantTimelineId],
     413               0 : ) -> anyhow::Result<()> {
     414               0 :     let mut not_deleted_timelines = Vec::with_capacity(batch.len());
     415                 : 
     416               0 :     for &id in batch {
     417               0 :         let fetch_response =
     418               0 :             list_objects_with_retries(s3_client, &s3_target.timeline_root(&id), None).await?;
     419                 : 
     420               0 :         if fetch_response.is_truncated()
     421               0 :             || fetch_response.contents().is_some()
     422               0 :             || fetch_response.common_prefixes().is_some()
     423                 :         {
     424               0 :             error!("Timeline {id} should be deleted, but its list response is {fetch_response:?}");
     425               0 :             not_deleted_timelines.push(id);
     426               0 :         }
     427                 :     }
     428                 : 
     429               0 :     anyhow::ensure!(
     430               0 :         not_deleted_timelines.is_empty(),
     431               0 :         "Failed to delete all timelines in a batch"
     432                 :     );
     433               0 :     Ok(())
     434               0 : }
        

Generated by: LCOV version 2.1-beta