LCOV - code coverage report
Current view: top level - storage_scrubber/src - garbage.rs (source / functions) Coverage Total Hit
Test: 465a86b0c1fda0069b3e0f6c1c126e6b635a1f72.info Lines: 0.0 % 329 0
Test Date: 2024-06-25 15:47:26 Functions: 0.0 % 81 0

            Line data    Source code
       1              : //! Functionality for finding and purging garbage, as in "garbage collection".  Garbage means
       2              : //! S3 objects which are either not referenced by any metadata, or are referenced by a
       3              : //! control plane tenant/timeline in a deleted state.
       4              : 
       5              : use std::{
       6              :     collections::{HashMap, HashSet},
       7              :     sync::Arc,
       8              : };
       9              : 
      10              : use anyhow::Context;
      11              : use aws_sdk_s3::{
      12              :     types::{Delete, ObjectIdentifier},
      13              :     Client,
      14              : };
      15              : use futures_util::TryStreamExt;
      16              : use pageserver_api::shard::TenantShardId;
      17              : use serde::{Deserialize, Serialize};
      18              : use tokio_stream::StreamExt;
      19              : use utils::id::TenantId;
      20              : 
      21              : use crate::{
      22              :     cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData},
      23              :     init_remote,
      24              :     metadata_stream::{stream_listing, stream_tenant_timelines, stream_tenants},
      25              :     BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId, TraversingDepth,
      26              : };
      27              : 
/// Why an entity ended up in the garbage list.
#[derive(Serialize, Deserialize, Debug)]
enum GarbageReason {
    /// The console lookup returned a record, and that record reported itself as deleted.
    DeletedInConsole,
    /// The console lookup returned no record at all for the entity.
    MissingInConsole,
}
      33              : 
/// The kind of entity that was classified as garbage, together with its identity.
#[derive(Serialize, Deserialize, Debug)]
enum GarbageEntity {
    /// An entire tenant shard.
    Tenant(TenantShardId),
    /// A single timeline of a tenant shard.
    Timeline(TenantShardTimelineId),
}
      39              : 
/// One entry of a [`GarbageList`]: what is garbage, and why it was judged to be so.
#[derive(Serialize, Deserialize, Debug)]
struct GarbageItem {
    /// The tenant shard or timeline in question.
    entity: GarbageEntity,
    /// The console result that led to the classification; purging filters on this.
    reason: GarbageReason,
}
      45              : 
/// The result of a garbage scan.  It is serialized to JSON by `find_garbage` and read
/// back by `purge_garbage`.
#[derive(Serialize, Deserialize, Debug)]
pub struct GarbageList {
    /// Remember what NodeKind we were finding garbage for, so that we can
    /// purge the list without re-stating it.
    node_kind: NodeKind,

    /// Embed the identity of the bucket, so that we do not risk executing
    /// the wrong list against the wrong bucket, and so that the user does not have
    /// to re-state the bucket details when purging.
    bucket_config: BucketConfig,

    /// Every tenant shard and timeline that was classified as garbage.
    items: Vec<GarbageItem>,

    /// Advisory information to enable consumers to do a validation that if we
    /// see garbage, we saw some active tenants too.  This protects against classes of bugs
    /// in the scrubber that might otherwise generate a "deleted all" result.
    active_tenant_count: usize,
    /// Same safeguard as `active_tenant_count`, for timelines.  It stays at zero when the
    /// scan stops at tenant depth.
    active_timeline_count: usize,
}
      65              : 
      66              : impl GarbageList {
      67            0 :     fn new(node_kind: NodeKind, bucket_config: BucketConfig) -> Self {
      68            0 :         Self {
      69            0 :             items: Vec::new(),
      70            0 :             active_tenant_count: 0,
      71            0 :             active_timeline_count: 0,
      72            0 :             node_kind,
      73            0 :             bucket_config,
      74            0 :         }
      75            0 :     }
      76              : 
      77              :     /// Return true if appended, false if not.  False means the result was not garbage.
      78            0 :     fn maybe_append<T>(&mut self, entity: GarbageEntity, result: Option<T>) -> bool
      79            0 :     where
      80            0 :         T: MaybeDeleted,
      81            0 :     {
      82            0 :         match result {
      83            0 :             Some(result_item) if result_item.is_deleted() => {
      84            0 :                 self.items.push(GarbageItem {
      85            0 :                     entity,
      86            0 :                     reason: GarbageReason::DeletedInConsole,
      87            0 :                 });
      88            0 :                 true
      89              :             }
      90            0 :             Some(_) => false,
      91              :             None => {
      92            0 :                 self.items.push(GarbageItem {
      93            0 :                     entity,
      94            0 :                     reason: GarbageReason::MissingInConsole,
      95            0 :                 });
      96            0 :                 true
      97              :             }
      98              :         }
      99            0 :     }
     100              : }
     101              : 
     102            0 : pub async fn find_garbage(
     103            0 :     bucket_config: BucketConfig,
     104            0 :     console_config: ConsoleConfig,
     105            0 :     depth: TraversingDepth,
     106            0 :     node_kind: NodeKind,
     107            0 :     output_path: String,
     108            0 : ) -> anyhow::Result<()> {
     109            0 :     let garbage = find_garbage_inner(bucket_config, console_config, depth, node_kind).await?;
     110            0 :     let serialized = serde_json::to_vec_pretty(&garbage)?;
     111              : 
     112            0 :     tokio::fs::write(&output_path, &serialized).await?;
     113              : 
     114            0 :     tracing::info!("Wrote garbage report to {output_path}");
     115              : 
     116            0 :     Ok(())
     117            0 : }
     118              : 
// How many concurrent S3 operations to issue (approximately): this is the concurrency
// for things like listing the timelines within tenant prefixes.
const S3_CONCURRENCY: usize = 32;

// How many concurrent API requests to make to the console API.
//
// Be careful when increasing this; roughly, we shouldn't exceed ~100 rps. It
// would be better to implement a real rps limiter.
const CONSOLE_CONCURRENCY: usize = 16;
     128              : 
/// Results of console tenant lookups, shared between the concurrent tenant checks in
/// `find_garbage_inner` so that shards of the same tenant do not repeat an API call.
struct ConsoleCache {
    /// Set of tenants found in the control plane API
    projects: HashMap<TenantId, ProjectData>,
    /// Set of tenants for which the control plane API returned 404
    not_found: HashSet<TenantId>,
}
     135              : 
     136            0 : async fn find_garbage_inner(
     137            0 :     bucket_config: BucketConfig,
     138            0 :     console_config: ConsoleConfig,
     139            0 :     depth: TraversingDepth,
     140            0 :     node_kind: NodeKind,
     141            0 : ) -> anyhow::Result<GarbageList> {
     142              :     // Construct clients for S3 and for Console API
     143            0 :     let (s3_client, target) = init_remote(bucket_config.clone(), node_kind)?;
     144            0 :     let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(console_config));
     145              : 
     146              :     // Build a set of console-known tenants, for quickly eliminating known-active tenants without having
     147              :     // to issue O(N) console API requests.
     148            0 :     let console_projects: HashMap<TenantId, ProjectData> = cloud_admin_api_client
     149            0 :         // FIXME: we can't just assume that all console's region ids are aws-<something>.  This hack
     150            0 :         // will go away when we are talking to Control Plane APIs, which are per-region.
     151            0 :         .list_projects(format!("aws-{}", bucket_config.region))
     152            0 :         .await?
     153            0 :         .into_iter()
     154            0 :         .map(|t| (t.tenant, t))
     155            0 :         .collect();
     156            0 :     tracing::info!(
     157            0 :         "Loaded {} console projects tenant IDs",
     158            0 :         console_projects.len()
     159              :     );
     160              : 
     161              :     // Because many tenant shards may look up the same TenantId, we maintain a cache.
     162            0 :     let console_cache = Arc::new(std::sync::Mutex::new(ConsoleCache {
     163            0 :         projects: console_projects,
     164            0 :         not_found: HashSet::new(),
     165            0 :     }));
     166            0 : 
     167            0 :     // Enumerate Tenants in S3, and check if each one exists in Console
     168            0 :     tracing::info!("Finding all tenants in bucket {}...", bucket_config.bucket);
     169            0 :     let tenants = stream_tenants(&s3_client, &target);
     170            0 :     let tenants_checked = tenants.map_ok(|t| {
     171            0 :         let api_client = cloud_admin_api_client.clone();
     172            0 :         let console_cache = console_cache.clone();
     173            0 :         async move {
     174              :             // Check cache before issuing API call
     175            0 :             let project_data = {
     176            0 :                 let cache = console_cache.lock().unwrap();
     177            0 :                 let result = cache.projects.get(&t.tenant_id).cloned();
     178            0 :                 if result.is_none() && cache.not_found.contains(&t.tenant_id) {
     179            0 :                     return Ok((t, None));
     180            0 :                 }
     181            0 :                 result
     182            0 :             };
     183            0 : 
     184            0 :             match project_data {
     185            0 :                 Some(project_data) => Ok((t, Some(project_data.clone()))),
     186              :                 None => {
     187            0 :                     let project_data = api_client
     188            0 :                         .find_tenant_project(t.tenant_id)
     189            0 :                         .await
     190            0 :                         .map_err(|e| anyhow::anyhow!(e));
     191            0 : 
     192            0 :                     // Populate cache with result of API call
     193            0 :                     {
     194            0 :                         let mut cache = console_cache.lock().unwrap();
     195            0 :                         if let Ok(Some(project_data)) = &project_data {
     196            0 :                             cache.projects.insert(t.tenant_id, project_data.clone());
     197            0 :                         } else if let Ok(None) = &project_data {
     198            0 :                             cache.not_found.insert(t.tenant_id);
     199            0 :                         }
     200              :                     }
     201              : 
     202            0 :                     project_data.map(|r| (t, r))
     203              :                 }
     204              :             }
     205            0 :         }
     206            0 :     });
     207            0 :     let mut tenants_checked =
     208            0 :         std::pin::pin!(tenants_checked.try_buffer_unordered(CONSOLE_CONCURRENCY));
     209            0 : 
     210            0 :     // Process the results of Tenant checks.  If a Tenant is garbage, it goes into
     211            0 :     // the `GarbageList`.  Else it goes into `active_tenants` for more detailed timeline
     212            0 :     // checks if they are enabled by the `depth` parameter.
     213            0 :     let mut garbage = GarbageList::new(node_kind, bucket_config);
     214            0 :     let mut active_tenants: Vec<TenantShardId> = vec![];
     215            0 :     let mut counter = 0;
     216            0 :     while let Some(result) = tenants_checked.next().await {
     217            0 :         let (tenant_shard_id, console_result) = result?;
     218              : 
     219              :         // Paranoia check
     220            0 :         if let Some(project) = &console_result {
     221            0 :             assert!(project.tenant == tenant_shard_id.tenant_id);
     222            0 :         }
     223              : 
     224            0 :         if garbage.maybe_append(GarbageEntity::Tenant(tenant_shard_id), console_result) {
     225            0 :             tracing::debug!("Tenant {tenant_shard_id} is garbage");
     226              :         } else {
     227            0 :             tracing::debug!("Tenant {tenant_shard_id} is active");
     228            0 :             active_tenants.push(tenant_shard_id);
     229            0 :             garbage.active_tenant_count = active_tenants.len();
     230              :         }
     231              : 
     232            0 :         counter += 1;
     233            0 :         if counter % 1000 == 0 {
     234            0 :             tracing::info!(
     235            0 :                 "Progress: {counter} tenants checked, {} active, {} garbage",
     236            0 :                 active_tenants.len(),
     237            0 :                 garbage.items.len()
     238              :             );
     239            0 :         }
     240              :     }
     241              : 
     242            0 :     tracing::info!(
     243            0 :         "Found {}/{} garbage tenants",
     244            0 :         garbage.items.len(),
     245            0 :         garbage.items.len() + active_tenants.len()
     246              :     );
     247              : 
     248              :     // If we are only checking tenant-deep, we are done.  Otherwise we must
     249              :     // proceed to check the individual timelines of the active tenants.
     250            0 :     if depth == TraversingDepth::Tenant {
     251            0 :         return Ok(garbage);
     252            0 :     }
     253            0 : 
     254            0 :     tracing::info!(
     255            0 :         "Checking timelines for {} active tenants",
     256            0 :         active_tenants.len(),
     257              :     );
     258              : 
     259              :     // Construct a stream of all timelines within active tenants
     260            0 :     let active_tenants = tokio_stream::iter(active_tenants.iter().map(Ok));
     261            0 :     let timelines = active_tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, *t));
     262            0 :     let timelines = timelines.try_buffer_unordered(S3_CONCURRENCY);
     263            0 :     let timelines = timelines.try_flatten();
     264            0 : 
     265            0 :     // For all timelines within active tenants, call into console API to check their existence
     266            0 :     let timelines_checked = timelines.map_ok(|ttid| {
     267            0 :         let api_client = cloud_admin_api_client.clone();
     268            0 :         async move {
     269            0 :             api_client
     270            0 :                 .find_timeline_branch(ttid.tenant_shard_id.tenant_id, ttid.timeline_id)
     271            0 :                 .await
     272            0 :                 .map_err(|e| anyhow::anyhow!(e))
     273            0 :                 .map(|r| (ttid, r))
     274            0 :         }
     275            0 :     });
     276            0 :     let mut timelines_checked =
     277            0 :         std::pin::pin!(timelines_checked.try_buffer_unordered(CONSOLE_CONCURRENCY));
     278            0 : 
     279            0 :     // Update the GarbageList with any timelines which appear not to exist.
     280            0 :     let mut active_timelines: Vec<TenantShardTimelineId> = vec![];
     281            0 :     while let Some(result) = timelines_checked.next().await {
     282            0 :         let (ttid, console_result) = result?;
     283            0 :         if garbage.maybe_append(GarbageEntity::Timeline(ttid), console_result) {
     284            0 :             tracing::debug!("Timeline {ttid} is garbage");
     285              :         } else {
     286            0 :             tracing::debug!("Timeline {ttid} is active");
     287            0 :             active_timelines.push(ttid);
     288            0 :             garbage.active_timeline_count = active_timelines.len();
     289              :         }
     290              :     }
     291              : 
     292            0 :     let num_garbage_timelines = garbage
     293            0 :         .items
     294            0 :         .iter()
     295            0 :         .filter(|g| matches!(g.entity, GarbageEntity::Timeline(_)))
     296            0 :         .count();
     297            0 :     tracing::info!(
     298            0 :         "Found {}/{} garbage timelines in active tenants",
     299            0 :         num_garbage_timelines,
     300            0 :         active_timelines.len(),
     301              :     );
     302              : 
     303            0 :     Ok(garbage)
     304            0 : }
     305              : 
// Selects which entries of a garbage list `purge_garbage` acts on, based on the
// `GarbageReason` recorded for each entry.
//
// NOTE(review): with `clap::ValueEnum`, the `///` comments on the variants are
// presumably surfaced as CLI help text, so treat them as user-facing strings.
#[derive(clap::ValueEnum, Debug, Clone)]
pub enum PurgeMode {
    /// The safest mode: only delete tenants that were explicitly reported as deleted
    /// by Console API.
    DeletedOnly,

    /// Delete all garbage tenants, including those which are only presumed to be deleted,
    /// because the Console API could not find them.
    DeletedAndMissing,
}
     316              : 
     317              : impl std::fmt::Display for PurgeMode {
     318            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     319            0 :         match self {
     320            0 :             PurgeMode::DeletedOnly => write!(f, "deleted-only"),
     321            0 :             PurgeMode::DeletedAndMissing => write!(f, "deleted-and-missing"),
     322              :         }
     323            0 :     }
     324              : }
     325              : 
     326            0 : pub async fn get_tenant_objects(
     327            0 :     s3_client: &Arc<Client>,
     328            0 :     target: RootTarget,
     329            0 :     tenant_shard_id: TenantShardId,
     330            0 : ) -> anyhow::Result<Vec<ObjectIdentifier>> {
     331            0 :     tracing::debug!("Listing objects in tenant {tenant_shard_id}");
     332              :     // TODO: apply extra validation based on object modification time.  Don't purge
     333              :     // tenants where any timeline's index_part.json has been touched recently.
     334              : 
     335            0 :     let mut tenant_root = target.tenant_root(&tenant_shard_id);
     336            0 : 
     337            0 :     // Remove delimiter, so that object listing lists all keys in the prefix and not just
     338            0 :     // common prefixes.
     339            0 :     tenant_root.delimiter = String::new();
     340            0 : 
     341            0 :     let key_stream = stream_listing(s3_client, &tenant_root);
     342            0 :     key_stream.try_collect().await
     343            0 : }
     344              : 
     345            0 : pub async fn get_timeline_objects(
     346            0 :     s3_client: &Arc<Client>,
     347            0 :     target: RootTarget,
     348            0 :     ttid: TenantShardTimelineId,
     349            0 : ) -> anyhow::Result<Vec<ObjectIdentifier>> {
     350            0 :     tracing::debug!("Listing objects in timeline {ttid}");
     351            0 :     let mut timeline_root = target.timeline_root(&ttid);
     352            0 : 
     353            0 :     // TODO: apply extra validation based on object modification time.  Don't purge
     354            0 :     // timelines whose index_part.json has been touched recently.
     355            0 : 
     356            0 :     // Remove delimiter, so that object listing lists all keys in the prefix and not just
     357            0 :     // common prefixes.
     358            0 :     timeline_root.delimiter = String::new();
     359            0 :     let key_stream = stream_listing(s3_client, &timeline_root);
     360            0 : 
     361            0 :     key_stream.try_collect().await
     362            0 : }
     363              : 
// Upper bound on the number of keys that `do_delete` puts into one DeleteObjects
// request.  NOTE(review): presumably chosen to match the per-request key limit of
// S3 -- confirm before changing.
const MAX_KEYS_PER_DELETE: usize = 1000;
     365              : 
     366              : /// Drain a buffer of keys into DeleteObjects requests
     367              : ///
     368              : /// If `drain` is true, drains keys completely; otherwise stops when <
     369              : /// MAX_KEYS_PER_DELETE keys are left.
     370              : /// `num_deleted` returns number of deleted keys.
     371            0 : async fn do_delete(
     372            0 :     s3_client: &Arc<Client>,
     373            0 :     bucket_name: &str,
     374            0 :     keys: &mut Vec<ObjectIdentifier>,
     375            0 :     dry_run: bool,
     376            0 :     drain: bool,
     377            0 :     progress_tracker: &mut DeletionProgressTracker,
     378            0 : ) -> anyhow::Result<()> {
     379            0 :     while (!keys.is_empty() && drain) || (keys.len() >= MAX_KEYS_PER_DELETE) {
     380            0 :         let request_keys =
     381            0 :             keys.split_off(keys.len() - (std::cmp::min(MAX_KEYS_PER_DELETE, keys.len())));
     382            0 :         let num_deleted = request_keys.len();
     383            0 :         if dry_run {
     384            0 :             tracing::info!("Dry-run deletion of objects: ");
     385            0 :             for k in request_keys {
     386            0 :                 tracing::info!("  {k:?}");
     387              :             }
     388              :         } else {
     389            0 :             let delete_request = s3_client
     390            0 :                 .delete_objects()
     391            0 :                 .bucket(bucket_name)
     392            0 :                 .delete(Delete::builder().set_objects(Some(request_keys)).build()?);
     393            0 :             delete_request
     394            0 :                 .send()
     395            0 :                 .await
     396            0 :                 .context("DeleteObjects request")?;
     397            0 :             progress_tracker.register(num_deleted);
     398              :         }
     399              :     }
     400              : 
     401            0 :     Ok(())
     402            0 : }
     403              : 
     404              : /// Simple tracker reporting each 10k deleted keys.
     405              : #[derive(Default)]
     406              : struct DeletionProgressTracker {
     407              :     num_deleted: usize,
     408              :     last_reported_num_deleted: usize,
     409              : }
     410              : 
     411              : impl DeletionProgressTracker {
     412            0 :     fn register(&mut self, n: usize) {
     413            0 :         self.num_deleted += n;
     414            0 :         if self.num_deleted - self.last_reported_num_deleted > 10000 {
     415            0 :             tracing::info!("progress: deleted {} keys", self.num_deleted);
     416            0 :             self.last_reported_num_deleted = self.num_deleted;
     417            0 :         }
     418            0 :     }
     419              : }
     420              : 
     421            0 : pub async fn purge_garbage(
     422            0 :     input_path: String,
     423            0 :     mode: PurgeMode,
     424            0 :     dry_run: bool,
     425            0 : ) -> anyhow::Result<()> {
     426            0 :     let list_bytes = tokio::fs::read(&input_path).await?;
     427            0 :     let garbage_list = serde_json::from_slice::<GarbageList>(&list_bytes)?;
     428            0 :     tracing::info!(
     429            0 :         "Loaded {} items in garbage list from {}",
     430            0 :         garbage_list.items.len(),
     431              :         input_path
     432              :     );
     433              : 
     434            0 :     let (s3_client, target) =
     435            0 :         init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind)?;
     436              : 
     437              :     // Sanity checks on the incoming list
     438            0 :     if garbage_list.active_tenant_count == 0 {
     439            0 :         anyhow::bail!("Refusing to purge a garbage list that reports 0 active tenants");
     440            0 :     }
     441            0 :     if garbage_list
     442            0 :         .items
     443            0 :         .iter()
     444            0 :         .any(|g| matches!(g.entity, GarbageEntity::Timeline(_)))
     445            0 :         && garbage_list.active_timeline_count == 0
     446              :     {
     447            0 :         anyhow::bail!("Refusing to purge a garbage list containing garbage timelines that reports 0 active timelines");
     448            0 :     }
     449            0 : 
     450            0 :     let filtered_items = garbage_list
     451            0 :         .items
     452            0 :         .iter()
     453            0 :         .filter(|i| match (&mode, &i.reason) {
     454            0 :             (PurgeMode::DeletedAndMissing, _) => true,
     455            0 :             (PurgeMode::DeletedOnly, GarbageReason::DeletedInConsole) => true,
     456            0 :             (PurgeMode::DeletedOnly, GarbageReason::MissingInConsole) => false,
     457            0 :         });
     458            0 : 
     459            0 :     tracing::info!(
     460            0 :         "Filtered down to {} garbage items based on mode {}",
     461            0 :         garbage_list.items.len(),
     462              :         mode
     463              :     );
     464              : 
     465            0 :     let items = tokio_stream::iter(filtered_items.map(Ok));
     466            0 :     let get_objects_results = items.map_ok(|i| {
     467            0 :         let s3_client = s3_client.clone();
     468            0 :         let target = target.clone();
     469            0 :         async move {
     470            0 :             match i.entity {
     471            0 :                 GarbageEntity::Tenant(tenant_id) => {
     472            0 :                     get_tenant_objects(&s3_client, target, tenant_id).await
     473              :                 }
     474            0 :                 GarbageEntity::Timeline(ttid) => {
     475            0 :                     get_timeline_objects(&s3_client, target, ttid).await
     476              :                 }
     477              :             }
     478            0 :         }
     479            0 :     });
     480            0 :     let mut get_objects_results =
     481            0 :         std::pin::pin!(get_objects_results.try_buffer_unordered(S3_CONCURRENCY));
     482            0 : 
     483            0 :     let mut objects_to_delete = Vec::new();
     484            0 :     let mut progress_tracker = DeletionProgressTracker::default();
     485            0 :     while let Some(result) = get_objects_results.next().await {
     486            0 :         let mut object_list = result?;
     487            0 :         objects_to_delete.append(&mut object_list);
     488            0 :         if objects_to_delete.len() >= MAX_KEYS_PER_DELETE {
     489            0 :             do_delete(
     490            0 :                 &s3_client,
     491            0 :                 &garbage_list.bucket_config.bucket,
     492            0 :                 &mut objects_to_delete,
     493            0 :                 dry_run,
     494            0 :                 false,
     495            0 :                 &mut progress_tracker,
     496            0 :             )
     497            0 :             .await?;
     498            0 :         }
     499              :     }
     500              : 
     501            0 :     do_delete(
     502            0 :         &s3_client,
     503            0 :         &garbage_list.bucket_config.bucket,
     504            0 :         &mut objects_to_delete,
     505            0 :         dry_run,
     506            0 :         true,
     507            0 :         &mut progress_tracker,
     508            0 :     )
     509            0 :     .await?;
     510              : 
     511            0 :     tracing::info!("{} keys deleted in total", progress_tracker.num_deleted);
     512              : 
     513            0 :     Ok(())
     514            0 : }
        

Generated by: LCOV version 2.1-beta