//! Functionality for finding and purging garbage, as in "garbage collection".
//!
//! Garbage means S3 objects which are either not referenced by any metadata,
//! or are referenced by a control plane tenant/timeline in a deleted state.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use futures_util::TryStreamExt;
use pageserver_api::shard::TenantShardId;
use remote_storage::{GenericRemoteStorage, ListingMode, ListingObject, RemotePath};
use serde::{Deserialize, Serialize};
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use utils::backoff;
use utils::id::TenantId;

use crate::cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData};
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants_maybe_prefix};
use crate::{
    BucketConfig, ConsoleConfig, MAX_RETRIES, NodeKind, TenantShardTimelineId, TraversingDepth,
    init_remote, list_objects_with_retries,
};

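/// Why an entity was classified as garbage.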
#[derive(Serialize, Deserialize, Debug)]
enum GarbageReason {
    DeletedInConsole,
    MissingInConsole,

    // The remaining data relates to a known deletion issue, and we're sure that purging this
    // will not delete any real data, for example https://github.com/neondatabase/neon/pull/7928 where
    // there is nothing in a tenant path apart from a heatmap file.
    KnownBug,
}

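/// The unit of garbage: either a whole tenant shard, or a single timeline within one.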
#[derive(Serialize, Deserialize, Debug)]
enum GarbageEntity {
    Tenant(TenantShardId),
    Timeline(TenantShardTimelineId),
}

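/// A single finding: what to purge, and why we believe it is garbage.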
#[derive(Serialize, Deserialize, Debug)]
struct GarbageItem {
    entity: GarbageEntity,
    reason: GarbageReason,
}

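/// The output of [`find_garbage`] and the input to [`purge_garbage`]: a
/// self-describing list of entities to purge, serialized to JSON on disk.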
#[derive(Serialize, Deserialize, Debug)]
pub struct GarbageList {
    /// Remember what NodeKind we were finding garbage for, so that we can
    /// purge the list without re-stating it.
    node_kind: NodeKind,

    /// Embed the identity of the bucket, so that we do not risk executing
    /// the wrong list against the wrong bucket, and so that the user does not have
    /// to re-state the bucket details when purging.
    bucket_config: BucketConfig,

    items: Vec<GarbageItem>,

    /// Advisory information to enable consumers to do a validation that if we
    /// see garbage, we saw some active tenants too. This protects against classes of bugs
    /// in the scrubber that might otherwise generate a "deleted all" result.
    active_tenant_count: usize,
    active_timeline_count: usize,
}

impl GarbageList {
    fn new(node_kind: NodeKind, bucket_config: BucketConfig) -> Self {
        Self {
            items: Vec::new(),
            active_tenant_count: 0,
            active_timeline_count: 0,
            node_kind,
            bucket_config,
        }
    }

    /// Record an entity as requiring purge due to a known bug, e.g. a particular
    /// type of object left behind after an incomplete deletion.
    fn append_buggy(&mut self, entity: GarbageEntity) {
        self.items.push(GarbageItem {
            entity,
            reason: GarbageReason::KnownBug,
        });
    }

    /// Returns true if appended, false if not. False means the console result
    /// showed the entity is not garbage.
    fn maybe_append<T>(&mut self, entity: GarbageEntity, result: Option<T>) -> bool
    where
        T: MaybeDeleted,
    {
        match result {
            Some(result_item) if result_item.is_deleted() => {
                self.items.push(GarbageItem {
                    entity,
                    reason: GarbageReason::DeletedInConsole,
                });
                true
            }
            Some(_) => false,
            None => {
                self.items.push(GarbageItem {
                    entity,
                    reason: GarbageReason::MissingInConsole,
                });
                true
            }
        }
    }
}

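/// Entry point for garbage finding: builds a [`GarbageList`] for the given
/// bucket and node kind, and writes it to `output_path` as pretty-printed JSON
/// for later consumption by [`purge_garbage`].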
pub async fn find_garbage(
    bucket_config: BucketConfig,
    console_config: ConsoleConfig,
    depth: TraversingDepth,
    node_kind: NodeKind,
    tenant_id_prefix: Option<String>,
    output_path: String,
) -> anyhow::Result<()> {
    let garbage = find_garbage_inner(
        bucket_config,
        console_config,
        depth,
        node_kind,
        tenant_id_prefix,
    )
    .await?;
    let serialized = serde_json::to_vec_pretty(&garbage)?;

    tokio::fs::write(&output_path, &serialized).await?;

    tracing::info!("Wrote garbage report to {output_path}");

    Ok(())
}

// How many concurrent S3 operations to issue (approximately): this is the concurrency
// for things like listing the timelines within tenant prefixes.
const S3_CONCURRENCY: usize = 32;

// How many concurrent API requests to make to the console API.
//
// Be careful increasing this; roughly we shouldn't exceed ~100 rps. It would
// be better to implement a real rps limiter.
const CONSOLE_CONCURRENCY: usize = 16;

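/// Cache of console API lookups, shared across concurrent tenant checks so that
/// the many shards of one tenant cost only a single API request.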
struct ConsoleCache {
    /// Set of tenants found in the control plane API
    projects: HashMap<TenantId, ProjectData>,
    /// Set of tenants for which the control plane API returned 404
    not_found: HashSet<TenantId>,
}

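/// The core of garbage finding: enumerates tenants (and, if requested by
/// `depth`, their timelines) in remote storage, cross-checks each against the
/// console API, and classifies anything deleted or unknown as garbage.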
async fn find_garbage_inner(
    bucket_config: BucketConfig,
    console_config: ConsoleConfig,
    depth: TraversingDepth,
    node_kind: NodeKind,
    tenant_id_prefix: Option<String>,
) -> anyhow::Result<GarbageList> {
    // Construct clients for S3 and for Console API
    let (remote_client, target) = init_remote(bucket_config.clone(), node_kind).await?;
    let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(console_config));

    // Build a set of console-known tenants, for quickly eliminating known-active tenants without having
    // to issue O(N) console API requests.
    let console_projects: HashMap<TenantId, ProjectData> = cloud_admin_api_client
        .list_projects()
        .await?
        .into_iter()
        .map(|t| (t.tenant, t))
        .collect();
    tracing::info!(
        "Loaded tenant IDs for {} console projects",
        console_projects.len()
    );

    // Because many tenant shards may look up the same TenantId, we maintain a cache.
    let console_cache = Arc::new(std::sync::Mutex::new(ConsoleCache {
        projects: console_projects,
        not_found: HashSet::new(),
    }));

    // Enumerate tenants in S3, and check if each one exists in Console
    tracing::info!("Finding all tenants in {}...", bucket_config.desc_str());
    let tenants = stream_tenants_maybe_prefix(&remote_client, &target, tenant_id_prefix);
    let tenants_checked = tenants.map_ok(|t| {
        let api_client = cloud_admin_api_client.clone();
        let console_cache = console_cache.clone();
        async move {
            // Check cache before issuing API call
            let project_data = {
                let cache = console_cache.lock().unwrap();
                let result = cache.projects.get(&t.tenant_id).cloned();
                if result.is_none() && cache.not_found.contains(&t.tenant_id) {
                    return Ok((t, None));
                }
                result
            };

            match project_data {
                Some(project_data) => Ok((t, Some(project_data.clone()))),
                None => {
                    let project_data = api_client
                        .find_tenant_project(t.tenant_id)
                        .await
                        .map_err(|e| anyhow::anyhow!(e));

                    // Populate cache with result of API call
                    {
                        let mut cache = console_cache.lock().unwrap();
                        if let Ok(Some(project_data)) = &project_data {
                            cache.projects.insert(t.tenant_id, project_data.clone());
                        } else if let Ok(None) = &project_data {
                            cache.not_found.insert(t.tenant_id);
                        }
                    }

                    project_data.map(|r| (t, r))
                }
            }
        }
    });
    let mut tenants_checked =
        std::pin::pin!(tenants_checked.try_buffer_unordered(CONSOLE_CONCURRENCY));

    // Process the results of tenant checks. If a tenant is garbage, it goes into
    // the `GarbageList`. Else it goes into `active_tenants` for more detailed timeline
    // checks if they are enabled by the `depth` parameter.
    let mut garbage = GarbageList::new(node_kind, bucket_config);
    let mut active_tenants: Vec<TenantShardId> = vec![];
    let mut counter = 0;
    while let Some(result) = tenants_checked.next().await {
        let (tenant_shard_id, console_result) = result?;

        // Paranoia check
        if let Some(project) = &console_result {
            assert!(project.tenant == tenant_shard_id.tenant_id);
        }

        // Special case: if it's missing in console, check for known bugs that would enable us
        // to conclusively identify it as purge-able anyway
        if console_result.is_none() {
            let timelines = stream_tenant_timelines(&remote_client, &target, tenant_shard_id)
                .await?
                .collect::<Vec<_>>()
                .await;
            if timelines.is_empty() {
                // No timelines, but a heatmap: the deletion bug where we deleted everything but heatmaps
                let tenant_objects = list_objects_with_retries(
                    &remote_client,
                    ListingMode::WithDelimiter,
                    &target.tenant_root(&tenant_shard_id),
                )
                .await?;
                if let Some(object) = tenant_objects.keys.first() {
                    if object.key.get_path().as_str().ends_with("heatmap-v1.json") {
                        tracing::info!(
                            "Tenant {tenant_shard_id} is missing in console and contains only a heatmap (known historic deletion bug)"
                        );
                        garbage.append_buggy(GarbageEntity::Tenant(tenant_shard_id));
                        continue;
                    } else {
                        tracing::info!(
                            "Tenant {tenant_shard_id} is missing in console and contains one object: {}",
                            object.key
                        );
                    }
                } else {
                    tracing::info!(
                        "Tenant {tenant_shard_id} is missing in console and appears to have been deleted while we ran"
                    );
                }
            } else {
                // A console-unknown tenant with timelines: check if these timelines only contain
                // initdb.tar.zst, from the initial rollout of WAL DR in which we never deleted these.
                let mut any_non_initdb = false;

                for timeline_r in timelines {
                    let timeline = timeline_r?;
                    let timeline_objects = list_objects_with_retries(
                        &remote_client,
                        ListingMode::WithDelimiter,
                        &target.timeline_root(&timeline),
                    )
                    .await?;
                    if !timeline_objects.prefixes.is_empty() {
                        // Sub-paths? Unexpected
                        any_non_initdb = true;
                    } else if let Some(object) = timeline_objects.keys.first() {
                        if object.key.get_path().as_str().ends_with("initdb.tar.zst") {
                            tracing::info!("Timeline {timeline} contains only initdb.tar.zst");
                        } else {
                            any_non_initdb = true;
                        }
                    } else {
                        // Empty listing: most likely a deletion raced us. Be conservative
                        // and treat it as non-initdb content rather than panicking.
                        any_non_initdb = true;
                    }
                }

                if any_non_initdb {
                    tracing::info!(
                        "Tenant {tenant_shard_id} is missing in console and contains timelines, one or more of which hold more than just initdb"
                    );
                } else {
                    tracing::info!(
                        "Tenant {tenant_shard_id} is missing in console and contains only timelines whose sole content is initdb"
                    );
                    garbage.append_buggy(GarbageEntity::Tenant(tenant_shard_id));
                    continue;
                }
            }
        }

        if garbage.maybe_append(GarbageEntity::Tenant(tenant_shard_id), console_result) {
            tracing::debug!("Tenant {tenant_shard_id} is garbage");
        } else {
            tracing::debug!("Tenant {tenant_shard_id} is active");
            active_tenants.push(tenant_shard_id);
            garbage.active_tenant_count = active_tenants.len();
        }

        counter += 1;
        if counter % 1000 == 0 {
            tracing::info!(
                "Progress: {counter} tenants checked, {} active, {} garbage",
                active_tenants.len(),
                garbage.items.len()
            );
        }
    }

    tracing::info!(
        "Found {}/{} garbage tenants",
        garbage.items.len(),
        garbage.items.len() + active_tenants.len()
    );

    // If we are only checking to tenant depth, we are done. Otherwise we must
    // proceed to check the individual timelines of the active tenants.
    if depth == TraversingDepth::Tenant {
        return Ok(garbage);
    }

    tracing::info!(
        "Checking timelines for {} active tenants",
        active_tenants.len(),
    );

    // Construct a stream of all timelines within active tenants
    let active_tenants = tokio_stream::iter(active_tenants.iter().map(Ok));
    let timelines = active_tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, *t));
    let timelines = timelines.try_buffer_unordered(S3_CONCURRENCY);
    let timelines = timelines.try_flatten();

    // For all timelines within active tenants, call into console API to check their existence
    let timelines_checked = timelines.map_ok(|ttid| {
        let api_client = cloud_admin_api_client.clone();
        async move {
            api_client
                .find_timeline_branch(ttid.tenant_shard_id.tenant_id, ttid.timeline_id)
                .await
                .map_err(|e| anyhow::anyhow!(e))
                .map(|r| (ttid, r))
        }
    });
    let mut timelines_checked =
        std::pin::pin!(timelines_checked.try_buffer_unordered(CONSOLE_CONCURRENCY));

    // Update the GarbageList with any timelines which appear not to exist.
    let mut active_timelines: Vec<TenantShardTimelineId> = vec![];
    while let Some(result) = timelines_checked.next().await {
        let (ttid, console_result) = result?;
        if garbage.maybe_append(GarbageEntity::Timeline(ttid), console_result) {
            tracing::debug!("Timeline {ttid} is garbage");
        } else {
            tracing::debug!("Timeline {ttid} is active");
            active_timelines.push(ttid);
            garbage.active_timeline_count = active_timelines.len();
        }
    }

    let num_garbage_timelines = garbage
        .items
        .iter()
        .filter(|g| matches!(g.entity, GarbageEntity::Timeline(_)))
        .count();
    tracing::info!(
        "Found {}/{} garbage timelines in active tenants",
        num_garbage_timelines,
        active_timelines.len(),
    );

    Ok(garbage)
}

#[derive(clap::ValueEnum, Debug, Clone)]
pub enum PurgeMode {
    /// The safest mode: only delete tenants that were explicitly reported as deleted
    /// by the Console API, or that are attributable to a known bug.
    DeletedOnly,

    /// Delete all garbage tenants, including those which are only presumed to be deleted,
    /// because the Console API could not find them.
    DeletedAndMissing,
}

impl std::fmt::Display for PurgeMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            PurgeMode::DeletedOnly => write!(f, "deleted-only"),
            PurgeMode::DeletedAndMissing => write!(f, "deleted-and-missing"),
        }
    }
}

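/// List every object under a tenant's remote storage prefix, with retries.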
pub async fn get_tenant_objects(
    s3_client: &GenericRemoteStorage,
    tenant_shard_id: TenantShardId,
) -> anyhow::Result<Vec<ListingObject>> {
    tracing::debug!("Listing objects in tenant {tenant_shard_id}");
    let tenant_root = super::remote_tenant_path(&tenant_shard_id);

    // TODO: apply extra validation based on object modification time. Don't purge
    // tenants where any timeline's index_part.json has been touched recently.

    let cancel = CancellationToken::new();
    let list = backoff::retry(
        || s3_client.list(Some(&tenant_root), ListingMode::NoDelimiter, None, &cancel),
        |_| false,
        3,
        MAX_RETRIES as u32,
        "get_tenant_objects",
        &cancel,
    )
    .await
    .expect("dummy cancellation token")?;
    Ok(list.keys)
}

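/// List every object under a timeline's remote storage prefix, with retries.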
pub async fn get_timeline_objects(
    s3_client: &GenericRemoteStorage,
    ttid: TenantShardTimelineId,
) -> anyhow::Result<Vec<ListingObject>> {
    tracing::debug!("Listing objects in timeline {ttid}");
    let timeline_root = super::remote_timeline_path_id(&ttid);

    let cancel = CancellationToken::new();
    let list = backoff::retry(
        || {
            s3_client.list(
                Some(&timeline_root),
                ListingMode::NoDelimiter,
                None,
                &cancel,
            )
        },
        |_| false,
        3,
        MAX_RETRIES as u32,
        "get_timeline_objects",
        &cancel,
    )
    .await
    .expect("dummy cancellation token")?;

    Ok(list.keys)
}

/// Drain a buffer of keys into DeleteObjects requests.
///
/// If `drain` is true, drains `keys` completely; otherwise stops when fewer than
/// `max_keys_per_delete` keys are left. Progress is reported via `progress_tracker`.
async fn do_delete(
    remote_client: &GenericRemoteStorage,
    keys: &mut Vec<ListingObject>,
    dry_run: bool,
    drain: bool,
    progress_tracker: &mut DeletionProgressTracker,
) -> anyhow::Result<()> {
    let cancel = CancellationToken::new();
    let max_keys_per_delete = remote_client.max_keys_per_delete();
    while (!keys.is_empty() && drain) || (keys.len() >= max_keys_per_delete) {
        let request_keys =
            keys.split_off(keys.len() - (std::cmp::min(max_keys_per_delete, keys.len())));

        let request_keys: Vec<RemotePath> = request_keys.into_iter().map(|o| o.key).collect();

        let num_deleted = request_keys.len();
        if dry_run {
            tracing::info!("Dry-run deletion of objects:");
            for k in request_keys {
                tracing::info!("  {k:?}");
            }
        } else {
            remote_client
                .delete_objects(&request_keys, &cancel)
                .await
                .context("deletion request")?;
            progress_tracker.register(num_deleted);
        }
    }

    Ok(())
}

/// Simple tracker that logs progress every 10k deleted keys.
#[derive(Default)]
struct DeletionProgressTracker {
    num_deleted: usize,
    last_reported_num_deleted: usize,
}

impl DeletionProgressTracker {
    fn register(&mut self, n: usize) {
        self.num_deleted += n;
        if self.num_deleted - self.last_reported_num_deleted > 10000 {
            tracing::info!("progress: deleted {} keys", self.num_deleted);
            self.last_reported_num_deleted = self.num_deleted;
        }
    }
}

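/// Entry point for purging: loads a garbage list produced by [`find_garbage`],
/// filters it according to `mode`, applies an age-based safety check, and
/// deletes the surviving objects (unless `dry_run` is set, in which case
/// deletions are only logged).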
pub async fn purge_garbage(
    input_path: String,
    mode: PurgeMode,
    min_age: Duration,
    dry_run: bool,
) -> anyhow::Result<()> {
    let list_bytes = tokio::fs::read(&input_path).await?;
    let garbage_list = serde_json::from_slice::<GarbageList>(&list_bytes)?;
    tracing::info!(
        "Loaded {} items in garbage list from {}",
        garbage_list.items.len(),
        input_path
    );

    let (remote_client, _target) =
        init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;

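    // Make sure we are purging the same bucket that the garbage list was generated for.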
    assert_eq!(
        garbage_list.bucket_config.bucket_name().unwrap(),
        remote_client.bucket_name().unwrap()
    );

    // Sanity checks on the incoming list
    if garbage_list.active_tenant_count == 0 {
        anyhow::bail!("Refusing to purge a garbage list that reports 0 active tenants");
    }
    if garbage_list
        .items
        .iter()
        .any(|g| matches!(g.entity, GarbageEntity::Timeline(_)))
        && garbage_list.active_timeline_count == 0
    {
        anyhow::bail!(
            "Refusing to purge a garbage list containing garbage timelines that reports 0 active timelines"
        );
    }

    let filtered_items: Vec<_> = garbage_list
        .items
        .iter()
        .filter(|i| match (&mode, &i.reason) {
            (PurgeMode::DeletedAndMissing, _) => true,
            (PurgeMode::DeletedOnly, GarbageReason::DeletedInConsole) => true,
            (PurgeMode::DeletedOnly, GarbageReason::KnownBug) => true,
            (PurgeMode::DeletedOnly, GarbageReason::MissingInConsole) => false,
        })
        .collect();

    tracing::info!(
        "Filtered down to {} garbage items based on mode {}",
        filtered_items.len(),
        mode
    );

    let items = tokio_stream::iter(filtered_items.into_iter().map(Ok));
    let get_objects_results = items.map_ok(|i| {
        let remote_client = remote_client.clone();
        async move {
            match i.entity {
                GarbageEntity::Tenant(tenant_id) => {
                    get_tenant_objects(&remote_client, tenant_id).await
                }
                GarbageEntity::Timeline(ttid) => get_timeline_objects(&remote_client, ttid).await,
            }
        }
    });
    let mut get_objects_results =
        std::pin::pin!(get_objects_results.try_buffer_unordered(S3_CONCURRENCY));

    let mut objects_to_delete = Vec::new();
    let mut progress_tracker = DeletionProgressTracker::default();
    while let Some(result) = get_objects_results.next().await {
        let mut object_list = result?;

        // Extra safety check: even if a collection of objects is garbage, check the max() of
        // their modification times before purging, so that if we incorrectly marked a live
        // tenant as garbage we would notice that its index has been written recently and
        // avoid deleting it.
        if object_list.is_empty() {
            // Simplify subsequent code by ensuring the list always has at least one item.
            // Usually this only occurs when parallel deletions are racing us, as there are
            // no empty prefixes.
            continue;
        }
        let max_mtime = object_list.iter().map(|o| o.last_modified).max().unwrap();
        let age = max_mtime.elapsed();
        match age {
            Err(_) => {
                tracing::warn!("Bad last_modified time");
                continue;
            }
            Ok(a) if a < min_age => {
                // Failed age check. This doesn't mean we did something wrong: a tenant might
                // really be garbage and recently written, but out of an abundance of caution
                // we still don't purge it.
                tracing::info!(
                    "Skipping tenant with young objects {}..{}",
                    object_list.first().as_ref().unwrap().key,
                    object_list.last().as_ref().unwrap().key
                );
                continue;
            }
            Ok(_) => {
                // Passed age check
            }
        }

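        // Flush a full-sized DeleteObjects batch as soon as we have enough keys;
        // the final do_delete call below drains whatever remains.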
        objects_to_delete.append(&mut object_list);
        if objects_to_delete.len() >= remote_client.max_keys_per_delete() {
            do_delete(
                &remote_client,
                &mut objects_to_delete,
                dry_run,
                false,
                &mut progress_tracker,
            )
            .await?;
        }
    }

    do_delete(
        &remote_client,
        &mut objects_to_delete,
        dry_run,
        true,
        &mut progress_tracker,
    )
    .await?;

    tracing::info!("{} keys deleted in total", progress_tracker.num_deleted);

    Ok(())
}