use std::collections::BTreeMap;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use aws_sdk_s3::types::{Delete, ObjectIdentifier};
use aws_sdk_s3::Client;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::Mutex;
use tokio::task::JoinSet;
use tracing::{debug, error, info, info_span, Instrument};

use crate::delete_batch_producer::DeleteBatch;
use crate::{list_objects_with_retries, RootTarget, S3Target, TenantId, MAX_RETRIES};
use utils::id::TenantTimelineId;

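/// Drains [`DeleteBatch`]es received from the delete batch producer and removes
/// the corresponding tenant and timeline objects from S3, using up to
/// `concurrent_tasks_count` concurrent worker tasks.
///
/// A minimal wiring sketch (the `s3_client` and `root_target` values below are
/// placeholders, not provided by this module):
///
/// ```ignore
/// let (_sender, receiver) = tokio::sync::mpsc::unbounded_channel();
/// let deleter = S3Deleter::new(
///     true, // dry_run: log what would be removed, delete nothing
///     NonZeroUsize::new(4).unwrap(),
///     Arc::new(s3_client),
///     Arc::new(Mutex::new(receiver)),
///     root_target,
/// );
/// let stats = deleter.remove_all().await?;
/// ```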
pub struct S3Deleter {
    dry_run: bool,
    concurrent_tasks_count: NonZeroUsize,
    delete_batch_receiver: Arc<Mutex<UnboundedReceiver<DeleteBatch>>>,
    s3_client: Arc<Client>,
    s3_target: RootTarget,
}

impl S3Deleter {
    pub fn new(
        dry_run: bool,
        concurrent_tasks_count: NonZeroUsize,
        s3_client: Arc<Client>,
        delete_batch_receiver: Arc<Mutex<UnboundedReceiver<DeleteBatch>>>,
        s3_target: RootTarget,
    ) -> Self {
        Self {
            dry_run,
            concurrent_tasks_count,
            delete_batch_receiver,
            s3_client,
            s3_target,
        }
    }

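    /// Spawns `concurrent_tasks_count` workers that repeatedly poll the shared
    /// batch receiver, delete each received batch, and accumulate per-task
    /// statistics. Returns the merged statistics once the sender side of the
    /// channel is dropped and all workers finish, or the first error otherwise.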
    pub async fn remove_all(self) -> anyhow::Result<DeletionStats> {
        let mut deletion_tasks = JoinSet::new();
        for id in 0..self.concurrent_tasks_count.get() {
            let closure_client = Arc::clone(&self.s3_client);
            let closure_s3_target = self.s3_target.clone();
            let closure_batch_receiver = Arc::clone(&self.delete_batch_receiver);
            let dry_run = self.dry_run;
            deletion_tasks.spawn(
                async move {
                    info!("Task started");
                    (
                        id,
                        async move {
                            let mut task_stats = DeletionStats::default();
                            loop {
                                // Hold the lock only for the try_recv itself, so other
                                // workers are not blocked while this batch is processed.
                                let mut guard = closure_batch_receiver.lock().await;
                                let receiver_result = guard.try_recv();
                                drop(guard);
                                match receiver_result {
                                    Ok(batch) => {
                                        let stats = delete_batch(
                                            &closure_client,
                                            &closure_s3_target,
                                            batch,
                                            dry_run,
                                        )
                                        .await
                                        .context("batch deletion")?;
                                        debug!(
                                            "Batch processed, keys were deleted for {} tenants and {} timelines",
                                            stats.deleted_tenant_keys.len(),
                                            stats.deleted_timeline_keys.len(),
                                        );
                                        task_stats.merge(stats);
                                    }
                                    Err(TryRecvError::Empty) => {
                                        debug!("No batches yet, waiting");
                                        tokio::time::sleep(Duration::from_secs(1)).await;
                                        continue;
                                    }
                                    Err(TryRecvError::Disconnected) => {
                                        info!("Task finished: sender dropped");
                                        return Ok(task_stats);
                                    }
                                }
                            }
                        }
                        .in_current_span()
                        .await,
                    )
                }
                .instrument(info_span!("deletion_task", %id)),
            );
        }

        let mut total_stats = DeletionStats::default();
        while let Some(task_result) = deletion_tasks.join_next().await {
            match task_result {
                Ok((id, Ok(task_stats))) => {
                    info!("Task {id} completed");
                    total_stats.merge(task_stats);
                }
                Ok((id, Err(e))) => {
                    error!("Task {id} failed: {e:#}");
                    return Err(e);
                }
                Err(join_error) => anyhow::bail!("Failed to join on a task: {join_error:?}"),
            }
        }

        Ok(total_stats)
    }
}

/// S3's delete_objects API accepts up to 1000 keys in a single request.
/// In practice, requests close to that limit have been observed to return OK
/// while actually deleting nothing, so keep the batch size well below the maximum.
const MAX_ITEMS_TO_DELETE: usize = 200;

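/// Counts of deleted keys, grouped by the tenant or timeline they belonged to.
///
/// Note that `merge` uses `BTreeMap::extend`, so it assumes the merged maps
/// have disjoint keys; a duplicate id would have its count overwritten rather
/// than summed.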
#[derive(Debug, Default)]
pub struct DeletionStats {
    pub deleted_tenant_keys: BTreeMap<TenantId, usize>,
    pub deleted_timeline_keys: BTreeMap<TenantTimelineId, usize>,
}

impl DeletionStats {
    fn merge(&mut self, other: Self) {
        self.deleted_tenant_keys.extend(other.deleted_tenant_keys);
        self.deleted_timeline_keys
            .extend(other.deleted_timeline_keys);
    }
}

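/// Deletes the tenant and timeline halves of a batch concurrently and merges
/// their per-id deletion counts into one [`DeletionStats`].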
async fn delete_batch(
    s3_client: &Client,
    s3_target: &RootTarget,
    batch: DeleteBatch,
    dry_run: bool,
) -> anyhow::Result<DeletionStats> {
    let (deleted_tenant_keys, deleted_timeline_keys) = tokio::join!(
        delete_tenants_batch(batch.tenants, s3_target, s3_client, dry_run),
        delete_timelines_batch(batch.timelines, s3_target, s3_client, dry_run),
    );

    Ok(DeletionStats {
        deleted_tenant_keys: deleted_tenant_keys.context("tenant batch deletion")?,
        deleted_timeline_keys: deleted_timeline_keys.context("timeline batch deletion")?,
    })
}

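/// Deletes every object under each tenant's root prefix. Unless this is a dry
/// run, afterwards verifies (with retries) that the tenant prefixes are empty.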
async fn delete_tenants_batch(
    batched_tenants: Vec<TenantId>,
    s3_target: &RootTarget,
    s3_client: &Client,
    dry_run: bool,
) -> Result<BTreeMap<TenantId, usize>, anyhow::Error> {
    info!("Deleting tenants batch of size {}", batched_tenants.len());
    info!("Tenant ids to remove: {batched_tenants:?}");
    let deleted_keys = delete_elements(
        &batched_tenants,
        s3_target,
        s3_client,
        dry_run,
        |root_target, tenant_to_delete| root_target.tenant_root(&tenant_to_delete),
    )
    .await?;

    if !dry_run {
        let mut last_err = None;
        for _ in 0..MAX_RETRIES {
            match ensure_tenant_batch_deleted(s3_client, s3_target, &batched_tenants).await {
                Ok(()) => {
                    last_err = None;
                    break;
                }
                Err(e) => {
                    error!("Failed to ensure the tenant batch is deleted: {e}");
                    last_err = Some(e);
                }
            }
        }

        if let Some(e) = last_err {
            anyhow::bail!(
                "Failed to ensure the tenant batch was deleted after {MAX_RETRIES} attempts: {e:?}"
            );
        }
    }

    Ok(deleted_keys)
}

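/// Deletes every object under each timeline's root prefix. Unless this is a
/// dry run, afterwards verifies (with retries) that the timeline prefixes are
/// empty.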
async fn delete_timelines_batch(
    batched_timelines: Vec<TenantTimelineId>,
    s3_target: &RootTarget,
    s3_client: &Client,
    dry_run: bool,
) -> Result<BTreeMap<TenantTimelineId, usize>, anyhow::Error> {
    info!(
        "Deleting timelines batch of size {}",
        batched_timelines.len()
    );
    info!(
        "Timeline ids to remove: {:?}",
        batched_timelines
            .iter()
            .map(|id| id.to_string())
            .collect::<Vec<_>>()
    );
    let deleted_keys = delete_elements(
        &batched_timelines,
        s3_target,
        s3_client,
        dry_run,
        |root_target, timeline_to_delete| root_target.timeline_root(&timeline_to_delete),
    )
    .await?;

    if !dry_run {
        let mut last_err = None;
        for _ in 0..MAX_RETRIES {
            match ensure_timeline_batch_deleted(s3_client, s3_target, &batched_timelines).await {
                Ok(()) => {
                    last_err = None;
                    break;
                }
                Err(e) => {
                    error!("Failed to ensure the timelines batch is deleted: {e}");
                    last_err = Some(e);
                }
            }
        }

        if let Some(e) = last_err {
            anyhow::bail!(
                "Failed to ensure the timeline batch was deleted after {MAX_RETRIES} attempts: {e:?}"
            );
        }
    }
    Ok(deleted_keys)
}

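/// Walks the prefix "tree" under every id's root target: lists each target
/// page by page, queues the returned keys for deletion, and pushes every
/// common prefix as a new subtarget to visit. Queued keys are flushed to S3
/// in chunks of at most `MAX_ITEMS_TO_DELETE`, with any remainder sent at the
/// end. Returns how many keys were queued for deletion per id.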
async fn delete_elements<I>(
    batched_ids: &[I],
    s3_target: &RootTarget,
    s3_client: &Client,
    dry_run: bool,
    target_producer: impl Fn(&RootTarget, I) -> S3Target,
) -> Result<BTreeMap<I, usize>, anyhow::Error>
where
    I: Ord + Copy,
{
    let mut deleted_keys = BTreeMap::new();
    let mut object_ids_to_delete = Vec::with_capacity(MAX_ITEMS_TO_DELETE);
    for &id_to_delete in batched_ids {
        let mut subtargets = vec![target_producer(s3_target, id_to_delete)];
        while let Some(current_target) = subtargets.pop() {
            // A continuation token is only valid for the listing it came from,
            // so reset it for every new subtarget.
            let mut continuation_token = None;
            loop {
                let fetch_response = list_objects_with_retries(
                    s3_client,
                    &current_target,
                    continuation_token.clone(),
                )
                .await?;

                for object_id in fetch_response
                    .contents()
                    .unwrap_or_default()
                    .iter()
                    .filter_map(|object| object.key())
                    .map(|key| ObjectIdentifier::builder().key(key).build())
                {
                    // Flush a full chunk before queuing the next id, so a single
                    // request never exceeds MAX_ITEMS_TO_DELETE keys.
                    if object_ids_to_delete.len() >= MAX_ITEMS_TO_DELETE {
                        let object_ids_for_request = std::mem::replace(
                            &mut object_ids_to_delete,
                            Vec::with_capacity(MAX_ITEMS_TO_DELETE),
                        );
                        send_delete_request(
                            s3_client,
                            s3_target.bucket_name(),
                            object_ids_for_request,
                            dry_run,
                        )
                        .await
                        .context("object ids deletion")?;
                    }

                    object_ids_to_delete.push(object_id);
                    *deleted_keys.entry(id_to_delete).or_default() += 1;
                }

                // Every common prefix is a "subdirectory" that needs its own
                // listing pass.
                subtargets.extend(
                    fetch_response
                        .common_prefixes()
                        .unwrap_or_default()
                        .iter()
                        .filter_map(|common_prefix| common_prefix.prefix())
                        .map(|prefix| {
                            let mut new_target = current_target.clone();
                            new_target.prefix_in_bucket = prefix.to_string();
                            new_target
                        }),
                );

                match fetch_response.next_continuation_token {
                    Some(new_token) => continuation_token = Some(new_token),
                    None => break,
                }
            }
        }
    }
    if !object_ids_to_delete.is_empty() {
        info!("Removing the last objects of the batch");
        send_delete_request(
            s3_client,
            s3_target.bucket_name(),
            object_ids_to_delete,
            dry_run,
        )
        .await
        .context("last object ids deletion")?;
    }
    Ok(deleted_keys)
}

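/// Issues a single `DeleteObjects` request, retrying up to `MAX_RETRIES` times
/// on transport errors or on per-key errors reported in the response. In
/// dry-run mode the request is logged but never sent.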
pub async fn send_delete_request(
    s3_client: &Client,
    bucket_name: &str,
    ids: Vec<ObjectIdentifier>,
    dry_run: bool,
) -> anyhow::Result<()> {
    info!("Removing {} object ids from S3", ids.len());
    info!("Object ids to remove: {ids:?}");
    let delete_request = s3_client
        .delete_objects()
        .bucket(bucket_name)
        .delete(Delete::builder().set_objects(Some(ids)).build());
    if dry_run {
        info!("Dry run, skipping the actual removal");
        Ok(())
    } else {
        let original_request = delete_request.clone();

        for _ in 0..MAX_RETRIES {
            match delete_request
                .clone()
                .send()
                .await
                .context("delete request processing")
            {
                Ok(delete_response) => {
                    info!("Delete response: {delete_response:?}");
                    match delete_response.errors() {
                        Some(delete_errors) => {
                            // Partial failures are retried wholesale: deleting an
                            // already-removed key is not an error in S3.
                            error!("Delete request returned errors: {delete_errors:?}");
                            tokio::time::sleep(Duration::from_secs(1)).await;
                        }
                        None => {
                            info!("Successfully removed an object batch from S3");
                            return Ok(());
                        }
                    }
                }
                Err(e) => {
                    error!("Failed to send a delete request: {e:#}");
                    tokio::time::sleep(Duration::from_secs(1)).await;
                }
            }
        }

        error!("Deletion failed after {MAX_RETRIES} attempts, request: {original_request:?}");
        anyhow::bail!("Failed to run the deletion request {MAX_RETRIES} times");
    }
}

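/// Lists each tenant's root prefix and fails if any of them still contains
/// objects or common prefixes.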
async fn ensure_tenant_batch_deleted(
    s3_client: &Client,
    s3_target: &RootTarget,
    batch: &[TenantId],
) -> anyhow::Result<()> {
    let mut not_deleted_tenants = Vec::with_capacity(batch.len());

    for &tenant_id in batch {
        let fetch_response =
            list_objects_with_retries(s3_client, &s3_target.tenant_root(&tenant_id), None).await?;

        if fetch_response.is_truncated()
            || fetch_response.contents().is_some()
            || fetch_response.common_prefixes().is_some()
        {
            error!(
                "Tenant {tenant_id} should be deleted, but its list response is {fetch_response:?}"
            );
            not_deleted_tenants.push(tenant_id);
        }
    }

    anyhow::ensure!(
        not_deleted_tenants.is_empty(),
        "Failed to delete all tenants in a batch. Tenants {not_deleted_tenants:?} are still present."
    );
    Ok(())
}

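/// Lists each timeline's root prefix and fails if any of them still contains
/// objects or common prefixes.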
async fn ensure_timeline_batch_deleted(
    s3_client: &Client,
    s3_target: &RootTarget,
    batch: &[TenantTimelineId],
) -> anyhow::Result<()> {
    let mut not_deleted_timelines = Vec::with_capacity(batch.len());

    for &id in batch {
        let fetch_response =
            list_objects_with_retries(s3_client, &s3_target.timeline_root(&id), None).await?;

        if fetch_response.is_truncated()
            || fetch_response.contents().is_some()
            || fetch_response.common_prefixes().is_some()
        {
            error!("Timeline {id} should be deleted, but its list response is {fetch_response:?}");
            not_deleted_timelines.push(id);
        }
    }

    anyhow::ensure!(
        not_deleted_timelines.is_empty(),
        "Failed to delete all timelines in a batch. Timelines {not_deleted_timelines:?} are still present."
    );
    Ok(())
}