LCOV - differential code coverage report
Current view: top level - s3_scrubber/src - garbage.rs (source / functions) Coverage Total Hit UBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 0.0 % 299 0 299
Current Date: 2024-01-09 02:06:09 Functions: 0.0 % 121 0 121
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! Functionality for finding and purging garbage, as in "garbage collection".  Garbage means
       2                 : //! S3 objects which are either not referenced by any metadata, or are referenced by a
       3                 : //! control plane tenant/timeline in a deleted state.
       4                 : 
       5                 : use std::{
       6                 :     collections::{HashMap, HashSet},
       7                 :     sync::Arc,
       8                 : };
       9                 : 
      10                 : use anyhow::Context;
      11                 : use aws_sdk_s3::{
      12                 :     types::{Delete, ObjectIdentifier},
      13                 :     Client,
      14                 : };
      15                 : use futures_util::{pin_mut, TryStreamExt};
      16                 : use pageserver_api::shard::TenantShardId;
      17                 : use serde::{Deserialize, Serialize};
      18                 : use tokio_stream::StreamExt;
      19                 : use utils::id::TenantId;
      20                 : 
      21                 : use crate::{
      22                 :     cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData},
      23                 :     init_remote,
      24                 :     metadata_stream::{stream_listing, stream_tenant_timelines, stream_tenants},
      25                 :     BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId, TraversingDepth,
      26                 : };
      27                 : 
      28 UBC           0 : #[derive(Serialize, Deserialize, Debug)]
      29                 : enum GarbageReason {
      30                 :     DeletedInConsole,
      31                 :     MissingInConsole,
      32                 : }
      33                 : 
      34               0 : #[derive(Serialize, Deserialize, Debug)]
      35                 : enum GarbageEntity {
      36                 :     Tenant(TenantShardId),
      37                 :     Timeline(TenantShardTimelineId),
      38                 : }
      39                 : 
      40               0 : #[derive(Serialize, Deserialize, Debug)]
      41                 : struct GarbageItem {
      42                 :     entity: GarbageEntity,
      43                 :     reason: GarbageReason,
      44                 : }
      45                 : 
      46               0 : #[derive(Serialize, Deserialize, Debug)]
      47                 : pub struct GarbageList {
      48                 :     /// Remember what NodeKind we were finding garbage for, so that we can
      49                 :     /// purge the list without re-stating it.
      50                 :     node_kind: NodeKind,
      51                 : 
      52                 :     /// Embed the identity of the bucket, so that we do not risk executing
      53                 :     /// the wrong list against the wrong bucket, and so that the user does not have
      54                 :     /// to re-state the bucket details when purging.
      55                 :     bucket_config: BucketConfig,
      56                 : 
      57                 :     items: Vec<GarbageItem>,
      58                 : 
      59                 :     /// Advisory information to enable consumers to do a validation that if we
      60                 :     /// see garbage, we saw some active tenants too.  This protects against classes of bugs
      61                 :     /// in the scrubber that might otherwise generate a "deleted all" result.
      62                 :     active_tenant_count: usize,
      63                 : }
      64                 : 
      65                 : impl GarbageList {
      66               0 :     fn new(node_kind: NodeKind, bucket_config: BucketConfig) -> Self {
      67               0 :         Self {
      68               0 :             items: Vec::new(),
      69               0 :             active_tenant_count: 0,
      70               0 :             node_kind,
      71               0 :             bucket_config,
      72               0 :         }
      73               0 :     }
      74                 : 
      75                 :     /// Return true if appended, false if not.  False means the result was not garbage.
      76               0 :     fn maybe_append<T>(&mut self, entity: GarbageEntity, result: Option<T>) -> bool
      77               0 :     where
      78               0 :         T: MaybeDeleted,
      79               0 :     {
      80               0 :         match result {
      81               0 :             Some(result_item) if result_item.is_deleted() => {
      82               0 :                 self.items.push(GarbageItem {
      83               0 :                     entity,
      84               0 :                     reason: GarbageReason::DeletedInConsole,
      85               0 :                 });
      86               0 :                 true
      87                 :             }
      88               0 :             Some(_) => false,
      89                 :             None => {
      90               0 :                 self.items.push(GarbageItem {
      91               0 :                     entity,
      92               0 :                     reason: GarbageReason::MissingInConsole,
      93               0 :                 });
      94               0 :                 true
      95                 :             }
      96                 :         }
      97               0 :     }
      98                 : }
      99                 : 
     100               0 : pub async fn find_garbage(
     101               0 :     bucket_config: BucketConfig,
     102               0 :     console_config: ConsoleConfig,
     103               0 :     depth: TraversingDepth,
     104               0 :     node_kind: NodeKind,
     105               0 :     output_path: String,
     106               0 : ) -> anyhow::Result<()> {
     107               0 :     let garbage = find_garbage_inner(bucket_config, console_config, depth, node_kind).await?;
     108               0 :     let serialized = serde_json::to_vec_pretty(&garbage)?;
     109                 : 
     110               0 :     tokio::fs::write(&output_path, &serialized).await?;
     111                 : 
     112               0 :     tracing::info!("Wrote garbage report to {output_path}");
     113                 : 
     114               0 :     Ok(())
     115               0 : }
     116                 : 
     117                 : // How many concurrent S3 operations to issue (approximately): this is the concurrency
     118                 : // for things like listing the timelines within tenant prefixes.
     119                 : const S3_CONCURRENCY: usize = 32;
     120                 : 
     121                 : // How many concurrent API requests to make to the console API.
     122                 : const CONSOLE_CONCURRENCY: usize = 128;
     123                 : 
     124                 : struct ConsoleCache {
     125                 :     /// Set of tenants found in the control plane API
     126                 :     projects: HashMap<TenantId, ProjectData>,
     127                 :     /// Set of tenants for which the control plane API returned 404
     128                 :     not_found: HashSet<TenantId>,
     129                 : }
     130                 : 
     131               0 : async fn find_garbage_inner(
     132               0 :     bucket_config: BucketConfig,
     133               0 :     console_config: ConsoleConfig,
     134               0 :     depth: TraversingDepth,
     135               0 :     node_kind: NodeKind,
     136               0 : ) -> anyhow::Result<GarbageList> {
     137                 :     // Construct clients for S3 and for Console API
     138               0 :     let (s3_client, target) = init_remote(bucket_config.clone(), node_kind)?;
     139               0 :     let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(console_config));
     140                 : 
     141                 :     // Build a set of console-known tenants, for quickly eliminating known-active tenants without having
     142                 :     // to issue O(N) console API requests.
     143               0 :     let console_projects: HashMap<TenantId, ProjectData> = cloud_admin_api_client
     144               0 :         // FIXME: we can't just assume that all console's region ids are aws-<something>.  This hack
     145               0 :         // will go away when we are talking to Control Plane APIs, which are per-region.
     146               0 :         .list_projects(format!("aws-{}", bucket_config.region))
     147               0 :         .await?
     148               0 :         .into_iter()
     149               0 :         .map(|t| (t.tenant, t))
     150               0 :         .collect();
     151               0 :     tracing::info!(
     152               0 :         "Loaded {} console projects tenant IDs",
     153               0 :         console_projects.len()
     154               0 :     );
     155                 : 
     156                 :     // Because many tenant shards may look up the same TenantId, we maintain a cache.
     157               0 :     let console_cache = Arc::new(std::sync::Mutex::new(ConsoleCache {
     158               0 :         projects: console_projects,
     159               0 :         not_found: HashSet::new(),
     160               0 :     }));
     161                 : 
     162                 :     // Enumerate Tenants in S3, and check if each one exists in Console
     163               0 :     tracing::info!("Finding all tenants in bucket {}...", bucket_config.bucket);
     164               0 :     let tenants = stream_tenants(&s3_client, &target);
     165               0 :     let tenants_checked = tenants.map_ok(|t| {
     166               0 :         let api_client = cloud_admin_api_client.clone();
     167               0 :         let console_cache = console_cache.clone();
     168               0 :         async move {
     169                 :             // Check cache before issuing API call
     170               0 :             let project_data = {
     171               0 :                 let cache = console_cache.lock().unwrap();
     172               0 :                 let result = cache.projects.get(&t.tenant_id).cloned();
     173               0 :                 if result.is_none() && cache.not_found.contains(&t.tenant_id) {
     174               0 :                     return Ok((t, None));
     175               0 :                 }
     176               0 :                 result
     177               0 :             };
     178               0 : 
     179               0 :             match project_data {
     180               0 :                 Some(project_data) => Ok((t, Some(project_data.clone()))),
     181                 :                 None => {
     182               0 :                     let project_data = api_client
     183               0 :                         .find_tenant_project(t.tenant_id)
     184               0 :                         .await
     185               0 :                         .map_err(|e| anyhow::anyhow!(e));
     186               0 : 
     187               0 :                     // Populate cache with result of API call
     188               0 :                     {
     189               0 :                         let mut cache = console_cache.lock().unwrap();
     190               0 :                         if let Ok(Some(project_data)) = &project_data {
     191               0 :                             cache.projects.insert(t.tenant_id, project_data.clone());
     192               0 :                         } else if let Ok(None) = &project_data {
     193               0 :                             cache.not_found.insert(t.tenant_id);
     194               0 :                         }
     195                 :                     }
     196                 : 
     197               0 :                     project_data.map(|r| (t, r))
     198                 :                 }
     199                 :             }
     200               0 :         }
     201               0 :     });
     202               0 :     let tenants_checked = tenants_checked.try_buffer_unordered(CONSOLE_CONCURRENCY);
     203               0 : 
     204               0 :     // Process the results of Tenant checks.  If a Tenant is garbage, it goes into
     205               0 :     // the `GarbageList`.  Else it goes into `active_tenants` for more detailed timeline
     206               0 :     // checks if they are enabled by the `depth` parameter.
     207               0 :     pin_mut!(tenants_checked);
     208               0 :     let mut garbage = GarbageList::new(node_kind, bucket_config);
     209               0 :     let mut active_tenants: Vec<TenantShardId> = vec![];
     210               0 :     let mut counter = 0;
     211               0 :     while let Some(result) = tenants_checked.next().await {
     212               0 :         let (tenant_shard_id, console_result) = result?;
     213                 : 
     214                 :         // Paranoia check
     215               0 :         if let Some(project) = &console_result {
     216               0 :             assert!(project.tenant == tenant_shard_id.tenant_id);
     217               0 :         }
     218                 : 
     219               0 :         if garbage.maybe_append(GarbageEntity::Tenant(tenant_shard_id), console_result) {
     220               0 :             tracing::debug!("Tenant {tenant_shard_id} is garbage");
     221                 :         } else {
     222               0 :             tracing::debug!("Tenant {tenant_shard_id} is active");
     223               0 :             active_tenants.push(tenant_shard_id);
     224                 :         }
     225                 : 
     226               0 :         counter += 1;
     227               0 :         if counter % 1000 == 0 {
     228               0 :             tracing::info!(
     229               0 :                 "Progress: {counter} tenants checked, {} active, {} garbage",
     230               0 :                 active_tenants.len(),
     231               0 :                 garbage.items.len()
     232               0 :             );
     233               0 :         }
     234                 :     }
     235                 : 
     236               0 :     tracing::info!(
     237               0 :         "Found {}/{} garbage tenants",
     238               0 :         garbage.items.len(),
     239               0 :         garbage.items.len() + active_tenants.len()
     240               0 :     );
     241                 : 
     242                 :     // If we are only checking tenant-deep, we are done.  Otherwise we must
     243                 :     // proceed to check the individual timelines of the active tenants.
     244               0 :     if depth == TraversingDepth::Tenant {
     245               0 :         return Ok(garbage);
     246               0 :     }
     247                 : 
     248               0 :     tracing::info!(
     249               0 :         "Checking timelines for {} active tenants",
     250               0 :         active_tenants.len(),
     251               0 :     );
     252                 : 
     253                 :     // Construct a stream of all timelines within active tenants
     254               0 :     let active_tenants = tokio_stream::iter(active_tenants.iter().map(Ok));
     255               0 :     let timelines = active_tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, *t));
     256               0 :     let timelines = timelines.try_buffer_unordered(S3_CONCURRENCY);
     257               0 :     let timelines = timelines.try_flatten();
     258               0 : 
     259               0 :     // For all timelines within active tenants, call into console API to check their existence
     260               0 :     let timelines_checked = timelines.map_ok(|ttid| {
     261               0 :         let api_client = cloud_admin_api_client.clone();
     262               0 :         async move {
     263               0 :             api_client
     264               0 :                 .find_timeline_branch(ttid.timeline_id)
     265               0 :                 .await
     266               0 :                 .map_err(|e| anyhow::anyhow!(e))
     267               0 :                 .map(|r| (ttid, r))
     268               0 :         }
     269               0 :     });
     270               0 :     let timelines_checked = timelines_checked.try_buffer_unordered(CONSOLE_CONCURRENCY);
     271               0 : 
     272               0 :     // Update the GarbageList with any timelines which appear not to exist.
     273               0 :     pin_mut!(timelines_checked);
     274               0 :     while let Some(result) = timelines_checked.next().await {
     275               0 :         let (ttid, console_result) = result?;
     276               0 :         if garbage.maybe_append(GarbageEntity::Timeline(ttid), console_result) {
     277               0 :             tracing::debug!("Timeline {ttid} is garbage");
     278                 :         } else {
     279               0 :             tracing::debug!("Timeline {ttid} is active");
     280                 :         }
     281                 :     }
     282                 : 
     283               0 :     Ok(garbage)
     284               0 : }
     285                 : 
// Selects which `GarbageItem`s `purge_garbage` acts on, based on their `GarbageReason`.
// Despite the "tenants" wording below, the filter in `purge_garbage` applies the mode to
// timeline items as well.
// NOTE: clap's `ValueEnum` derive uses the `///` comments on the variants as help text
// for each CLI value, so they are user-visible output as well as documentation.
#[derive(clap::ValueEnum, Debug, Clone)]
pub enum PurgeMode {
    /// The safest mode: only delete tenants that were explicitly reported as deleted
    /// by Console API.
    DeletedOnly,

    /// Delete all garbage tenants, including those which are only presumed to be deleted,
    /// because the Console API could not find them.
    DeletedAndMissing,
}
     296                 : 
     297                 : impl std::fmt::Display for PurgeMode {
     298               0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     299               0 :         match self {
     300               0 :             PurgeMode::DeletedOnly => write!(f, "deleted-only"),
     301               0 :             PurgeMode::DeletedAndMissing => write!(f, "deleted-and-missing"),
     302                 :         }
     303               0 :     }
     304                 : }
     305                 : 
     306               0 : pub async fn get_tenant_objects(
     307               0 :     s3_client: &Arc<Client>,
     308               0 :     target: RootTarget,
     309               0 :     tenant_shard_id: TenantShardId,
     310               0 : ) -> anyhow::Result<Vec<ObjectIdentifier>> {
     311               0 :     tracing::debug!("Listing objects in tenant {tenant_shard_id}");
     312                 :     // TODO: apply extra validation based on object modification time.  Don't purge
     313                 :     // tenants where any timeline's index_part.json has been touched recently.
     314                 : 
     315               0 :     let mut tenant_root = target.tenant_root(&tenant_shard_id);
     316               0 : 
     317               0 :     // Remove delimiter, so that object listing lists all keys in the prefix and not just
     318               0 :     // common prefixes.
     319               0 :     tenant_root.delimiter = String::new();
     320               0 : 
     321               0 :     let key_stream = stream_listing(s3_client, &tenant_root);
     322               0 :     key_stream.try_collect().await
     323               0 : }
     324                 : 
     325               0 : pub async fn get_timeline_objects(
     326               0 :     s3_client: &Arc<Client>,
     327               0 :     target: RootTarget,
     328               0 :     ttid: TenantShardTimelineId,
     329               0 : ) -> anyhow::Result<Vec<ObjectIdentifier>> {
     330               0 :     tracing::debug!("Listing objects in timeline {ttid}");
     331               0 :     let mut timeline_root = target.timeline_root(&ttid);
     332               0 : 
     333               0 :     // TODO: apply extra validation based on object modification time.  Don't purge
     334               0 :     // timelines whose index_part.json has been touched recently.
     335               0 : 
     336               0 :     // Remove delimiter, so that object listing lists all keys in the prefix and not just
     337               0 :     // common prefixes.
     338               0 :     timeline_root.delimiter = String::new();
     339               0 :     let key_stream = stream_listing(s3_client, &timeline_root);
     340               0 : 
     341               0 :     key_stream.try_collect().await
     342               0 : }
     343                 : 
     344                 : const MAX_KEYS_PER_DELETE: usize = 1000;
     345                 : 
     346                 : /// Drain a buffer of keys into DeleteObjects requests
     347               0 : async fn do_delete(
     348               0 :     s3_client: &Arc<Client>,
     349               0 :     bucket_name: &str,
     350               0 :     keys: &mut Vec<ObjectIdentifier>,
     351               0 :     dry_run: bool,
     352               0 :     drain: bool,
     353               0 : ) -> anyhow::Result<()> {
     354               0 :     while (!keys.is_empty() && drain) || (keys.len() >= MAX_KEYS_PER_DELETE) {
     355               0 :         let request_keys =
     356               0 :             keys.split_off(keys.len() - (std::cmp::min(MAX_KEYS_PER_DELETE, keys.len())));
     357               0 :         if dry_run {
     358               0 :             tracing::info!("Dry-run deletion of objects: ");
     359               0 :             for k in request_keys {
     360               0 :                 tracing::info!("  {k:?}");
     361                 :             }
     362                 :         } else {
     363               0 :             let delete_request = s3_client
     364               0 :                 .delete_objects()
     365               0 :                 .bucket(bucket_name)
     366               0 :                 .delete(Delete::builder().set_objects(Some(request_keys)).build()?);
     367               0 :             delete_request
     368               0 :                 .send()
     369               0 :                 .await
     370               0 :                 .context("DeleteObjects request")?;
     371                 :         }
     372                 :     }
     373                 : 
     374               0 :     Ok(())
     375               0 : }
     376                 : 
     377               0 : pub async fn purge_garbage(
     378               0 :     input_path: String,
     379               0 :     mode: PurgeMode,
     380               0 :     dry_run: bool,
     381               0 : ) -> anyhow::Result<()> {
     382               0 :     let list_bytes = tokio::fs::read(&input_path).await?;
     383               0 :     let garbage_list = serde_json::from_slice::<GarbageList>(&list_bytes)?;
     384               0 :     tracing::info!(
     385               0 :         "Loaded {} items in garbage list from {}",
     386               0 :         garbage_list.items.len(),
     387               0 :         input_path
     388               0 :     );
     389                 : 
     390               0 :     let (s3_client, target) =
     391               0 :         init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind)?;
     392                 : 
     393                 :     // Sanity checks on the incoming list
     394               0 :     if garbage_list.active_tenant_count == 0 {
     395               0 :         anyhow::bail!("Refusing to purge a garbage list that reports 0 active tenants");
     396               0 :     }
     397               0 : 
     398               0 :     let filtered_items = garbage_list
     399               0 :         .items
     400               0 :         .iter()
     401               0 :         .filter(|i| match (&mode, &i.reason) {
     402               0 :             (PurgeMode::DeletedAndMissing, _) => true,
     403               0 :             (PurgeMode::DeletedOnly, GarbageReason::DeletedInConsole) => true,
     404               0 :             (PurgeMode::DeletedOnly, GarbageReason::MissingInConsole) => false,
     405               0 :         });
     406                 : 
     407               0 :     tracing::info!(
     408               0 :         "Filtered down to {} garbage items based on mode {}",
     409               0 :         garbage_list.items.len(),
     410               0 :         mode
     411               0 :     );
     412                 : 
     413               0 :     let items = tokio_stream::iter(filtered_items.map(Ok));
     414               0 :     let get_objects_results = items.map_ok(|i| {
     415               0 :         let s3_client = s3_client.clone();
     416               0 :         let target = target.clone();
     417               0 :         async move {
     418               0 :             match i.entity {
     419               0 :                 GarbageEntity::Tenant(tenant_id) => {
     420               0 :                     get_tenant_objects(&s3_client, target, tenant_id).await
     421                 :                 }
     422               0 :                 GarbageEntity::Timeline(ttid) => {
     423               0 :                     get_timeline_objects(&s3_client, target, ttid).await
     424                 :                 }
     425                 :             }
     426               0 :         }
     427               0 :     });
     428               0 :     let get_objects_results = get_objects_results.try_buffer_unordered(S3_CONCURRENCY);
     429               0 : 
     430               0 :     pin_mut!(get_objects_results);
     431               0 :     let mut objects_to_delete = Vec::new();
     432               0 :     while let Some(result) = get_objects_results.next().await {
     433               0 :         let mut object_list = result?;
     434               0 :         objects_to_delete.append(&mut object_list);
     435               0 :         if objects_to_delete.len() >= MAX_KEYS_PER_DELETE {
     436               0 :             do_delete(
     437               0 :                 &s3_client,
     438               0 :                 &garbage_list.bucket_config.bucket,
     439               0 :                 &mut objects_to_delete,
     440               0 :                 dry_run,
     441               0 :                 false,
     442               0 :             )
     443               0 :             .await?;
     444               0 :         }
     445                 :     }
     446                 : 
     447               0 :     do_delete(
     448               0 :         &s3_client,
     449               0 :         &garbage_list.bucket_config.bucket,
     450               0 :         &mut objects_to_delete,
     451               0 :         dry_run,
     452               0 :         true,
     453               0 :     )
     454               0 :     .await?;
     455                 : 
     456               0 :     tracing::info!("Fell through");
     457                 : 
     458               0 :     Ok(())
     459               0 : }
        

Generated by: LCOV version 2.1-beta