LCOV - code coverage report
Current view: top level - storage_scrubber/src - garbage.rs (source / functions)
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info
Test Date: 2025-03-12 00:01:28
Coverage: Lines: 0.0 % (0 of 432 hit)    Functions: 0.0 % (0 of 70 hit)

            Line data    Source code
       1              : //! Functionality for finding and purging garbage, as in "garbage collection".
       2              : //!
       3              : //! Garbage means S3 objects which are either not referenced by any metadata,
       4              : //! or are referenced by a control plane tenant/timeline in a deleted state.
       5              : 
       6              : use std::collections::{HashMap, HashSet};
       7              : use std::sync::Arc;
       8              : use std::time::Duration;
       9              : 
      10              : use anyhow::Context;
      11              : use futures_util::TryStreamExt;
      12              : use pageserver_api::shard::TenantShardId;
      13              : use remote_storage::{GenericRemoteStorage, ListingMode, ListingObject, RemotePath};
      14              : use serde::{Deserialize, Serialize};
      15              : use tokio_stream::StreamExt;
      16              : use tokio_util::sync::CancellationToken;
      17              : use utils::backoff;
      18              : use utils::id::TenantId;
      19              : 
      20              : use crate::cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData};
      21              : use crate::metadata_stream::{stream_tenant_timelines, stream_tenants_maybe_prefix};
      22              : use crate::{
      23              :     BucketConfig, ConsoleConfig, MAX_RETRIES, NodeKind, TenantShardTimelineId, TraversingDepth,
      24              :     init_remote, list_objects_with_retries,
      25              : };
      26              : 
      27            0 : #[derive(Serialize, Deserialize, Debug)]
      28              : enum GarbageReason {
      29              :     DeletedInConsole,
      30              :     MissingInConsole,
      31              : 
      32              :     // The remaining data relates to a known deletion issue, and we're sure that purging this
      33              :     // will not delete any real data, for example https://github.com/neondatabase/neon/pull/7928 where
      34              :     // there is nothing in a tenant path apart from a heatmap file.
      35              :     KnownBug,
      36              : }
      37              : 
      38            0 : #[derive(Serialize, Deserialize, Debug)]
      39              : enum GarbageEntity {
      40              :     Tenant(TenantShardId),
      41              :     Timeline(TenantShardTimelineId),
      42              : }
      43              : 
      44            0 : #[derive(Serialize, Deserialize, Debug)]
      45              : struct GarbageItem {
      46              :     entity: GarbageEntity,
      47              :     reason: GarbageReason,
      48              : }
      49              : 
      50            0 : #[derive(Serialize, Deserialize, Debug)]
      51              : pub struct GarbageList {
      52              :     /// Remember what NodeKind we were finding garbage for, so that we can
      53              :     /// purge the list without re-stating it.
      54              :     node_kind: NodeKind,
      55              : 
      56              :     /// Embed the identity of the bucket, so that we do not risk executing
      57              :     /// the wrong list against the wrong bucket, and so that the user does not have
      58              :     /// to re-state the bucket details when purging.
      59              :     bucket_config: BucketConfig,
      60              : 
      61              :     items: Vec<GarbageItem>,
      62              : 
      63              :     /// Advisory information enabling consumers to validate that if we saw
      64              :     /// garbage, we also saw some active tenants.  This protects against classes of bugs
      65              :     /// in the scrubber that might otherwise generate a "deleted all" result.
      66              :     active_tenant_count: usize,
      67              :     active_timeline_count: usize,
      68              : }
      69              : 
      70              : impl GarbageList {
      71            0 :     fn new(node_kind: NodeKind, bucket_config: BucketConfig) -> Self {
      72            0 :         Self {
      73            0 :             items: Vec::new(),
      74            0 :             active_tenant_count: 0,
      75            0 :             active_timeline_count: 0,
      76            0 :             node_kind,
      77            0 :             bucket_config,
      78            0 :         }
      79            0 :     }
      80              : 
      81              :     /// Append an entity that has been identified as requiring purge due to a known bug,
      82              :     /// e.g. a particular type of object left behind after an incomplete deletion.
      83            0 :     fn append_buggy(&mut self, entity: GarbageEntity) {
      84            0 :         self.items.push(GarbageItem {
      85            0 :             entity,
      86            0 :             reason: GarbageReason::KnownBug,
      87            0 :         });
      88            0 :     }
      89              : 
      90              :     /// Return true if appended, false if not.  False means the result was not garbage.
      91            0 :     fn maybe_append<T>(&mut self, entity: GarbageEntity, result: Option<T>) -> bool
      92            0 :     where
      93            0 :         T: MaybeDeleted,
      94            0 :     {
      95            0 :         match result {
      96            0 :             Some(result_item) if result_item.is_deleted() => {
      97            0 :                 self.items.push(GarbageItem {
      98            0 :                     entity,
      99            0 :                     reason: GarbageReason::DeletedInConsole,
     100            0 :                 });
     101            0 :                 true
     102              :             }
     103            0 :             Some(_) => false,
     104              :             None => {
     105            0 :                 self.items.push(GarbageItem {
     106            0 :                     entity,
     107            0 :                     reason: GarbageReason::MissingInConsole,
     108            0 :                 });
     109            0 :                 true
     110              :             }
     111              :         }
     112            0 :     }
     113              : }
     114              : 
     115            0 : pub async fn find_garbage(
     116            0 :     bucket_config: BucketConfig,
     117            0 :     console_config: ConsoleConfig,
     118            0 :     depth: TraversingDepth,
     119            0 :     node_kind: NodeKind,
     120            0 :     tenant_id_prefix: Option<String>,
     121            0 :     output_path: String,
     122            0 : ) -> anyhow::Result<()> {
     123            0 :     let garbage = find_garbage_inner(
     124            0 :         bucket_config,
     125            0 :         console_config,
     126            0 :         depth,
     127            0 :         node_kind,
     128            0 :         tenant_id_prefix,
     129            0 :     )
     130            0 :     .await?;
     131            0 :     let serialized = serde_json::to_vec_pretty(&garbage)?;
     132              : 
     133            0 :     tokio::fs::write(&output_path, &serialized).await?;
     134              : 
     135            0 :     tracing::info!("Wrote garbage report to {output_path}");
     136              : 
     137            0 :     Ok(())
     138            0 : }
     139              : 
     140              : // How many concurrent S3 operations to issue (approximately): this is the concurrency
     141              : // for things like listing the timelines within tenant prefixes.
     142              : const S3_CONCURRENCY: usize = 32;
     143              : 
     144              : // How many concurrent API requests to make to the console API.
     145              : //
     146              : // Be careful increasing this; roughly we shouldn't have more than ~100 rps. It
     147              : // would be better to implement a real rps limiter.
     148              : const CONSOLE_CONCURRENCY: usize = 16;
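
The comment above suggests that a real requests-per-second limiter would be preferable to a fixed concurrency cap. A minimal sketch of what that could look like, assuming a tokio runtime; the RpsLimiter type and its construction are hypothetical and not part of this crate:

    // Hypothetical sketch only: a crude requests-per-second limiter built on
    // tokio's interval timer. Callers await `acquire()` before each console
    // request, capping throughput at roughly `rps` requests per second.
    use std::sync::Arc;
    use tokio::sync::Mutex;
    use tokio::time::{Duration, Interval, MissedTickBehavior, interval};

    struct RpsLimiter {
        ticker: Mutex<Interval>,
    }

    impl RpsLimiter {
        fn new(rps: u32) -> Arc<Self> {
            let mut ticker = interval(Duration::from_secs(1) / rps);
            // Avoid a burst of queued ticks after a stall; resume at the steady rate.
            ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
            Arc::new(Self {
                ticker: Mutex::new(ticker),
            })
        }

        async fn acquire(&self) {
            self.ticker.lock().await.tick().await;
        }
    }

Each buffered console future would then call limiter.acquire().await before find_tenant_project, with CONSOLE_CONCURRENCY still bounding the number of in-flight requests.
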
     149              : 
     150              : struct ConsoleCache {
     151              :     /// Projects for tenants found in the control plane API, keyed by tenant ID
     152              :     projects: HashMap<TenantId, ProjectData>,
     153              :     /// Set of tenants for which the control plane API returned 404
     154              :     not_found: HashSet<TenantId>,
     155              : }
     156              : 
     157            0 : async fn find_garbage_inner(
     158            0 :     bucket_config: BucketConfig,
     159            0 :     console_config: ConsoleConfig,
     160            0 :     depth: TraversingDepth,
     161            0 :     node_kind: NodeKind,
     162            0 :     tenant_id_prefix: Option<String>,
     163            0 : ) -> anyhow::Result<GarbageList> {
     164              :     // Construct clients for S3 and for Console API
     165            0 :     let (remote_client, target) = init_remote(bucket_config.clone(), node_kind).await?;
     166            0 :     let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(console_config));
     167              : 
     168              :     // Build a set of console-known tenants, for quickly eliminating known-active tenants without having
     169              :     // to issue O(N) console API requests.
     170            0 :     let console_projects: HashMap<TenantId, ProjectData> = cloud_admin_api_client
     171            0 :         .list_projects()
     172            0 :         .await?
     173            0 :         .into_iter()
     174            0 :         .map(|t| (t.tenant, t))
     175            0 :         .collect();
     176            0 :     tracing::info!(
     177            0 :         "Loaded tenant IDs for {} console projects",
     178            0 :         console_projects.len()
     179              :     );
     180              : 
     181              :     // Because many tenant shards may look up the same TenantId, we maintain a cache.
     182            0 :     let console_cache = Arc::new(std::sync::Mutex::new(ConsoleCache {
     183            0 :         projects: console_projects,
     184            0 :         not_found: HashSet::new(),
     185            0 :     }));
     186            0 : 
     187            0 :     // Enumerate Tenants in S3, and check if each one exists in Console
     188            0 :     tracing::info!("Finding all tenants in {}...", bucket_config.desc_str());
     189            0 :     let tenants = stream_tenants_maybe_prefix(&remote_client, &target, tenant_id_prefix);
     190            0 :     let tenants_checked = tenants.map_ok(|t| {
     191            0 :         let api_client = cloud_admin_api_client.clone();
     192            0 :         let console_cache = console_cache.clone();
     193            0 :         async move {
     194              :             // Check cache before issuing API call
     195            0 :             let project_data = {
     196            0 :                 let cache = console_cache.lock().unwrap();
     197            0 :                 let result = cache.projects.get(&t.tenant_id).cloned();
     198            0 :                 if result.is_none() && cache.not_found.contains(&t.tenant_id) {
     199            0 :                     return Ok((t, None));
     200            0 :                 }
     201            0 :                 result
     202            0 :             };
     203            0 : 
     204            0 :             match project_data {
     205            0 :                 Some(project_data) => Ok((t, Some(project_data.clone()))),
     206              :                 None => {
     207            0 :                     let project_data = api_client
     208            0 :                         .find_tenant_project(t.tenant_id)
     209            0 :                         .await
     210            0 :                         .map_err(|e| anyhow::anyhow!(e));
     211            0 : 
     212            0 :                     // Populate cache with result of API call
     213            0 :                     {
     214            0 :                         let mut cache = console_cache.lock().unwrap();
     215            0 :                         if let Ok(Some(project_data)) = &project_data {
     216            0 :                             cache.projects.insert(t.tenant_id, project_data.clone());
     217            0 :                         } else if let Ok(None) = &project_data {
     218            0 :                             cache.not_found.insert(t.tenant_id);
     219            0 :                         }
     220              :                     }
     221              : 
     222            0 :                     project_data.map(|r| (t, r))
     223              :                 }
     224              :             }
     225            0 :         }
     226            0 :     });
     227            0 :     let mut tenants_checked =
     228            0 :         std::pin::pin!(tenants_checked.try_buffer_unordered(CONSOLE_CONCURRENCY));
     229            0 : 
     230            0 :     // Process the results of Tenant checks.  If a Tenant is garbage, it goes into
     231            0 :     // the `GarbageList`.  Else it goes into `active_tenants` for more detailed timeline
     232            0 :     // checks if they are enabled by the `depth` parameter.
     233            0 :     let mut garbage = GarbageList::new(node_kind, bucket_config);
     234            0 :     let mut active_tenants: Vec<TenantShardId> = vec![];
     235            0 :     let mut counter = 0;
     236            0 :     while let Some(result) = tenants_checked.next().await {
     237            0 :         let (tenant_shard_id, console_result) = result?;
     238              : 
     239              :         // Paranoia check
     240            0 :         if let Some(project) = &console_result {
     241            0 :             assert!(project.tenant == tenant_shard_id.tenant_id);
     242            0 :         }
     243              : 
     244              :         // Special case: If it's missing in console, check for known bugs that would enable us to conclusively
     245              :         // identify it as purge-able anyway
     246            0 :         if console_result.is_none() {
     247            0 :             let timelines = stream_tenant_timelines(&remote_client, &target, tenant_shard_id)
     248            0 :                 .await?
     249            0 :                 .collect::<Vec<_>>()
     250            0 :                 .await;
     251            0 :             if timelines.is_empty() {
     252              :                 // No timelines, but a heatmap: the deletion bug where we deleted everything but heatmaps
     253            0 :                 let tenant_objects = list_objects_with_retries(
     254            0 :                     &remote_client,
     255            0 :                     ListingMode::WithDelimiter,
     256            0 :                     &target.tenant_root(&tenant_shard_id),
     257            0 :                 )
     258            0 :                 .await?;
     259            0 :                 if let Some(object) = tenant_objects.keys.first() {
     260            0 :                     if object.key.get_path().as_str().ends_with("heatmap-v1.json") {
     261            0 :                         tracing::info!(
     262            0 :                             "Tenant {tenant_shard_id}: is missing in console and contains only a heatmap (known historic deletion bug)"
     263              :                         );
     264            0 :                         garbage.append_buggy(GarbageEntity::Tenant(tenant_shard_id));
     265            0 :                         continue;
     266              :                     } else {
     267            0 :                         tracing::info!(
     268            0 :                             "Tenant {tenant_shard_id} is missing in console and contains one object: {}",
     269              :                             object.key
     270              :                         );
     271              :                     }
     272              :                 } else {
     273            0 :                     tracing::info!(
     274            0 :                         "Tenant {tenant_shard_id} is missing in console and appears to have been deleted while we ran"
     275              :                     );
     276              :                 }
     277              :             } else {
     278              :                 // A console-unknown tenant with timelines: check if these timelines only contain initdb.tar.zst, from the initial
     279              :                 // rollout of WAL DR in which we never deleted these.
     280            0 :                 let mut any_non_initdb = false;
     281              : 
     282            0 :                 for timeline_r in timelines {
     283            0 :                     let timeline = timeline_r?;
     284            0 :                     let timeline_objects = list_objects_with_retries(
     285            0 :                         &remote_client,
     286            0 :                         ListingMode::WithDelimiter,
     287            0 :                         &target.timeline_root(&timeline),
     288            0 :                     )
     289            0 :                     .await?;
     290            0 :                     if !timeline_objects.prefixes.is_empty() {
     291            0 :                         // Sub-paths?  Unexpected
     292            0 :                         any_non_initdb = true;
     293            0 :                     } else {
     294            0 :                         let object = timeline_objects.keys.first().unwrap();
     295            0 :                         if object.key.get_path().as_str().ends_with("initdb.tar.zst") {
     296            0 :                             tracing::info!("Timeline {timeline} contains only initdb.tar.zst");
     297            0 :                         } else {
     298            0 :                             any_non_initdb = true;
     299            0 :                         }
     300              :                     }
     301              :                 }
     302              : 
     303            0 :                 if any_non_initdb {
     304            0 :                     tracing::info!(
     305            0 :                         "Tenant {tenant_shard_id}: is missing in console and contains timelines, one or more of which are more than just initdb"
     306              :                     );
     307              :                 } else {
     308            0 :                     tracing::info!(
     309            0 :                         "Tenant {tenant_shard_id}: is missing in console and contains only timelines that only contain initdb"
     310              :                     );
     311            0 :                     garbage.append_buggy(GarbageEntity::Tenant(tenant_shard_id));
     312            0 :                     continue;
     313              :                 }
     314              :             }
     315            0 :         }
     316              : 
     317            0 :         if garbage.maybe_append(GarbageEntity::Tenant(tenant_shard_id), console_result) {
     318            0 :             tracing::debug!("Tenant {tenant_shard_id} is garbage");
     319              :         } else {
     320            0 :             tracing::debug!("Tenant {tenant_shard_id} is active");
     321            0 :             active_tenants.push(tenant_shard_id);
     322            0 :             garbage.active_tenant_count = active_tenants.len();
     323              :         }
     324              : 
     325            0 :         counter += 1;
     326            0 :         if counter % 1000 == 0 {
     327            0 :             tracing::info!(
     328            0 :                 "Progress: {counter} tenants checked, {} active, {} garbage",
     329            0 :                 active_tenants.len(),
     330            0 :                 garbage.items.len()
     331              :             );
     332            0 :         }
     333              :     }
     334              : 
     335            0 :     tracing::info!(
     336            0 :         "Found {}/{} garbage tenants",
     337            0 :         garbage.items.len(),
     338            0 :         garbage.items.len() + active_tenants.len()
     339              :     );
     340              : 
     341              :     // If we are only checking tenant-deep, we are done.  Otherwise we must
     342              :     // proceed to check the individual timelines of the active tenants.
     343            0 :     if depth == TraversingDepth::Tenant {
     344            0 :         return Ok(garbage);
     345            0 :     }
     346            0 : 
     347            0 :     tracing::info!(
     348            0 :         "Checking timelines for {} active tenants",
     349            0 :         active_tenants.len(),
     350              :     );
     351              : 
     352              :     // Construct a stream of all timelines within active tenants
     353            0 :     let active_tenants = tokio_stream::iter(active_tenants.iter().map(Ok));
     354            0 :     let timelines = active_tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, *t));
     355            0 :     let timelines = timelines.try_buffer_unordered(S3_CONCURRENCY);
     356            0 :     let timelines = timelines.try_flatten();
     357            0 : 
     358            0 :     // For all timelines within active tenants, call into console API to check their existence
     359            0 :     let timelines_checked = timelines.map_ok(|ttid| {
     360            0 :         let api_client = cloud_admin_api_client.clone();
     361            0 :         async move {
     362            0 :             api_client
     363            0 :                 .find_timeline_branch(ttid.tenant_shard_id.tenant_id, ttid.timeline_id)
     364            0 :                 .await
     365            0 :                 .map_err(|e| anyhow::anyhow!(e))
     366            0 :                 .map(|r| (ttid, r))
     367            0 :         }
     368            0 :     });
     369            0 :     let mut timelines_checked =
     370            0 :         std::pin::pin!(timelines_checked.try_buffer_unordered(CONSOLE_CONCURRENCY));
     371            0 : 
     372            0 :     // Update the GarbageList with any timelines which appear not to exist.
     373            0 :     let mut active_timelines: Vec<TenantShardTimelineId> = vec![];
     374            0 :     while let Some(result) = timelines_checked.next().await {
     375            0 :         let (ttid, console_result) = result?;
     376            0 :         if garbage.maybe_append(GarbageEntity::Timeline(ttid), console_result) {
     377            0 :             tracing::debug!("Timeline {ttid} is garbage");
     378              :         } else {
     379            0 :             tracing::debug!("Timeline {ttid} is active");
     380            0 :             active_timelines.push(ttid);
     381            0 :             garbage.active_timeline_count = active_timelines.len();
     382              :         }
     383              :     }
     384              : 
     385            0 :     let num_garbage_timelines = garbage
     386            0 :         .items
     387            0 :         .iter()
     388            0 :         .filter(|g| matches!(g.entity, GarbageEntity::Timeline(_)))
     389            0 :         .count();
     390            0 :     tracing::info!(
     391            0 :         "Found {}/{} garbage timelines in active tenants",
     392            0 :         num_garbage_timelines,
     393            0 :         active_timelines.len(),
     394              :     );
     395              : 
     396            0 :     Ok(garbage)
     397            0 : }
     398              : 
     399              : #[derive(clap::ValueEnum, Debug, Clone)]
     400              : pub enum PurgeMode {
     401              :     /// The safest mode: only delete tenants that were explicitly reported as deleted
     402              :     /// by Console API.
     403              :     DeletedOnly,
     404              : 
     405              :     /// Delete all garbage tenants, including those which are only presumed to be deleted,
     406              :     /// because the Console API could not find them.
     407              :     DeletedAndMissing,
     408              : }
     409              : 
     410              : impl std::fmt::Display for PurgeMode {
     411            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     412            0 :         match self {
     413            0 :             PurgeMode::DeletedOnly => write!(f, "deleted-only"),
     414            0 :             PurgeMode::DeletedAndMissing => write!(f, "deleted-and-missing"),
     415              :         }
     416            0 :     }
     417              : }
     418              : 
     419            0 : pub async fn get_tenant_objects(
     420            0 :     s3_client: &GenericRemoteStorage,
     421            0 :     tenant_shard_id: TenantShardId,
     422            0 : ) -> anyhow::Result<Vec<ListingObject>> {
     423            0 :     tracing::debug!("Listing objects in tenant {tenant_shard_id}");
     424            0 :     let tenant_root = super::remote_tenant_path(&tenant_shard_id);
     425            0 : 
     426            0 :     // TODO: apply extra validation based on object modification time.  Don't purge
     427            0 :     // tenants where any timeline's index_part.json has been touched recently.
     428            0 : 
     429            0 :     let cancel = CancellationToken::new();
     430            0 :     let list = backoff::retry(
     431            0 :         || s3_client.list(Some(&tenant_root), ListingMode::NoDelimiter, None, &cancel),
     432            0 :         |_| false,
     433            0 :         3,
     434            0 :         MAX_RETRIES as u32,
     435            0 :         "get_tenant_objects",
     436            0 :         &cancel,
     437            0 :     )
     438            0 :     .await
     439            0 :     .expect("dummy cancellation token")?;
     440            0 :     Ok(list.keys)
     441            0 : }
     442              : 
     443            0 : pub async fn get_timeline_objects(
     444            0 :     s3_client: &GenericRemoteStorage,
     445            0 :     ttid: TenantShardTimelineId,
     446            0 : ) -> anyhow::Result<Vec<ListingObject>> {
     447            0 :     tracing::debug!("Listing objects in timeline {ttid}");
     448            0 :     let timeline_root = super::remote_timeline_path_id(&ttid);
     449            0 : 
     450            0 :     let cancel = CancellationToken::new();
     451            0 :     let list = backoff::retry(
     452            0 :         || {
     453            0 :             s3_client.list(
     454            0 :                 Some(&timeline_root),
     455            0 :                 ListingMode::NoDelimiter,
     456            0 :                 None,
     457            0 :                 &cancel,
     458            0 :             )
     459            0 :         },
     460            0 :         |_| false,
     461            0 :         3,
     462            0 :         MAX_RETRIES as u32,
     463            0 :         "get_timeline_objects",
     464            0 :         &cancel,
     465            0 :     )
     466            0 :     .await
     467            0 :     .expect("dummy cancellation token")?;
     468              : 
     469            0 :     Ok(list.keys)
     470            0 : }
     471              : 
     472              : /// Drain a buffer of keys into DeleteObjects requests.
     473              : ///
     474              : /// If `drain` is true, drains keys completely; otherwise stops when fewer than
     475              : /// `max_keys_per_delete` keys are left.
     476              : /// The number of deleted keys is recorded via `progress_tracker`.
     477            0 : async fn do_delete(
     478            0 :     remote_client: &GenericRemoteStorage,
     479            0 :     keys: &mut Vec<ListingObject>,
     480            0 :     dry_run: bool,
     481            0 :     drain: bool,
     482            0 :     progress_tracker: &mut DeletionProgressTracker,
     483            0 : ) -> anyhow::Result<()> {
     484            0 :     let cancel = CancellationToken::new();
     485            0 :     let max_keys_per_delete = remote_client.max_keys_per_delete();
     486            0 :     while (!keys.is_empty() && drain) || (keys.len() >= max_keys_per_delete) {
     487            0 :         let request_keys =
     488            0 :             keys.split_off(keys.len() - (std::cmp::min(max_keys_per_delete, keys.len())));
     489            0 : 
     490            0 :         let request_keys: Vec<RemotePath> = request_keys.into_iter().map(|o| o.key).collect();
     491            0 : 
     492            0 :         let num_deleted = request_keys.len();
     493            0 :         if dry_run {
     494            0 :             tracing::info!("Dry-run deletion of objects: ");
     495            0 :             for k in request_keys {
     496            0 :                 tracing::info!("  {k:?}");
     497              :             }
     498              :         } else {
     499            0 :             remote_client
     500            0 :                 .delete_objects(&request_keys, &cancel)
     501            0 :                 .await
     502            0 :                 .context("deletion request")?;
     503            0 :             progress_tracker.register(num_deleted);
     504              :         }
     505              :     }
     506              : 
     507            0 :     Ok(())
     508            0 : }
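
To make the draining rule above concrete, here is a standalone sketch of the same batching logic with plain strings standing in for ListingObject; the batch size of 1000 is illustrative, not the real max_keys_per_delete:

    // Sketch of do_delete's batching rule. With drain=false, only full batches
    // of `max_keys_per_delete` are emitted; with drain=true, the final partial
    // batch is flushed as well.
    fn drain_batches(
        keys: &mut Vec<String>,
        max_keys_per_delete: usize,
        drain: bool,
    ) -> Vec<Vec<String>> {
        let mut batches = Vec::new();
        while (!keys.is_empty() && drain) || keys.len() >= max_keys_per_delete {
            let take = std::cmp::min(max_keys_per_delete, keys.len());
            batches.push(keys.split_off(keys.len() - take));
        }
        batches
    }

    fn main() {
        let mut keys: Vec<String> = (0..2500).map(|i| format!("key-{i}")).collect();
        // Non-draining pass: two full batches of 1000 go out, 500 keys stay buffered.
        assert_eq!(drain_batches(&mut keys, 1000, false).len(), 2);
        assert_eq!(keys.len(), 500);
        // Draining pass: the remaining partial batch is flushed.
        assert_eq!(drain_batches(&mut keys, 1000, true).len(), 1);
        assert!(keys.is_empty());
    }

purge_garbage below uses exactly this pattern: non-draining calls while listings accumulate, then one final draining call once the input stream is exhausted.
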
     509              : 
     510              : /// Simple tracker that reports progress every 10k deleted keys.
     511              : #[derive(Default)]
     512              : struct DeletionProgressTracker {
     513              :     num_deleted: usize,
     514              :     last_reported_num_deleted: usize,
     515              : }
     516              : 
     517              : impl DeletionProgressTracker {
     518            0 :     fn register(&mut self, n: usize) {
     519            0 :         self.num_deleted += n;
     520            0 :         if self.num_deleted - self.last_reported_num_deleted > 10000 {
     521            0 :             tracing::info!("progress: deleted {} keys", self.num_deleted);
     522            0 :             self.last_reported_num_deleted = self.num_deleted;
     523            0 :         }
     524            0 :     }
     525              : }
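
A hypothetical in-module unit test (not present in this file) illustrating the reporting cadence: with 4000-key batches, a progress line is emitted after the 12000th and the 24000th deleted key.

    #[cfg(test)]
    mod deletion_progress_tests {
        use super::DeletionProgressTracker;

        // Hypothetical test sketch: a report fires once more than 10000 keys
        // have accumulated since the last report.
        #[test]
        fn reports_roughly_every_10k_keys() {
            let mut tracker = DeletionProgressTracker::default();
            for _ in 0..6 {
                tracker.register(4000);
            }
            assert_eq!(tracker.num_deleted, 24000);
            // Reports fired at 12000 and 24000 deleted keys.
            assert_eq!(tracker.last_reported_num_deleted, 24000);
        }
    }
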
     526              : 
     527            0 : pub async fn purge_garbage(
     528            0 :     input_path: String,
     529            0 :     mode: PurgeMode,
     530            0 :     min_age: Duration,
     531            0 :     dry_run: bool,
     532            0 : ) -> anyhow::Result<()> {
     533            0 :     let list_bytes = tokio::fs::read(&input_path).await?;
     534            0 :     let garbage_list = serde_json::from_slice::<GarbageList>(&list_bytes)?;
     535            0 :     tracing::info!(
     536            0 :         "Loaded {} items in garbage list from {}",
     537            0 :         garbage_list.items.len(),
     538              :         input_path
     539              :     );
     540              : 
     541            0 :     let (remote_client, _target) =
     542            0 :         init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;
     543              : 
     544            0 :     assert_eq!(
     545            0 :         garbage_list.bucket_config.bucket_name().unwrap(),
     546            0 :         remote_client.bucket_name().unwrap()
     547            0 :     );
     548              : 
     549              :     // Sanity checks on the incoming list
     550            0 :     if garbage_list.active_tenant_count == 0 {
     551            0 :         anyhow::bail!("Refusing to purge a garbage list that reports 0 active tenants");
     552            0 :     }
     553            0 :     if garbage_list
     554            0 :         .items
     555            0 :         .iter()
     556            0 :         .any(|g| matches!(g.entity, GarbageEntity::Timeline(_)))
     557            0 :         && garbage_list.active_timeline_count == 0
     558              :     {
     559            0 :         anyhow::bail!(
     560            0 :             "Refusing to purge a garbage list containing garbage timelines that reports 0 active timelines"
     561            0 :         );
     562            0 :     }
     563            0 : 
     564            0 :     let filtered_items = garbage_list
     565            0 :         .items
     566            0 :         .iter()
     567            0 :         .filter(|i| match (&mode, &i.reason) {
     568            0 :             (PurgeMode::DeletedAndMissing, _) => true,
     569            0 :             (PurgeMode::DeletedOnly, GarbageReason::DeletedInConsole) => true,
     570            0 :             (PurgeMode::DeletedOnly, GarbageReason::KnownBug) => true,
     571            0 :             (PurgeMode::DeletedOnly, GarbageReason::MissingInConsole) => false,
     572            0 :         });
     573            0 : 
     574            0 :     tracing::info!(
     575            0 :         "Filtered down to {} garbage items based on mode {}",
     576            0 :         filtered_items.clone().count(),
     577              :         mode
     578              :     );
     579              : 
     580            0 :     let items = tokio_stream::iter(filtered_items.map(Ok));
     581            0 :     let get_objects_results = items.map_ok(|i| {
     582            0 :         let remote_client = remote_client.clone();
     583            0 :         async move {
     584            0 :             match i.entity {
     585            0 :                 GarbageEntity::Tenant(tenant_id) => {
     586            0 :                     get_tenant_objects(&remote_client, tenant_id).await
     587              :                 }
     588            0 :                 GarbageEntity::Timeline(ttid) => get_timeline_objects(&remote_client, ttid).await,
     589              :             }
     590            0 :         }
     591            0 :     });
     592            0 :     let mut get_objects_results =
     593            0 :         std::pin::pin!(get_objects_results.try_buffer_unordered(S3_CONCURRENCY));
     594            0 : 
     595            0 :     let mut objects_to_delete = Vec::new();
     596            0 :     let mut progress_tracker = DeletionProgressTracker::default();
     597            0 :     while let Some(result) = get_objects_results.next().await {
     598            0 :         let mut object_list = result?;
     599              : 
     600              :         // Extra safety check: even if a collection of objects is garbage, check max() of modification
     601              :         // times before purging, so that if we incorrectly marked a live tenant as garbage then we would
     602              :         // notice that its index has been written recently and would omit deleting it.
     603            0 :         if object_list.is_empty() {
     604              :             // Simplify subsequent code by ensuring list always has at least one item
     605              :             // Usually this only occurs when parallel deletions are racing us, as S3 has no empty prefixes
     606            0 :             continue;
     607            0 :         }
     608            0 :         let max_mtime = object_list.iter().map(|o| o.last_modified).max().unwrap();
     609            0 :         let age = max_mtime.elapsed();
     610            0 :         match age {
     611              :             Err(_) => {
     612            0 :                 tracing::warn!("Bad last_modified time");
     613            0 :                 continue;
     614              :             }
     615            0 :             Ok(a) if a < min_age => {
     616            0 :                 // Failed age check.  This doesn't mean we did something wrong: a tenant might really be garbage and recently
     617            0 :                 // written, but out of an abundance of caution we still don't purge it.
     618            0 :                 tracing::info!(
     619            0 :                     "Skipping tenant with young objects {}..{}",
     620            0 :                     object_list.first().as_ref().unwrap().key,
     621            0 :                     object_list.last().as_ref().unwrap().key
     622              :                 );
     623            0 :                 continue;
     624              :             }
     625            0 :             Ok(_) => {
     626            0 :                 // Passed age check
     627            0 :             }
     628            0 :         }
     629            0 : 
     630            0 :         objects_to_delete.append(&mut object_list);
     631            0 :         if objects_to_delete.len() >= remote_client.max_keys_per_delete() {
     632            0 :             do_delete(
     633            0 :                 &remote_client,
     634            0 :                 &mut objects_to_delete,
     635            0 :                 dry_run,
     636            0 :                 false,
     637            0 :                 &mut progress_tracker,
     638            0 :             )
     639            0 :             .await?;
     640            0 :         }
     641              :     }
     642              : 
     643            0 :     do_delete(
     644            0 :         &remote_client,
     645            0 :         &mut objects_to_delete,
     646            0 :         dry_run,
     647            0 :         true,
     648            0 :         &mut progress_tracker,
     649            0 :     )
     650            0 :     .await?;
     651              : 
     652            0 :     tracing::info!("{} keys deleted in total", progress_tracker.num_deleted);
     653              : 
     654            0 :     Ok(())
     655            0 : }
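
Taken together, find_garbage and purge_garbage form a two-phase workflow with the JSON report as the hand-off. A hypothetical driver tying them together in dry-run mode, assumed to live in this module so the existing imports apply; the function name and report path are made up for illustration, and configs and node_kind would come from the binary's argument parsing:

    // Hypothetical sketch: run the "find" phase, write the JSON garbage report,
    // then feed the same file into the "purge" phase with dry_run=true so that
    // nothing is actually deleted.
    async fn find_then_dry_run_purge(
        bucket_config: BucketConfig,
        console_config: ConsoleConfig,
        node_kind: NodeKind,
    ) -> anyhow::Result<()> {
        let report_path = "garbage.json".to_string();
        find_garbage(
            bucket_config,
            console_config,
            TraversingDepth::Tenant, // tenant-deep only: skip per-timeline console checks
            node_kind,
            None, // no tenant ID prefix filter
            report_path.clone(),
        )
        .await?;

        // Only purge tenants the console explicitly reports as deleted, and only
        // if none of their objects were modified within the last hour.
        purge_garbage(
            report_path,
            PurgeMode::DeletedOnly,
            Duration::from_secs(3600),
            true, // dry_run
        )
        .await?;
        Ok(())
    }
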
        

Generated by: LCOV version 2.1-beta