LCOV - code coverage report
Current view: top level - storage_scrubber/src - pageserver_physical_gc.rs (source / functions) Coverage Total Hit
Test: 42f947419473a288706e86ecdf7c2863d760d5d7.info Lines: 0.0 % 400 0
Test Date: 2024-08-02 21:34:27 Functions: 0.0 % 29 0

            Line data    Source code
       1              : use std::collections::{BTreeMap, HashMap};
       2              : use std::sync::Arc;
       3              : use std::time::{Duration, SystemTime};
       4              : 
       5              : use crate::checks::{list_timeline_blobs, BlobDataParseResult};
       6              : use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
       7              : use crate::{
       8              :     init_remote, BucketConfig, ControllerClientConfig, NodeKind, RootTarget, TenantShardTimelineId,
       9              : };
      10              : use aws_sdk_s3::Client;
      11              : use futures_util::{StreamExt, TryStreamExt};
      12              : use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
      13              : use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path};
      14              : use pageserver::tenant::storage_layer::LayerName;
      15              : use pageserver::tenant::IndexPart;
      16              : use pageserver_api::controller_api::TenantDescribeResponse;
      17              : use pageserver_api::shard::{ShardIndex, TenantShardId};
      18              : use remote_storage::RemotePath;
      19              : use reqwest::Method;
      20              : use serde::Serialize;
      21              : use storage_controller_client::control_api;
      22              : use tracing::{info_span, Instrument};
      23              : use utils::generation::Generation;
      24              : use utils::id::{TenantId, TenantTimelineId};
      25              : 
      26              : #[derive(Serialize, Default)]
      27              : pub struct GcSummary {
      28              :     indices_deleted: usize,
      29              :     remote_storage_errors: usize,
      30              :     controller_api_errors: usize,
      31              :     ancestor_layers_deleted: usize,
      32              : }
      33              : 
      34              : impl GcSummary {
      35            0 :     fn merge(&mut self, other: Self) {
      36            0 :         let Self {
      37            0 :             indices_deleted,
      38            0 :             remote_storage_errors,
      39            0 :             ancestor_layers_deleted,
      40            0 :             controller_api_errors,
      41            0 :         } = other;
      42            0 : 
      43            0 :         self.indices_deleted += indices_deleted;
      44            0 :         self.remote_storage_errors += remote_storage_errors;
      45            0 :         self.ancestor_layers_deleted += ancestor_layers_deleted;
      46            0 :         self.controller_api_errors += controller_api_errors;
      47            0 :     }
      48              : }
      49              : 
      50            0 : #[derive(clap::ValueEnum, Debug, Clone, Copy)]
      51              : pub enum GcMode {
      52              :     // Delete nothing
      53              :     DryRun,
      54              : 
      55              :     // Enable only removing old-generation indices
      56              :     IndicesOnly,
      57              : 
      58              :     // Enable all forms of GC
      59              :     Full,
      60              : }
      61              : 
      62              : impl std::fmt::Display for GcMode {
      63            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      64            0 :         match self {
      65            0 :             GcMode::DryRun => write!(f, "dry-run"),
      66            0 :             GcMode::IndicesOnly => write!(f, "indices-only"),
      67            0 :             GcMode::Full => write!(f, "full"),
      68              :         }
      69            0 :     }
      70              : }
      71              : 
      72              : mod refs {
      73              :     use super::*;
      74              :     // Map of cross-shard layer references, giving a refcount for each layer in each shard that is referenced by some other
      75              :     // shard in the same tenant.  This is sparse!  The vast majority of timelines will have no cross-shard refs, and those that
      76              :     // do have cross shard refs should eventually drop most of them via compaction.
      77              :     //
      78              :     // In our inner map type, the TTID in the key is shard-agnostic, and the ShardIndex in the value refers to the _ancestor
      79              :     // which is referenced_.
      80              :     #[derive(Default)]
      81              :     pub(super) struct AncestorRefs(
      82              :         BTreeMap<TenantTimelineId, HashMap<(ShardIndex, LayerName), usize>>,
      83              :     );
      84              : 
      85              :     impl AncestorRefs {
      86              :         /// Insert references for layers discovered in a particular shard-timeline that refer to an ancestral shard-timeline.
      87            0 :         pub(super) fn update(
      88            0 :             &mut self,
      89            0 :             ttid: TenantShardTimelineId,
      90            0 :             layers: Vec<(LayerName, LayerFileMetadata)>,
      91            0 :         ) {
      92            0 :             let ttid_refs = self.0.entry(ttid.as_tenant_timeline_id()).or_default();
      93            0 :             for (layer_name, layer_metadata) in layers {
      94            0 :                 // Increment refcount of this layer in the ancestor shard
      95            0 :                 *(ttid_refs
      96            0 :                     .entry((layer_metadata.shard, layer_name))
      97            0 :                     .or_default()) += 1;
      98            0 :             }
      99            0 :         }
     100              : 
     101              :         /// For a particular TTID, return the map of all ancestor layers referenced by a descendent to their refcount
     102              :         ///
     103              :         /// The `ShardIndex` in the result's key is the index of the _ancestor_, not the descendent.
     104            0 :         pub(super) fn get_ttid_refcounts(
     105            0 :             &self,
     106            0 :             ttid: &TenantTimelineId,
     107            0 :         ) -> Option<&HashMap<(ShardIndex, LayerName), usize>> {
     108            0 :             self.0.get(ttid)
     109            0 :         }
     110              :     }
     111              : }
     112              : 
     113              : use refs::AncestorRefs;
     114              : 
     115              : // As we see shards for a tenant, accumulate knowledge needed for cross-shard GC:
     116              : // - Are there any ancestor shards?
     117              : // - Are there any refs to ancestor shards' layers?
     118              : #[derive(Default)]
     119              : struct TenantRefAccumulator {
     120              :     shards_seen: HashMap<TenantId, Vec<ShardIndex>>,
     121              : 
     122              :     // For each shard that has refs to an ancestor's layers, the set of ancestor layers referred to
     123              :     ancestor_ref_shards: AncestorRefs,
     124              : }
     125              : 
     126              : impl TenantRefAccumulator {
     127            0 :     fn update(&mut self, ttid: TenantShardTimelineId, index_part: &IndexPart) {
     128            0 :         let this_shard_idx = ttid.tenant_shard_id.to_index();
     129            0 :         (*self
     130            0 :             .shards_seen
     131            0 :             .entry(ttid.tenant_shard_id.tenant_id)
     132            0 :             .or_default())
     133            0 :         .push(this_shard_idx);
     134            0 : 
     135            0 :         let mut ancestor_refs = Vec::new();
     136            0 :         for (layer_name, layer_metadata) in &index_part.layer_metadata {
     137            0 :             if layer_metadata.shard != this_shard_idx {
     138            0 :                 // This is a reference from this shard to a layer in an ancestor shard: we must track this
     139            0 :                 // as a marker to not GC this layer from the parent.
     140            0 :                 ancestor_refs.push((layer_name.clone(), layer_metadata.clone()));
     141            0 :             }
     142              :         }
     143              : 
     144            0 :         if !ancestor_refs.is_empty() {
     145            0 :             tracing::info!(%ttid, "Found {} ancestor refs", ancestor_refs.len());
     146            0 :             self.ancestor_ref_shards.update(ttid, ancestor_refs);
     147            0 :         }
     148            0 :     }
     149              : 
     150              :     /// Consume `self` and return a vector of ancestor tenant shards that should be GC'd, and a map of referenced ancestor layers to preserve
     151            0 :     async fn into_gc_ancestors(
     152            0 :         self,
     153            0 :         controller_client: &control_api::Client,
     154            0 :         summary: &mut GcSummary,
     155            0 :     ) -> (Vec<TenantShardId>, AncestorRefs) {
     156            0 :         let mut ancestors_to_gc = Vec::new();
     157            0 :         for (tenant_id, mut shard_indices) in self.shards_seen {
     158              :             // Find the highest shard count
     159            0 :             let latest_count = shard_indices
     160            0 :                 .iter()
     161            0 :                 .map(|i| i.shard_count)
     162            0 :                 .max()
     163            0 :                 .expect("Always at least one shard");
     164            0 : 
     165            0 :             let (mut latest_shards, ancestor_shards) = {
     166            0 :                 let at =
     167            0 :                     itertools::partition(&mut shard_indices, |i| i.shard_count == latest_count);
     168            0 :                 (shard_indices[0..at].to_owned(), &shard_indices[at..])
     169            0 :             };
     170            0 :             // Sort shards, as we will later compare them with a sorted list from the controller
     171            0 :             latest_shards.sort();
     172            0 : 
     173            0 :             // Check that we have a complete view of the latest shard count: this should always be the case unless we happened
     174            0 :             // to scan the S3 bucket halfway through a shard split.
     175            0 :             if latest_shards.len() != latest_count.count() as usize {
     176              :                 // This should be extremely rare, so we warn on it.
     177            0 :                 tracing::warn!(%tenant_id, "Missed some shards at count {:?}", latest_count);
     178            0 :                 continue;
     179            0 :             }
     180            0 : 
     181            0 :             // Check if we have any non-latest-count shards
     182            0 :             if ancestor_shards.is_empty() {
     183            0 :                 tracing::debug!(%tenant_id, "No ancestor shards to clean up");
     184            0 :                 continue;
     185            0 :             }
     186            0 : 
     187            0 :             // Based on S3 view, this tenant looks like it might have some ancestor shard work to do.  We
     188            0 :             // must only do this work if the tenant is not currently being split: otherwise, it is not safe
     189            0 :             // to GC ancestors, because if the split fails then the controller will try to attach ancestor
     190            0 :             // shards again.
     191            0 :             match controller_client
     192            0 :                 .dispatch::<(), TenantDescribeResponse>(
     193            0 :                     Method::GET,
     194            0 :                     format!("control/v1/tenant/{tenant_id}"),
     195            0 :                     None,
     196            0 :                 )
     197            0 :                 .await
     198              :             {
     199            0 :                 Err(e) => {
     200            0 :                     // We were not able to learn the latest shard split state from the controller, so we will not
     201            0 :                     // do ancestor GC on this tenant.
     202            0 :                     tracing::warn!(%tenant_id, "Failed to query storage controller, will not do ancestor GC: {e}");
     203            0 :                     summary.controller_api_errors += 1;
     204            0 :                     continue;
     205              :                 }
     206            0 :                 Ok(desc) => {
     207            0 :                     // We expect to see that the latest shard count matches the one we saw in S3, and that none
     208            0 :                     // of the shards indicate splitting in progress.
     209            0 : 
     210            0 :                     let controller_indices: Vec<ShardIndex> = desc
     211            0 :                         .shards
     212            0 :                         .iter()
     213            0 :                         .map(|s| s.tenant_shard_id.to_index())
     214            0 :                         .collect();
     215            0 :                     if controller_indices != latest_shards {
     216            0 :                         tracing::info!(%tenant_id, "Latest shards seen in S3 ({latest_shards:?}) don't match controller state ({controller_indices:?})");
     217            0 :                         continue;
     218            0 :                     }
     219            0 : 
     220            0 :                     if desc.shards.iter().any(|s| s.is_splitting) {
     221            0 :                         tracing::info!(%tenant_id, "One or more shards is currently splitting");
     222            0 :                         continue;
     223            0 :                     }
     224            0 : 
     225            0 :                     // This shouldn't be too noisy, because we only log this for tenants that have some ancestral refs.
     226            0 :                     tracing::info!(%tenant_id, "Validated state with controller: {desc:?}");
     227              :                 }
     228              :             }
     229              : 
     230              :             // GC ancestor shards
     231            0 :             for ancestor_shard in ancestor_shards.iter().map(|idx| TenantShardId {
     232            0 :                 tenant_id,
     233            0 :                 shard_count: idx.shard_count,
     234            0 :                 shard_number: idx.shard_number,
     235            0 :             }) {
     236            0 :                 ancestors_to_gc.push(ancestor_shard);
     237            0 :             }
     238              :         }
     239              : 
     240            0 :         (ancestors_to_gc, self.ancestor_ref_shards)
     241            0 :     }
     242              : }
     243              : 
     244            0 : async fn is_old_enough(
     245            0 :     s3_client: &Client,
     246            0 :     bucket_config: &BucketConfig,
     247            0 :     min_age: &Duration,
     248            0 :     key: &str,
     249            0 :     summary: &mut GcSummary,
     250            0 : ) -> bool {
     251              :     // Validation: we will only GC indices & layers after a time threshold (e.g. one week) so that during an incident
     252              :     // it is easier to read old data for analysis, and easier to roll back shard splits without having to un-delete any objects.
     253            0 :     let age: Duration = match s3_client
     254            0 :         .head_object()
     255            0 :         .bucket(&bucket_config.bucket)
     256            0 :         .key(key)
     257            0 :         .send()
     258            0 :         .await
     259              :     {
     260            0 :         Ok(response) => match response.last_modified {
     261              :             None => {
     262            0 :                 tracing::warn!("Missing last_modified");
     263            0 :                 summary.remote_storage_errors += 1;
     264            0 :                 return false;
     265              :             }
     266            0 :             Some(last_modified) => match SystemTime::try_from(last_modified).map(|t| t.elapsed()) {
     267            0 :                 Ok(Ok(e)) => e,
     268              :                 Err(_) | Ok(Err(_)) => {
     269            0 :                     tracing::warn!("Bad last_modified time: {last_modified:?}");
     270            0 :                     return false;
     271              :                 }
     272              :             },
     273              :         },
     274            0 :         Err(e) => {
     275            0 :             tracing::warn!("Failed to HEAD {key}: {e}");
     276            0 :             summary.remote_storage_errors += 1;
     277            0 :             return false;
     278              :         }
     279              :     };
     280            0 :     let old_enough = &age > min_age;
     281            0 : 
     282            0 :     if !old_enough {
     283            0 :         tracing::info!(
     284            0 :             "Skipping young object {} < {}",
     285            0 :             humantime::format_duration(age),
     286            0 :             humantime::format_duration(*min_age)
     287              :         );
     288            0 :     }
     289              : 
     290            0 :     old_enough
     291            0 : }
     292              : 
     293            0 : async fn maybe_delete_index(
     294            0 :     s3_client: &Client,
     295            0 :     bucket_config: &BucketConfig,
     296            0 :     min_age: &Duration,
     297            0 :     latest_gen: Generation,
     298            0 :     key: &str,
     299            0 :     mode: GcMode,
     300            0 :     summary: &mut GcSummary,
     301            0 : ) {
     302            0 :     // Validation: we will only delete things that parse cleanly
     303            0 :     let basename = key.rsplit_once('/').unwrap().1;
     304            0 :     let candidate_generation =
     305            0 :         match parse_remote_index_path(RemotePath::from_string(basename).unwrap()) {
     306            0 :             Some(g) => g,
     307              :             None => {
     308            0 :                 if basename == IndexPart::FILE_NAME {
     309              :                     // A legacy pre-generation index
     310            0 :                     Generation::none()
     311              :                 } else {
     312              :                     // A strange key: we will not delete this because we don't understand it.
     313            0 :                     tracing::warn!("Bad index key");
     314            0 :                     return;
     315              :                 }
     316              :             }
     317              :         };
     318              : 
     319              :     // Validation: we will only delete indices more than one generation old, to avoid interfering
     320              :     // in typical migrations, even if they are very long running.
     321            0 :     if candidate_generation >= latest_gen {
     322              :         // This shouldn't happen: when we loaded metadata, it should have selected the latest
     323              :         // generation already, and only populated [`S3TimelineBlobData::unused_index_keys`]
     324              :         // with older generations.
     325            0 :         tracing::warn!("Deletion candidate is >= latest generation, this is a bug!");
     326            0 :         return;
     327            0 :     } else if candidate_generation.next() == latest_gen {
     328              :         // Skip deleting the index of the generation immediately preceding the latest.
     329            0 :         return;
     330            0 :     }
     331            0 : 
     332            0 :     if !is_old_enough(s3_client, bucket_config, min_age, key, summary).await {
     333            0 :         return;
     334            0 :     }
     335              : 
     336            0 :     if matches!(mode, GcMode::DryRun) {
     337            0 :         tracing::info!("Dry run: would delete this key");
     338            0 :         return;
     339            0 :     }
     340            0 : 
     341            0 :     // All validations passed: erase the object
     342            0 :     match s3_client
     343            0 :         .delete_object()
     344            0 :         .bucket(&bucket_config.bucket)
     345            0 :         .key(key)
     346            0 :         .send()
     347            0 :         .await
     348              :     {
     349              :         Ok(_) => {
     350            0 :             tracing::info!("Successfully deleted index");
     351            0 :             summary.indices_deleted += 1;
     352              :         }
     353            0 :         Err(e) => {
     354            0 :             tracing::warn!("Failed to delete index: {e}");
     355            0 :             summary.remote_storage_errors += 1;
     356              :         }
     357              :     }
     358            0 : }
     359              : 
     360              : #[allow(clippy::too_many_arguments)]
     361            0 : async fn gc_ancestor(
     362            0 :     s3_client: &Client,
     363            0 :     bucket_config: &BucketConfig,
     364            0 :     root_target: &RootTarget,
     365            0 :     min_age: &Duration,
     366            0 :     ancestor: TenantShardId,
     367            0 :     refs: &AncestorRefs,
     368            0 :     mode: GcMode,
     369            0 :     summary: &mut GcSummary,
     370            0 : ) -> anyhow::Result<()> {
     371              :     // Scan timelines in the ancestor
     372            0 :     let timelines = stream_tenant_timelines(s3_client, root_target, ancestor).await?;
     373            0 :     let mut timelines = std::pin::pin!(timelines);
     374              : 
     375              :     // Build a list of keys to retain
     376              : 
     377            0 :     while let Some(ttid) = timelines.next().await {
     378            0 :         let ttid = ttid?;
     379              : 
     380            0 :         let data = list_timeline_blobs(s3_client, ttid, root_target).await?;
     381              : 
     382            0 :         let s3_layers = match data.blob_data {
     383              :             BlobDataParseResult::Parsed {
     384              :                 index_part: _,
     385              :                 index_part_generation: _,
     386            0 :                 s3_layers,
     387            0 :             } => s3_layers,
     388              :             BlobDataParseResult::Relic => {
     389              :                 // Post-deletion tenant location: don't try and GC it.
     390            0 :                 continue;
     391              :             }
     392            0 :             BlobDataParseResult::Incorrect(reasons) => {
     393            0 :                 // Our primary purpose isn't to report on bad data, but log this rather than skipping silently
     394            0 :                 tracing::warn!(
     395            0 :                     "Skipping ancestor GC for timeline {ttid}, bad metadata: {reasons:?}"
     396              :                 );
     397            0 :                 continue;
     398              :             }
     399              :         };
     400              : 
     401            0 :         let ttid_refs = refs.get_ttid_refcounts(&ttid.as_tenant_timeline_id());
     402            0 :         let ancestor_shard_index = ttid.tenant_shard_id.to_index();
     403              : 
     404            0 :         for (layer_name, layer_gen) in s3_layers {
     405            0 :             let ref_count = ttid_refs
     406            0 :                 .and_then(|m| m.get(&(ancestor_shard_index, layer_name.clone())))
     407            0 :                 .copied()
     408            0 :                 .unwrap_or(0);
     409            0 : 
     410            0 :             if ref_count > 0 {
     411            0 :                 tracing::debug!(%ttid, "Ancestor layer {layer_name}  has {ref_count} refs");
     412            0 :                 continue;
     413            0 :             }
     414            0 : 
     415            0 :             tracing::info!(%ttid, "Ancestor layer {layer_name} is not referenced");
     416              : 
     417              :             // Build the key for the layer we are considering deleting
     418            0 :             let key = root_target.absolute_key(&remote_layer_path(
     419            0 :                 &ttid.tenant_shard_id.tenant_id,
     420            0 :                 &ttid.timeline_id,
     421            0 :                 ancestor_shard_index,
     422            0 :                 &layer_name,
     423            0 :                 layer_gen,
     424            0 :             ));
     425            0 : 
     426            0 :             // We apply a time threshold to GCing objects that are un-referenced: this preserves our ability
     427            0 :             // to roll back a shard split if we have to, by avoiding deleting ancestor layers right away
     428            0 :             if !is_old_enough(s3_client, bucket_config, min_age, &key, summary).await {
     429            0 :                 continue;
     430            0 :             }
     431              : 
     432            0 :             if !matches!(mode, GcMode::Full) {
     433            0 :                 tracing::info!("Dry run: would delete key {key}");
     434            0 :                 continue;
     435            0 :             }
     436            0 : 
     437            0 :             // All validations passed: erase the object
     438            0 :             match s3_client
     439            0 :                 .delete_object()
     440            0 :                 .bucket(&bucket_config.bucket)
     441            0 :                 .key(&key)
     442            0 :                 .send()
     443            0 :                 .await
     444              :             {
     445              :                 Ok(_) => {
     446            0 :                     tracing::info!("Successfully deleted unreferenced ancestor layer {key}");
     447            0 :                     summary.ancestor_layers_deleted += 1;
     448              :                 }
     449            0 :                 Err(e) => {
     450            0 :                     tracing::warn!("Failed to delete layer {key}: {e}");
     451            0 :                     summary.remote_storage_errors += 1;
     452              :                 }
     453              :             }
     454              :         }
     455              : 
     456              :         // TODO: if all the layers are gone, clean up the whole timeline dir (remove index)
     457              :     }
     458              : 
     459            0 :     Ok(())
     460            0 : }
     461              : 
     462              : /// Physical garbage collection: removing unused S3 objects.  This is distinct from the garbage collection
     463              : /// done inside the pageserver, which operates at a higher level (keys, layers).  This type of garbage collection
     464              : /// is about removing:
     465              : /// - Objects that were uploaded but never referenced in the remote index (e.g. because of a shutdown between
     466              : ///   uploading a layer and uploading an index)
     467              : /// - Index objects from historic generations
     468              : ///
     469              : /// This type of GC is not necessary for correctness: rather it serves to reduce wasted storage capacity, and
     470              : /// make sure that object listings don't get slowed down by large numbers of garbage objects.
     471            0 : pub async fn pageserver_physical_gc(
     472            0 :     bucket_config: BucketConfig,
     473            0 :     controller_client_conf: Option<ControllerClientConfig>,
     474            0 :     tenant_shard_ids: Vec<TenantShardId>,
     475            0 :     min_age: Duration,
     476            0 :     mode: GcMode,
     477            0 : ) -> anyhow::Result<GcSummary> {
     478            0 :     let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
     479              : 
     480            0 :     let tenants = if tenant_shard_ids.is_empty() {
     481            0 :         futures::future::Either::Left(stream_tenants(&s3_client, &target))
     482              :     } else {
     483            0 :         futures::future::Either::Right(futures::stream::iter(tenant_shard_ids.into_iter().map(Ok)))
     484              :     };
     485              : 
     486              :     // How many tenants to process in parallel.  We need to be mindful of pageservers
     487              :     // accessing the same per tenant prefixes, so use a lower setting than pageservers.
     488              :     const CONCURRENCY: usize = 32;
     489              : 
     490              :     // Accumulate information about each tenant for cross-shard GC step we'll do at the end
     491            0 :     let accumulator = Arc::new(std::sync::Mutex::new(TenantRefAccumulator::default()));
     492            0 : 
     493            0 :     // Generate a stream of TenantTimelineId
     494            0 :     let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
     495            0 :     let timelines = timelines.try_buffered(CONCURRENCY);
     496            0 :     let timelines = timelines.try_flatten();
     497            0 : 
     498            0 :     // Per-timeline GC step: lists the blobs of one timeline, records its index in `accumulator`,
                            // and passes every candidate index key to `maybe_delete_index`.
                            //
                            // `bucket_config`, `min_age` and `mode` are not inspected here; they are forwarded
                            // unchanged to `maybe_delete_index`.
                            //
                            // Returns the `GcSummary` that was handed to those calls. The summary is returned as
                            // created (default) when the timeline is a relic or its metadata failed to parse.
                            // The only error path in this function is a failure of `list_timeline_blobs`.
     499            0 :     async fn gc_timeline(
     500            0 :         s3_client: &Client,
     501            0 :         bucket_config: &BucketConfig,
     502            0 :         min_age: &Duration,
     503            0 :         target: &RootTarget,
     504            0 :         mode: GcMode,
     505            0 :         ttid: TenantShardTimelineId,
     506            0 :         accumulator: &Arc<std::sync::Mutex<TenantRefAccumulator>>,
     507            0 :     ) -> anyhow::Result<GcSummary> {
     508            0 :         let mut summary = GcSummary::default();
     509            0 :         let data = list_timeline_blobs(s3_client, ttid, target).await?;
     510            0 : 
                                // Only a successfully parsed index yields GC candidates; the other two outcomes
                                // return early with the untouched summary.
     511            0 :         let (index_part, latest_gen, candidates) = match &data.blob_data {
     512            0 :             BlobDataParseResult::Parsed {
     513            0 :                 index_part,
     514            0 :                 index_part_generation,
     515            0 :                 s3_layers: _s3_layers,
     516            0 :             } => (index_part, *index_part_generation, data.unused_index_keys),
     517            0 :             BlobDataParseResult::Relic => {
     518            0 :                 // Post-deletion tenant location: don't try and GC it.
     519            0 :                 return Ok(summary);
     520            0 :             }
     521            0 :             BlobDataParseResult::Incorrect(reasons) => {
     522            0 :                 // Our primary purpose isn't to report on bad data, but log this rather than skipping silently
     523            0 :                 tracing::warn!("Skipping timeline {ttid}, bad metadata: {reasons:?}");
     524            0 :                 return Ok(summary);
     525            0 :             }
     526            0 :         };
     527            0 : 
                                // Record this timeline in the shared accumulator. The mutex guard is a temporary
                                // that is dropped at the end of this statement, so it is never held across an
                                // `.await`. `unwrap` panics if the mutex has been poisoned.
     528            0 :         accumulator.lock().unwrap().update(ttid, index_part);
     529            0 : 
                                // Each call runs inside its own `maybe_delete_index` tracing span carrying `ttid`,
                                // `latest_gen` and `key`. The call is not followed by `?`, so it cannot abort the
                                // loop from here; it is handed `&mut summary` to report through.
                                // NOTE(review): confirm what `maybe_delete_index` records in `summary`.
     530            0 :         for key in candidates {
     531            0 :             maybe_delete_index(
     532            0 :                 s3_client,
     533            0 :                 bucket_config,
     534            0 :                 min_age,
     535            0 :                 latest_gen,
     536            0 :                 &key,
     537            0 :                 mode,
     538            0 :                 &mut summary,
     539            0 :             )
     540            0 :             .instrument(info_span!("maybe_delete_index", %ttid, ?latest_gen, key))
     541            0 :             .await;
     542            0 :         }
     543            0 : 
     544            0 :         Ok(summary)
     545            0 :     }
     546            0 : 
     547            0 :     let mut summary = GcSummary::default();
     548            0 : 
     549            0 :     // Drain futures for per-shard GC, populating accumulator as a side effect
     550            0 :     {
     551            0 :         let timelines = timelines.map_ok(|ttid| {
     552            0 :             gc_timeline(
     553            0 :                 &s3_client,
     554            0 :                 &bucket_config,
     555            0 :                 &min_age,
     556            0 :                 &target,
     557            0 :                 mode,
     558            0 :                 ttid,
     559            0 :                 &accumulator,
     560            0 :             )
     561            0 :         });
     562            0 :         let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
     563              : 
     564            0 :         while let Some(i) = timelines.next().await {
     565            0 :             summary.merge(i?);
     566              :         }
     567              :     }
     568              : 
     569              :     // Execute cross-shard GC, using the accumulator's full view of all the shards built in the per-shard GC
     570            0 :     let Some(controller_client) = controller_client_conf.map(|c| c.build_client()) else {
     571            0 :         tracing::info!("Skipping ancestor layer GC, because no `--controller-api` was specified");
     572            0 :         return Ok(summary);
     573              :     };
     574              : 
     575            0 :     let (ancestor_shards, ancestor_refs) = Arc::into_inner(accumulator)
     576            0 :         .unwrap()
     577            0 :         .into_inner()
     578            0 :         .unwrap()
     579            0 :         .into_gc_ancestors(&controller_client, &mut summary)
     580            0 :         .await;
     581              : 
     582            0 :     for ancestor_shard in ancestor_shards {
     583            0 :         gc_ancestor(
     584            0 :             &s3_client,
     585            0 :             &bucket_config,
     586            0 :             &target,
     587            0 :             &min_age,
     588            0 :             ancestor_shard,
     589            0 :             &ancestor_refs,
     590            0 :             mode,
     591            0 :             &mut summary,
     592            0 :         )
     593            0 :         .instrument(info_span!("gc_ancestor", %ancestor_shard))
     594            0 :         .await?;
     595              :     }
     596              : 
     597            0 :     Ok(summary)
     598            0 : }
        

Generated by: LCOV version 2.1-beta