LCOV - code coverage report
Current view: top level - s3_scrubber/src - s3_deletion.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 0.0 % 314 0
Test Date: 2023-09-06 10:18:01 Functions: 0.0 % 68 0

            Line data    Source code
       1              : use std::collections::BTreeMap;
       2              : use std::num::NonZeroUsize;
       3              : use std::sync::Arc;
       4              : use std::time::Duration;
       5              : 
       6              : use anyhow::Context;
       7              : use aws_sdk_s3::types::{Delete, ObjectIdentifier};
       8              : use aws_sdk_s3::Client;
       9              : use tokio::sync::mpsc::error::TryRecvError;
      10              : use tokio::sync::mpsc::UnboundedReceiver;
      11              : use tokio::sync::Mutex;
      12              : use tokio::task::JoinSet;
      13              : use tracing::{debug, error, info, info_span, Instrument};
      14              : 
      15              : use crate::delete_batch_producer::DeleteBatch;
      16              : use crate::{list_objects_with_retries, RootTarget, S3Target, TenantId, MAX_RETRIES};
      17              : use utils::id::TenantTimelineId;
      18              : 
      19              : pub struct S3Deleter {
      20              :     dry_run: bool,
      21              :     concurrent_tasks_count: NonZeroUsize,
      22              :     delete_batch_receiver: Arc<Mutex<UnboundedReceiver<DeleteBatch>>>,
      23              :     s3_client: Arc<Client>,
      24              :     s3_target: RootTarget,
      25              : }
      26              : 
      27              : impl S3Deleter {
      28            0 :     pub fn new(
      29            0 :         dry_run: bool,
      30            0 :         concurrent_tasks_count: NonZeroUsize,
      31            0 :         s3_client: Arc<Client>,
      32            0 :         delete_batch_receiver: Arc<Mutex<UnboundedReceiver<DeleteBatch>>>,
      33            0 :         s3_target: RootTarget,
      34            0 :     ) -> Self {
      35            0 :         Self {
      36            0 :             dry_run,
      37            0 :             concurrent_tasks_count,
      38            0 :             delete_batch_receiver,
      39            0 :             s3_client,
      40            0 :             s3_target,
      41            0 :         }
      42            0 :     }
      43              : 
      44            0 :     pub async fn remove_all(self) -> anyhow::Result<DeletionStats> {
      45            0 :         let mut deletion_tasks = JoinSet::new();
      46            0 :         for id in 0..self.concurrent_tasks_count.get() {
      47            0 :             let closure_client = Arc::clone(&self.s3_client);
      48            0 :             let closure_s3_target = self.s3_target.clone();
      49            0 :             let closure_batch_receiver = Arc::clone(&self.delete_batch_receiver);
      50            0 :             let dry_run = self.dry_run;
      51            0 :             deletion_tasks.spawn(
      52            0 :                 async move {
      53            0 :                     info!("Task started");
      54              :                     (
      55            0 :                         id,
      56            0 :                         async move {
      57            0 :                             let mut task_stats = DeletionStats::default();
      58              :                             loop {
      59            0 :                                 let mut guard = closure_batch_receiver.lock().await;
      60            0 :                                 let receiver_result = guard.try_recv();
      61            0 :                                 drop(guard);
      62            0 :                                 match receiver_result {
      63            0 :                                     Ok(batch) => {
      64            0 :                                         let stats = delete_batch(
      65            0 :                                             &closure_client,
      66            0 :                                             &closure_s3_target,
      67            0 :                                             batch,
      68            0 :                                             dry_run,
      69            0 :                                         )
      70            0 :                                         .await
      71            0 :                                         .context("batch deletion")?;
      72            0 :                                         debug!(
      73            0 :                                             "Batch processed, number of objects deleted per tenant in the batch is: {}, per timeline — {}",
      74            0 :                                             stats.deleted_tenant_keys.len(),
      75            0 :                                             stats.deleted_timeline_keys.len(),
      76            0 :                                         );
      77            0 :                                         task_stats.merge(stats);
      78              :                                     }
      79              :                                     Err(TryRecvError::Empty) => {
      80            0 :                                         debug!("No tasks yet, waiting");
      81            0 :                                         tokio::time::sleep(Duration::from_secs(1)).await;
      82            0 :                                         continue;
      83              :                                     }
      84              :                                     Err(TryRecvError::Disconnected) => {
      85            0 :                                         info!("Task finished: sender dropped");
      86            0 :                                         return Ok(task_stats);
      87              :                                     }
      88              :                                 }
      89              :                             }
      90            0 :                         }
      91            0 :                         .in_current_span()
      92            0 :                         .await,
      93              :                     )
      94            0 :                 }
      95            0 :                 .instrument(info_span!("deletion_task", %id)),
      96              :             );
      97              :         }
      98              : 
      99            0 :         let mut total_stats = DeletionStats::default();
     100            0 :         while let Some(task_result) = deletion_tasks.join_next().await {
     101            0 :             match task_result {
     102            0 :                 Ok((id, Ok(task_stats))) => {
     103            0 :                     info!("Task {id} completed");
     104            0 :                     total_stats.merge(task_stats);
     105              :                 }
     106            0 :                 Ok((id, Err(e))) => {
     107            0 :                     error!("Task {id} failed: {e:#}");
     108            0 :                     return Err(e);
     109              :                 }
     110            0 :                 Err(join_error) => anyhow::bail!("Failed to join on a task: {join_error:?}"),
     111              :             }
     112              :         }
     113              : 
     114            0 :         Ok(total_stats)
     115            0 :     }
     116              : }
     117              : 
     118              : /// S3 delete_objects allows up to 1000 keys to be passed in a single request.
     119              : /// Yet if you pass too many key requests, apparently S3 could return with OK and
     120              : /// actually delete nothing, so keep the number lower.
     121              : const MAX_ITEMS_TO_DELETE: usize = 200;
     122              : 
     123            0 : #[derive(Debug, Default)]
     124              : pub struct DeletionStats {
     125              :     pub deleted_tenant_keys: BTreeMap<TenantId, usize>,
     126              :     pub deleted_timeline_keys: BTreeMap<TenantTimelineId, usize>,
     127              : }
     128              : 
     129              : impl DeletionStats {
     130            0 :     fn merge(&mut self, other: Self) {
     131            0 :         self.deleted_tenant_keys.extend(other.deleted_tenant_keys);
     132            0 :         self.deleted_timeline_keys
     133            0 :             .extend(other.deleted_timeline_keys);
     134            0 :     }
     135              : }
     136              : 
     137            0 : async fn delete_batch(
     138            0 :     s3_client: &Client,
     139            0 :     s3_target: &RootTarget,
     140            0 :     batch: DeleteBatch,
     141            0 :     dry_run: bool,
     142            0 : ) -> anyhow::Result<DeletionStats> {
     143            0 :     let (deleted_tenant_keys, deleted_timeline_keys) = tokio::join!(
     144            0 :         delete_tenants_batch(batch.tenants, s3_target, s3_client, dry_run),
     145            0 :         delete_timelines_batch(batch.timelines, s3_target, s3_client, dry_run),
     146            0 :     );
     147              : 
     148              :     Ok(DeletionStats {
     149            0 :         deleted_tenant_keys: deleted_tenant_keys.context("tenant batch deletion")?,
     150            0 :         deleted_timeline_keys: deleted_timeline_keys.context("timeline batch deletion")?,
     151              :     })
     152            0 : }
     153              : 
     154            0 : async fn delete_tenants_batch(
     155            0 :     batched_tenants: Vec<TenantId>,
     156            0 :     s3_target: &RootTarget,
     157            0 :     s3_client: &Client,
     158            0 :     dry_run: bool,
     159            0 : ) -> Result<BTreeMap<TenantId, usize>, anyhow::Error> {
     160            0 :     info!("Deleting tenants batch of size {}", batched_tenants.len());
     161            0 :     info!("Tenant ids to remove: {batched_tenants:?}");
     162            0 :     let deleted_keys = delete_elements(
     163            0 :         &batched_tenants,
     164            0 :         s3_target,
     165            0 :         s3_client,
     166            0 :         dry_run,
     167            0 :         |root_target, tenant_to_delete| root_target.tenant_root(&tenant_to_delete),
     168            0 :     )
     169            0 :     .await?;
     170              : 
     171            0 :     if !dry_run {
     172            0 :         let mut last_err = None;
     173            0 :         for _ in 0..MAX_RETRIES {
     174            0 :             match ensure_tenant_batch_deleted(s3_client, s3_target, &batched_tenants).await {
     175              :                 Ok(()) => {
     176            0 :                     last_err = None;
     177            0 :                     break;
     178              :                 }
     179            0 :                 Err(e) => {
     180            0 :                     error!("Failed to ensure the tenant batch is deleted: {e}");
     181            0 :                     last_err = Some(e);
     182              :                 }
     183              :             }
     184              :         }
     185              : 
     186            0 :         if let Some(e) = last_err {
     187            0 :             anyhow::bail!(
     188            0 :                 "Failed to ensure that tenant batch is deleted {MAX_RETRIES} times: {e:?}"
     189            0 :             );
     190            0 :         }
     191            0 :     }
     192              : 
     193            0 :     Ok(deleted_keys)
     194            0 : }
     195              : 
     196            0 : async fn delete_timelines_batch(
     197            0 :     batched_timelines: Vec<TenantTimelineId>,
     198            0 :     s3_target: &RootTarget,
     199            0 :     s3_client: &Client,
     200            0 :     dry_run: bool,
     201            0 : ) -> Result<BTreeMap<TenantTimelineId, usize>, anyhow::Error> {
     202            0 :     info!(
     203            0 :         "Deleting timelines batch of size {}",
     204            0 :         batched_timelines.len()
     205            0 :     );
     206            0 :     info!(
     207            0 :         "Timeline ids to remove: {:?}",
     208            0 :         batched_timelines
     209            0 :             .iter()
     210            0 :             .map(|id| id.to_string())
     211            0 :             .collect::<Vec<_>>()
     212            0 :     );
     213            0 :     let deleted_keys = delete_elements(
     214            0 :         &batched_timelines,
     215            0 :         s3_target,
     216            0 :         s3_client,
     217            0 :         dry_run,
     218            0 :         |root_target, timeline_to_delete| root_target.timeline_root(&timeline_to_delete),
     219            0 :     )
     220            0 :     .await?;
     221              : 
     222            0 :     if !dry_run {
     223            0 :         let mut last_err = None;
     224            0 :         for _ in 0..MAX_RETRIES {
     225            0 :             match ensure_timeline_batch_deleted(s3_client, s3_target, &batched_timelines).await {
     226              :                 Ok(()) => {
     227            0 :                     last_err = None;
     228            0 :                     break;
     229              :                 }
     230            0 :                 Err(e) => {
     231            0 :                     error!("Failed to ensure the timelines batch is deleted: {e}");
     232            0 :                     last_err = Some(e);
     233              :                 }
     234              :             }
     235              :         }
     236              : 
     237            0 :         if let Some(e) = last_err {
     238            0 :             anyhow::bail!(
     239            0 :                 "Failed to ensure that timeline batch is deleted {MAX_RETRIES} times: {e:?}"
     240            0 :             );
     241            0 :         }
     242            0 :     }
     243            0 :     Ok(deleted_keys)
     244            0 : }
     245              : 
     246            0 : async fn delete_elements<I>(
     247            0 :     batched_ids: &Vec<I>,
     248            0 :     s3_target: &RootTarget,
     249            0 :     s3_client: &Client,
     250            0 :     dry_run: bool,
     251            0 :     target_producer: impl Fn(&RootTarget, I) -> S3Target,
     252            0 : ) -> Result<BTreeMap<I, usize>, anyhow::Error>
     253            0 : where
     254            0 :     I: Ord + PartialOrd + Copy,
     255            0 : {
     256            0 :     let mut deleted_keys = BTreeMap::new();
     257            0 :     let mut object_ids_to_delete = Vec::with_capacity(MAX_ITEMS_TO_DELETE);
     258            0 :     for &id_to_delete in batched_ids {
     259            0 :         let mut continuation_token = None;
     260            0 :         let mut subtargets = vec![target_producer(s3_target, id_to_delete)];
     261            0 :         while let Some(current_target) = subtargets.pop() {
     262              :             loop {
     263            0 :                 let fetch_response = list_objects_with_retries(
     264            0 :                     s3_client,
     265            0 :                     &current_target,
     266            0 :                     continuation_token.clone(),
     267            0 :                 )
     268            0 :                 .await?;
     269              : 
     270            0 :                 for object_id in fetch_response
     271            0 :                     .contents()
     272            0 :                     .unwrap_or_default()
     273            0 :                     .iter()
     274            0 :                     .filter_map(|object| object.key())
     275            0 :                     .map(|key| ObjectIdentifier::builder().key(key).build())
     276              :                 {
     277            0 :                     if object_ids_to_delete.len() >= MAX_ITEMS_TO_DELETE {
     278            0 :                         let object_ids_for_request = std::mem::replace(
     279            0 :                             &mut object_ids_to_delete,
     280            0 :                             Vec::with_capacity(MAX_ITEMS_TO_DELETE),
     281            0 :                         );
     282            0 :                         send_delete_request(
     283            0 :                             s3_client,
     284            0 :                             s3_target.bucket_name(),
     285            0 :                             object_ids_for_request,
     286            0 :                             dry_run,
     287            0 :                         )
     288            0 :                         .await
     289            0 :                         .context("object ids deletion")?;
     290            0 :                     }
     291              : 
     292            0 :                     object_ids_to_delete.push(object_id);
     293            0 :                     *deleted_keys.entry(id_to_delete).or_default() += 1;
     294              :                 }
     295              : 
     296            0 :                 subtargets.extend(
     297            0 :                     fetch_response
     298            0 :                         .common_prefixes()
     299            0 :                         .unwrap_or_default()
     300            0 :                         .iter()
     301            0 :                         .filter_map(|common_prefix| common_prefix.prefix())
     302            0 :                         .map(|prefix| {
     303            0 :                             let mut new_target = current_target.clone();
     304            0 :                             new_target.prefix_in_bucket = prefix.to_string();
     305            0 :                             new_target
     306            0 :                         }),
     307            0 :                 );
     308            0 : 
     309            0 :                 match fetch_response.next_continuation_token {
     310            0 :                     Some(new_token) => continuation_token = Some(new_token),
     311            0 :                     None => break,
     312              :                 }
     313              :             }
     314              :         }
     315              :     }
     316            0 :     if !object_ids_to_delete.is_empty() {
     317            0 :         info!("Removing last objects of the batch");
     318            0 :         send_delete_request(
     319            0 :             s3_client,
     320            0 :             s3_target.bucket_name(),
     321            0 :             object_ids_to_delete,
     322            0 :             dry_run,
     323            0 :         )
     324            0 :         .await
     325            0 :         .context("Last object ids deletion")?;
     326            0 :     }
     327            0 :     Ok(deleted_keys)
     328            0 : }
     329              : 
     330            0 : pub async fn send_delete_request(
     331            0 :     s3_client: &Client,
     332            0 :     bucket_name: &str,
     333            0 :     ids: Vec<ObjectIdentifier>,
     334            0 :     dry_run: bool,
     335            0 : ) -> anyhow::Result<()> {
     336            0 :     info!("Removing {} object ids from S3", ids.len());
     337            0 :     info!("Object ids to remove: {ids:?}");
     338            0 :     let delete_request = s3_client
     339            0 :         .delete_objects()
     340            0 :         .bucket(bucket_name)
     341            0 :         .delete(Delete::builder().set_objects(Some(ids)).build());
     342            0 :     if dry_run {
     343            0 :         info!("Dry run, skipping the actual removal");
     344            0 :         Ok(())
     345              :     } else {
     346            0 :         let original_request = delete_request.clone();
     347              : 
     348            0 :         for _ in 0..MAX_RETRIES {
     349            0 :             match delete_request
     350            0 :                 .clone()
     351            0 :                 .send()
     352            0 :                 .await
     353            0 :                 .context("delete request processing")
     354              :             {
     355            0 :                 Ok(delete_response) => {
     356            0 :                     info!("Delete response: {delete_response:?}");
     357            0 :                     match delete_response.errors() {
     358            0 :                         Some(delete_errors) => {
     359            0 :                             error!("Delete request returned errors: {delete_errors:?}");
     360            0 :                             tokio::time::sleep(Duration::from_secs(1)).await;
     361              :                         }
     362              :                         None => {
     363            0 :                             info!("Successfully removed an object batch from S3");
     364            0 :                             return Ok(());
     365              :                         }
     366              :                     }
     367              :                 }
     368            0 :                 Err(e) => {
     369            0 :                     error!("Failed to send a delete request: {e:#}");
     370            0 :                     tokio::time::sleep(Duration::from_secs(1)).await;
     371              :                 }
     372              :             }
     373              :         }
     374              : 
     375            0 :         error!("Failed to do deletion, request: {original_request:?}");
     376            0 :         anyhow::bail!("Failed to run deletion request {MAX_RETRIES} times");
     377              :     }
     378            0 : }
     379              : 
     380            0 : async fn ensure_tenant_batch_deleted(
     381            0 :     s3_client: &Client,
     382            0 :     s3_target: &RootTarget,
     383            0 :     batch: &[TenantId],
     384            0 : ) -> anyhow::Result<()> {
     385            0 :     let mut not_deleted_tenants = Vec::with_capacity(batch.len());
     386              : 
     387            0 :     for &tenant_id in batch {
     388            0 :         let fetch_response =
     389            0 :             list_objects_with_retries(s3_client, &s3_target.tenant_root(&tenant_id), None).await?;
     390              : 
     391            0 :         if fetch_response.is_truncated()
     392            0 :             || fetch_response.contents().is_some()
     393            0 :             || fetch_response.common_prefixes().is_some()
     394              :         {
     395            0 :             error!(
     396            0 :                 "Tenant {tenant_id} should be deleted, but its list response is {fetch_response:?}"
     397            0 :             );
     398            0 :             not_deleted_tenants.push(tenant_id);
     399            0 :         }
     400              :     }
     401              : 
     402            0 :     anyhow::ensure!(
     403            0 :         not_deleted_tenants.is_empty(),
     404            0 :         "Failed to delete all tenants in a batch. Tenants {not_deleted_tenants:?} should be deleted."
     405              :     );
     406            0 :     Ok(())
     407            0 : }
     408              : 
     409            0 : async fn ensure_timeline_batch_deleted(
     410            0 :     s3_client: &Client,
     411            0 :     s3_target: &RootTarget,
     412            0 :     batch: &[TenantTimelineId],
     413            0 : ) -> anyhow::Result<()> {
     414            0 :     let mut not_deleted_timelines = Vec::with_capacity(batch.len());
     415              : 
     416            0 :     for &id in batch {
     417            0 :         let fetch_response =
     418            0 :             list_objects_with_retries(s3_client, &s3_target.timeline_root(&id), None).await?;
     419              : 
     420            0 :         if fetch_response.is_truncated()
     421            0 :             || fetch_response.contents().is_some()
     422            0 :             || fetch_response.common_prefixes().is_some()
     423              :         {
     424            0 :             error!("Timeline {id} should be deleted, but its list response is {fetch_response:?}");
     425            0 :             not_deleted_timelines.push(id);
     426            0 :         }
     427              :     }
     428              : 
     429            0 :     anyhow::ensure!(
     430            0 :         not_deleted_timelines.is_empty(),
     431            0 :         "Failed to delete all timelines in a batch"
     432              :     );
     433            0 :     Ok(())
     434            0 : }
        

Generated by: LCOV version 2.1-beta