LCOV - code coverage report
Current view: top level - storage_scrubber/src - pageserver_physical_gc.rs (source / functions)
Test:         4f58e98c51285c7fa348e0b410c88a10caf68ad2.info
Test Date:    2025-01-07 20:58:07

              Coverage    Total    Hit
Lines:           0.0 %      559      0
Functions:       0.0 %       41      0

            Line data    Source code
       1              : use std::collections::{BTreeMap, BTreeSet, HashMap};
       2              : use std::sync::Arc;
       3              : use std::time::Duration;
       4              : 
       5              : use crate::checks::{
       6              :     list_tenant_manifests, list_timeline_blobs, BlobDataParseResult, ListTenantManifestResult,
       7              :     RemoteTenantManifestInfo,
       8              : };
       9              : use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
      10              : use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, MAX_RETRIES};
      11              : use futures_util::{StreamExt, TryStreamExt};
      12              : use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
      13              : use pageserver::tenant::remote_timeline_client::manifest::OffloadedTimelineManifest;
      14              : use pageserver::tenant::remote_timeline_client::{
      15              :     parse_remote_index_path, parse_remote_tenant_manifest_path, remote_layer_path,
      16              : };
      17              : use pageserver::tenant::storage_layer::LayerName;
      18              : use pageserver::tenant::IndexPart;
      19              : use pageserver_api::controller_api::TenantDescribeResponse;
      20              : use pageserver_api::shard::{ShardIndex, TenantShardId};
      21              : use remote_storage::{GenericRemoteStorage, ListingObject, RemotePath};
      22              : use reqwest::Method;
      23              : use serde::Serialize;
      24              : use storage_controller_client::control_api;
      25              : use tokio_util::sync::CancellationToken;
      26              : use tracing::{info_span, Instrument};
      27              : use utils::backoff;
      28              : use utils::generation::Generation;
      29              : use utils::id::{TenantId, TenantTimelineId};
      30              : 
      31              : #[derive(Serialize, Default)]
      32              : pub struct GcSummary {
      33              :     indices_deleted: usize,
      34              :     tenant_manifests_deleted: usize,
      35              :     remote_storage_errors: usize,
      36              :     controller_api_errors: usize,
      37              :     ancestor_layers_deleted: usize,
      38              : }
      39              : 
      40              : impl GcSummary {
      41            0 :     fn merge(&mut self, other: Self) {
      42            0 :         let Self {
      43            0 :             indices_deleted,
      44            0 :             tenant_manifests_deleted,
      45            0 :             remote_storage_errors,
      46            0 :             ancestor_layers_deleted,
      47            0 :             controller_api_errors,
      48            0 :         } = other;
      49            0 : 
      50            0 :         self.indices_deleted += indices_deleted;
      51            0 :         self.tenant_manifests_deleted += tenant_manifests_deleted;
      52            0 :         self.remote_storage_errors += remote_storage_errors;
      53            0 :         self.ancestor_layers_deleted += ancestor_layers_deleted;
      54            0 :         self.controller_api_errors += controller_api_errors;
      55            0 :     }
      56              : }
      57              : 
      58              : #[derive(clap::ValueEnum, Debug, Clone, Copy)]
      59              : pub enum GcMode {
      60              :     // Delete nothing
      61              :     DryRun,
      62              : 
      63              :     // Enable only removing old-generation indices
      64              :     IndicesOnly,
      65              : 
      66              :     // Enable all forms of GC
      67              :     Full,
      68              : }
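                      : 
                      : // Note: with clap's `ValueEnum` derive, these variants are accepted on the command
                      : // line as kebab-case values, i.e. `dry-run`, `indices-only` and `full`, matching the
                      : // Display impl below.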
      69              : 
      70              : impl std::fmt::Display for GcMode {
      71            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      72            0 :         match self {
      73            0 :             GcMode::DryRun => write!(f, "dry-run"),
      74            0 :             GcMode::IndicesOnly => write!(f, "indices-only"),
      75            0 :             GcMode::Full => write!(f, "full"),
      76              :         }
      77            0 :     }
      78              : }
      79              : 
      80              : mod refs {
      81              :     use super::*;
      82              :     // Map of cross-shard layer references, giving a refcount for each layer in each shard that is referenced by some other
      83              :     // shard in the same tenant.  This is sparse!  The vast majority of timelines will have no cross-shard refs, and those that
      84              :     // do have cross-shard refs should eventually drop most of them via compaction.
      85              :     //
      86              :     // In our inner map type, the TTID in the key is shard-agnostic, and the ShardIndex in the value refers to the _ancestor
      87              :     // which is referenced_.
      88              :     #[derive(Default)]
      89              :     pub(super) struct AncestorRefs(
      90              :         BTreeMap<TenantTimelineId, HashMap<(ShardIndex, LayerName), usize>>,
      91              :     );
      92              : 
      93              :     impl AncestorRefs {
      94              :         /// Insert references for layers discovered in a particular shard-timeline that refer to an ancestral shard-timeline.
      95            0 :         pub(super) fn update(
      96            0 :             &mut self,
      97            0 :             ttid: TenantShardTimelineId,
      98            0 :             layers: Vec<(LayerName, LayerFileMetadata)>,
      99            0 :         ) {
     100            0 :             let ttid_refs = self.0.entry(ttid.as_tenant_timeline_id()).or_default();
     101            0 :             for (layer_name, layer_metadata) in layers {
     102            0 :                 // Increment refcount of this layer in the ancestor shard
     103            0 :                 *(ttid_refs
     104            0 :                     .entry((layer_metadata.shard, layer_name))
     105            0 :                     .or_default()) += 1;
     106            0 :             }
     107            0 :         }
     108              : 
     109              :         /// For a particular TTID, return the map of all ancestor layers referenced by a descendant to their refcounts
     110              :         ///
     111              :         /// The `ShardIndex` in the result's key is the index of the _ancestor_, not the descendant.
     112            0 :         pub(super) fn get_ttid_refcounts(
     113            0 :             &self,
     114            0 :             ttid: &TenantTimelineId,
     115            0 :         ) -> Option<&HashMap<(ShardIndex, LayerName), usize>> {
     116            0 :             self.0.get(ttid)
     117            0 :         }
     118              :     }
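                      : 
                      :     // A minimal usage sketch (identifiers are hypothetical, for illustration only):
                      :     //
                      :     //   let mut refs = AncestorRefs::default();
                      :     //   // `meta.shard` points at the ancestor shard that still owns the layer:
                      :     //   refs.update(child_ttid, vec![(layer_name.clone(), meta.clone())]);
                      :     //   let counts = refs
                      :     //       .get_ttid_refcounts(&child_ttid.as_tenant_timeline_id())
                      :     //       .unwrap();
                      :     //   assert_eq!(counts[&(meta.shard, layer_name)], 1);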
     119              : }
     120              : 
     121              : use refs::AncestorRefs;
     122              : 
     123              : // As we see shards for a tenant, accumulate knowledge needed for cross-shard GC:
     124              : // - Are there any ancestor shards?
     125              : // - Are there any refs to ancestor shards' layers?
     126              : #[derive(Default)]
     127              : struct TenantRefAccumulator {
     128              :     shards_seen: HashMap<TenantId, BTreeSet<ShardIndex>>,
     129              : 
     130              :     // For each shard that has refs to an ancestor's layers, the set of ancestor layers referred to
     131              :     ancestor_ref_shards: AncestorRefs,
     132              : }
     133              : 
     134              : impl TenantRefAccumulator {
     135            0 :     fn update(&mut self, ttid: TenantShardTimelineId, index_part: &IndexPart) {
     136            0 :         let this_shard_idx = ttid.tenant_shard_id.to_index();
     137            0 :         (*self
     138            0 :             .shards_seen
     139            0 :             .entry(ttid.tenant_shard_id.tenant_id)
     140            0 :             .or_default())
     141            0 :         .insert(this_shard_idx);
     142            0 : 
     143            0 :         let mut ancestor_refs = Vec::new();
     144            0 :         for (layer_name, layer_metadata) in &index_part.layer_metadata {
     145            0 :             if layer_metadata.shard != this_shard_idx {
     146            0 :                 // This is a reference from this shard to a layer in an ancestor shard: we must track this
     147            0 :                 // as a marker to not GC this layer from the parent.
     148            0 :                 ancestor_refs.push((layer_name.clone(), layer_metadata.clone()));
     149            0 :             }
     150              :         }
     151              : 
     152            0 :         if !ancestor_refs.is_empty() {
     153            0 :             tracing::info!(%ttid, "Found {} ancestor refs", ancestor_refs.len());
     154            0 :             self.ancestor_ref_shards.update(ttid, ancestor_refs);
     155            0 :         }
     156            0 :     }
     157              : 
     158              :     /// Consume Self and return a vector of ancestor tenant shards that should be GC'd, and a map of referenced ancestor layers to preserve
     159            0 :     async fn into_gc_ancestors(
     160            0 :         self,
     161            0 :         controller_client: &control_api::Client,
     162            0 :         summary: &mut GcSummary,
     163            0 :     ) -> (Vec<TenantShardId>, AncestorRefs) {
     164            0 :         let mut ancestors_to_gc = Vec::new();
     165            0 :         for (tenant_id, shard_indices) in self.shards_seen {
     166              :             // Find the highest shard count
     167            0 :             let latest_count = shard_indices
     168            0 :                 .iter()
     169            0 :                 .map(|i| i.shard_count)
     170            0 :                 .max()
     171            0 :                 .expect("Always at least one shard");
     172            0 : 
     173            0 :             let mut shard_indices = shard_indices.iter().collect::<Vec<_>>();
     174            0 :             let (mut latest_shards, ancestor_shards) = {
     175            0 :                 let at =
     176            0 :                     itertools::partition(&mut shard_indices, |i| i.shard_count == latest_count);
     177            0 :                 (shard_indices[0..at].to_owned(), &shard_indices[at..])
     178            0 :             };
     179            0 :             // Sort shards, as we will later compare them with a sorted list from the controller
     180            0 :             latest_shards.sort();
     181            0 : 
     182            0 :             // Check that we have a complete view of the latest shard count: this should always be the case unless we happened
     183            0 :             // to scan the S3 bucket halfway through a shard split.
     184            0 :             if latest_shards.len() != latest_count.count() as usize {
     185              :                 // This should be extremely rare, so we warn on it.
     186            0 :                 tracing::warn!(%tenant_id, "Missed some shards at count {:?}: {latest_shards:?}", latest_count);
     187            0 :                 continue;
     188            0 :             }
     189            0 : 
     190            0 :             // Check if we have any non-latest-count shards
     191            0 :             if ancestor_shards.is_empty() {
     192            0 :                 tracing::debug!(%tenant_id, "No ancestor shards to clean up");
     193            0 :                 continue;
     194            0 :             }
     195            0 : 
     196            0 :             // Based on S3 view, this tenant looks like it might have some ancestor shard work to do.  We
     197            0 :             // must only do this work if the tenant is not currently being split: otherwise, it is not safe
     198            0 :             // to GC ancestors, because if the split fails then the controller will try to attach ancestor
     199            0 :             // shards again.
     200            0 :             match controller_client
     201            0 :                 .dispatch::<(), TenantDescribeResponse>(
     202            0 :                     Method::GET,
     203            0 :                     format!("control/v1/tenant/{tenant_id}"),
     204            0 :                     None,
     205            0 :                 )
     206            0 :                 .await
     207              :             {
     208            0 :                 Err(e) => {
     209            0 :                     // We were not able to learn the latest shard split state from the controller, so we will not
     210            0 :                     // do ancestor GC on this tenant.
     211            0 :                     tracing::warn!(%tenant_id, "Failed to query storage controller, will not do ancestor GC: {e}");
     212            0 :                     summary.controller_api_errors += 1;
     213            0 :                     continue;
     214              :                 }
     215            0 :                 Ok(desc) => {
     216            0 :                     // We expect to see that the latest shard count matches the one we saw in S3, and that none
     217            0 :                     // of the shards indicate splitting in progress.
     218            0 : 
     219            0 :                     let controller_indices: Vec<ShardIndex> = desc
     220            0 :                         .shards
     221            0 :                         .iter()
     222            0 :                         .map(|s| s.tenant_shard_id.to_index())
     223            0 :                         .collect();
     224            0 :                     if !controller_indices.iter().eq(latest_shards.iter().copied()) {
     225            0 :                         tracing::info!(%tenant_id, "Latest shards seen in S3 ({latest_shards:?}) don't match controller state ({controller_indices:?})");
     226            0 :                         continue;
     227            0 :                     }
     228            0 : 
     229            0 :                     if desc.shards.iter().any(|s| s.is_splitting) {
     230            0 :                         tracing::info!(%tenant_id, "One or more shards is currently splitting");
     231            0 :                         continue;
     232            0 :                     }
     233            0 : 
     234            0 :                     // This shouldn't be too noisy, because we only log this for tenants that have some ancestral refs.
     235            0 :                     tracing::info!(%tenant_id, "Validated state with controller: {desc:?}");
     236              :                 }
     237              :             }
     238              : 
     239              :             // GC ancestor shards
     240            0 :             for ancestor_shard in ancestor_shards.iter().map(|idx| TenantShardId {
     241            0 :                 tenant_id,
     242            0 :                 shard_count: idx.shard_count,
     243            0 :                 shard_number: idx.shard_number,
     244            0 :             }) {
     245            0 :                 ancestors_to_gc.push(ancestor_shard);
     246            0 :             }
     247              :         }
     248              : 
     249            0 :         (ancestors_to_gc, self.ancestor_ref_shards)
     250            0 :     }
     251              : }
     252              : 
     253            0 : fn is_old_enough(min_age: &Duration, key: &ListingObject, summary: &mut GcSummary) -> bool {
     254              :     // Validation: we will only GC indices & layers after a time threshold (e.g. one week) so that during an incident
     255              :     // it is easier to read old data for analysis, and easier to roll back shard splits without having to un-delete any objects.
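                      :     // E.g. with a `min_age` of one week, an index uploaded yesterday is reported as
                      :     // young and kept (illustrative values).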
     256            0 :     let age = match key.last_modified.elapsed() {
     257            0 :         Ok(e) => e,
     258              :         Err(_) => {
     259            0 :             tracing::warn!("Bad last_modified time: {:?}", key.last_modified);
     260            0 :             summary.remote_storage_errors += 1;
     261            0 :             return false;
     262              :         }
     263              :     };
     264            0 :     let old_enough = &age > min_age;
     265            0 : 
     266            0 :     if !old_enough {
     267            0 :         tracing::info!(
     268            0 :             "Skipping young object {} < {}",
     269            0 :             humantime::format_duration(age),
     270            0 :             humantime::format_duration(*min_age)
     271              :         );
     272            0 :     }
     273              : 
     274            0 :     old_enough
     275            0 : }
     276              : 
     277              : /// Same as [`is_old_enough`], but looks up the [`ListingObject`] itself via a HEAD request instead of requiring one to be passed in.
     278            0 : async fn check_is_old_enough(
     279            0 :     remote_client: &GenericRemoteStorage,
     280            0 :     key: &RemotePath,
     281            0 :     min_age: &Duration,
     282            0 :     summary: &mut GcSummary,
     283            0 : ) -> Option<bool> {
     284            0 :     let listing_object = remote_client
     285            0 :         .head_object(key, &CancellationToken::new())
     286            0 :         .await
     287            0 :         .ok()?;
     288            0 :     Some(is_old_enough(min_age, &listing_object, summary))
     289            0 : }
     290              : 
     291            0 : async fn maybe_delete_index(
     292            0 :     remote_client: &GenericRemoteStorage,
     293            0 :     min_age: &Duration,
     294            0 :     latest_gen: Generation,
     295            0 :     obj: &ListingObject,
     296            0 :     mode: GcMode,
     297            0 :     summary: &mut GcSummary,
     298            0 : ) {
     299            0 :     // Validation: we will only delete things that parse cleanly
     300            0 :     let basename = obj.key.get_path().file_name().unwrap();
     301            0 :     let candidate_generation =
     302            0 :         match parse_remote_index_path(RemotePath::from_string(basename).unwrap()) {
     303            0 :             Some(g) => g,
     304              :             None => {
     305            0 :                 if basename == IndexPart::FILE_NAME {
     306              :                     // A legacy pre-generation index
     307            0 :                     Generation::none()
     308              :                 } else {
     309              :                     // A strange key: we will not delete this because we don't understand it.
     310            0 :                     tracing::warn!("Bad index key");
     311            0 :                     return;
     312              :                 }
     313              :             }
     314              :         };
     315              : 
     316              :     // Validation: we will only delete indices more than one generation old, to avoid interfering
     317              :     // in typical migrations, even if they are very long running.
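                      :     // E.g. if the latest index is at generation N, the index at N-1 is always retained;
                      :     // only indices at N-2 and older are deletion candidates (illustrative).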
     318            0 :     if candidate_generation >= latest_gen {
     319              :         // This shouldn't happen: when we loaded metadata, it should have selected the latest
     320              :         // generation already, and only populated [`S3TimelineBlobData::unused_index_keys`]
     321              :         // with older generations.
     322            0 :         tracing::warn!("Deletion candidate is >= latest generation, this is a bug!");
     323            0 :         return;
     324            0 :     } else if candidate_generation.next() == latest_gen {
     325              :         // Skip deleting the latest-1th generation's index.
     326            0 :         return;
     327            0 :     }
     328            0 : 
     329            0 :     if !is_old_enough(min_age, obj, summary) {
     330            0 :         return;
     331            0 :     }
     332              : 
     333            0 :     if matches!(mode, GcMode::DryRun) {
     334            0 :         tracing::info!("Dry run: would delete this key");
     335            0 :         return;
     336            0 :     }
     337            0 : 
     338            0 :     // All validations passed: erase the object
     339            0 :     let cancel = CancellationToken::new();
     340            0 :     match backoff::retry(
     341            0 :         || remote_client.delete(&obj.key, &cancel),
     342            0 :         |_| false,
     343            0 :         3,
     344            0 :         MAX_RETRIES as u32,
     345            0 :         "maybe_delete_index",
     346            0 :         &cancel,
     347            0 :     )
     348            0 :     .await
     349              :     {
     350              :         None => {
     351            0 :             unreachable!("Using a dummy cancellation token");
     352              :         }
     353              :         Some(Ok(_)) => {
     354            0 :             tracing::info!("Successfully deleted index");
     355            0 :             summary.indices_deleted += 1;
     356              :         }
     357            0 :         Some(Err(e)) => {
     358            0 :             tracing::warn!("Failed to delete index: {e}");
     359            0 :             summary.remote_storage_errors += 1;
     360              :         }
     361              :     }
     362            0 : }
     363              : 
     364            0 : async fn maybe_delete_tenant_manifest(
     365            0 :     remote_client: &GenericRemoteStorage,
     366            0 :     min_age: &Duration,
     367            0 :     latest_gen: Generation,
     368            0 :     obj: &ListingObject,
     369            0 :     mode: GcMode,
     370            0 :     summary: &mut GcSummary,
     371            0 : ) {
     372            0 :     // Validation: we will only delete things that parse cleanly
     373            0 :     let basename = obj.key.get_path().file_name().unwrap();
     374            0 :     let Some(candidate_generation) =
     375            0 :         parse_remote_tenant_manifest_path(RemotePath::from_string(basename).unwrap())
     376              :     else {
     377              :         // A strange key: we will not delete this because we don't understand it.
     378            0 :         tracing::warn!("Bad tenant manifest key");
     379            0 :         return;
     380              :     };
     381              : 
     382              :     // Validation: we will only delete manifests more than one generation old, and in fact we
     383              :     // should never be called with such recent generations.
     384            0 :     if candidate_generation >= latest_gen {
     385            0 :         tracing::warn!("Deletion candidate is >= latest generation, this is a bug!");
     386            0 :         return;
     387            0 :     } else if candidate_generation.next() == latest_gen {
     388            0 :         tracing::warn!("Deletion candidate is >= latest generation - 1, this is a bug!");
     389            0 :         return;
     390            0 :     }
     391            0 : 
     392            0 :     if !is_old_enough(min_age, obj, summary) {
     393            0 :         return;
     394            0 :     }
     395              : 
     396            0 :     if matches!(mode, GcMode::DryRun) {
     397            0 :         tracing::info!("Dry run: would delete this key");
     398            0 :         return;
     399            0 :     }
     400            0 : 
     401            0 :     // All validations passed: erase the object
     402            0 :     let cancel = CancellationToken::new();
     403            0 :     match backoff::retry(
     404            0 :         || remote_client.delete(&obj.key, &cancel),
     405            0 :         |_| false,
     406            0 :         3,
     407            0 :         MAX_RETRIES as u32,
     408            0 :         "maybe_delete_tenant_manifest",
     409            0 :         &cancel,
     410            0 :     )
     411            0 :     .await
     412              :     {
     413              :         None => {
     414            0 :             unreachable!("Using a dummy cancellation token");
     415              :         }
     416              :         Some(Ok(_)) => {
     417            0 :             tracing::info!("Successfully deleted tenant manifest");
     418            0 :             summary.tenant_manifests_deleted += 1;
     419              :         }
     420            0 :         Some(Err(e)) => {
     421            0 :             tracing::warn!("Failed to delete tenant manifest: {e}");
     422            0 :             summary.remote_storage_errors += 1;
     423              :         }
     424              :     }
     425            0 : }
     426              : 
     427              : #[allow(clippy::too_many_arguments)]
     428            0 : async fn gc_ancestor(
     429            0 :     remote_client: &GenericRemoteStorage,
     430            0 :     root_target: &RootTarget,
     431            0 :     min_age: &Duration,
     432            0 :     ancestor: TenantShardId,
     433            0 :     refs: &AncestorRefs,
     434            0 :     mode: GcMode,
     435            0 :     summary: &mut GcSummary,
     436            0 : ) -> anyhow::Result<()> {
     437              :     // Scan timelines in the ancestor
     438            0 :     let timelines = stream_tenant_timelines(remote_client, root_target, ancestor).await?;
     439            0 :     let mut timelines = std::pin::pin!(timelines);
     440              : 
     441              :     // Build a list of keys to retain
     442              : 
     443            0 :     while let Some(ttid) = timelines.next().await {
     444            0 :         let ttid = ttid?;
     445              : 
     446            0 :         let data = list_timeline_blobs(remote_client, ttid, root_target).await?;
     447              : 
     448            0 :         let s3_layers = match data.blob_data {
     449              :             BlobDataParseResult::Parsed {
     450              :                 index_part: _,
     451              :                 index_part_generation: _,
     452            0 :                 s3_layers,
     453            0 :             } => s3_layers,
     454              :             BlobDataParseResult::Relic => {
     455              :                 // Post-deletion tenant location: don't try and GC it.
     456            0 :                 continue;
     457              :             }
     458              :             BlobDataParseResult::Incorrect {
     459            0 :                 errors,
     460            0 :                 s3_layers: _, // TODO(yuchen): could still check references to these s3 layers?
     461            0 :             } => {
     462            0 :                 // Our primary purpose isn't to report on bad data, but we log it rather than skipping silently
     463            0 :                 tracing::warn!(
     464            0 :                     "Skipping ancestor GC for timeline {ttid}, bad metadata: {errors:?}"
     465              :                 );
     466            0 :                 continue;
     467              :             }
     468              :         };
     469              : 
     470            0 :         let ttid_refs = refs.get_ttid_refcounts(&ttid.as_tenant_timeline_id());
     471            0 :         let ancestor_shard_index = ttid.tenant_shard_id.to_index();
     472              : 
     473            0 :         for (layer_name, layer_gen) in s3_layers {
     474            0 :             let ref_count = ttid_refs
     475            0 :                 .and_then(|m| m.get(&(ancestor_shard_index, layer_name.clone())))
     476            0 :                 .copied()
     477            0 :                 .unwrap_or(0);
     478            0 : 
     479            0 :             if ref_count > 0 {
     480            0 :                 tracing::debug!(%ttid, "Ancestor layer {layer_name} has {ref_count} refs");
     481            0 :                 continue;
     482            0 :             }
     483            0 : 
     484            0 :             tracing::info!(%ttid, "Ancestor layer {layer_name} is not referenced");
     485              : 
     486              :             // Build the key for the layer we are considering deleting
     487            0 :             let key = root_target.absolute_key(&remote_layer_path(
     488            0 :                 &ttid.tenant_shard_id.tenant_id,
     489            0 :                 &ttid.timeline_id,
     490            0 :                 ancestor_shard_index,
     491            0 :                 &layer_name,
     492            0 :                 layer_gen,
     493            0 :             ));
     494            0 : 
     495            0 :             // We apply a time threshold to GCing objects that are un-referenced: this preserves our ability
     496            0 :             // to roll back a shard split if we have to, by avoiding deleting ancestor layers right away
     497            0 :             let path = RemotePath::from_string(key.strip_prefix("/").unwrap_or(&key)).unwrap();
     498            0 :             if check_is_old_enough(remote_client, &path, min_age, summary).await != Some(true) {
     499            0 :                 continue;
     500            0 :             }
     501              : 
     502            0 :             if !matches!(mode, GcMode::Full) {
     503            0 :                 tracing::info!("Dry run: would delete key {key}");
     504            0 :                 continue;
     505            0 :             }
     506            0 : 
     507            0 :             // All validations passed: erase the object
     508            0 :             match remote_client.delete(&path, &CancellationToken::new()).await {
     509              :                 Ok(_) => {
     510            0 :                     tracing::info!("Successfully deleted unreferenced ancestor layer {key}");
     511            0 :                     summary.ancestor_layers_deleted += 1;
     512              :                 }
     513            0 :                 Err(e) => {
     514            0 :                     tracing::warn!("Failed to delete layer {key}: {e}");
     515            0 :                     summary.remote_storage_errors += 1;
     516              :                 }
     517              :             }
     518              :         }
     519              : 
     520              :         // TODO: if all the layers are gone, clean up the whole timeline dir (remove index)
     521              :     }
     522              : 
     523            0 :     Ok(())
     524            0 : }
     525              : 
     526            0 : async fn gc_tenant_manifests(
     527            0 :     remote_client: &GenericRemoteStorage,
     528            0 :     min_age: Duration,
     529            0 :     target: &RootTarget,
     530            0 :     mode: GcMode,
     531            0 :     tenant_shard_id: TenantShardId,
     532            0 : ) -> anyhow::Result<(GcSummary, Option<RemoteTenantManifestInfo>)> {
     533            0 :     let mut gc_summary = GcSummary::default();
     534            0 :     match list_tenant_manifests(remote_client, tenant_shard_id, target).await? {
     535              :         ListTenantManifestResult::WithErrors {
     536            0 :             errors,
     537              :             unknown_keys: _,
     538              :         } => {
     539            0 :             for (_key, error) in errors {
     540            0 :                 tracing::warn!(%tenant_shard_id, "list_tenant_manifests: {error}");
     541              :             }
     542            0 :             Ok((gc_summary, None))
     543              :         }
     544              :         ListTenantManifestResult::NoErrors {
     545            0 :             latest_generation,
     546            0 :             mut manifests,
     547              :         } => {
     548            0 :             let Some(latest_generation) = latest_generation else {
     549            0 :                 return Ok((gc_summary, None));
     550              :             };
     551            0 :             manifests.sort_by_key(|(generation, _obj)| *generation);
     552            0 :             // skip the two latest generations (they don't necessarily have to be 1 apart from each other)
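                      :             // E.g. manifests at generations [2, 5, 6]: 6 and 5 are kept, and only the
                      :             // generation-2 manifest becomes a deletion candidate (illustrative).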
     553            0 :             let candidates = manifests.iter().rev().skip(2);
     554            0 :             for (_generation, key) in candidates {
     555            0 :                 maybe_delete_tenant_manifest(
     556            0 :                     remote_client,
     557            0 :                     &min_age,
     558            0 :                     latest_generation.generation,
     559            0 :                     key,
     560            0 :                     mode,
     561            0 :                     &mut gc_summary,
     562            0 :                 )
     563            0 :                 .instrument(
     564            0 :                     info_span!("maybe_delete_tenant_manifest", %tenant_shard_id, ?latest_generation.generation, %key.key),
     565              :                 )
     566            0 :                 .await;
     567              :             }
     568            0 :             Ok((gc_summary, Some(latest_generation)))
     569              :         }
     570              :     }
     571            0 : }
     572              : 
     573            0 : async fn gc_timeline(
     574            0 :     remote_client: &GenericRemoteStorage,
     575            0 :     min_age: &Duration,
     576            0 :     target: &RootTarget,
     577            0 :     mode: GcMode,
     578            0 :     ttid: TenantShardTimelineId,
     579            0 :     accumulator: &Arc<std::sync::Mutex<TenantRefAccumulator>>,
     580            0 :     tenant_manifest_info: Arc<Option<RemoteTenantManifestInfo>>,
     581            0 : ) -> anyhow::Result<GcSummary> {
     582            0 :     let mut summary = GcSummary::default();
     583            0 :     let data = list_timeline_blobs(remote_client, ttid, target).await?;
     584              : 
     585            0 :     let (index_part, latest_gen, candidates) = match &data.blob_data {
     586              :         BlobDataParseResult::Parsed {
     587            0 :             index_part,
     588            0 :             index_part_generation,
     589            0 :             s3_layers: _s3_layers,
     590            0 :         } => (index_part, *index_part_generation, data.unused_index_keys),
     591              :         BlobDataParseResult::Relic => {
     592              :             // Post-deletion tenant location: don't try and GC it.
     593            0 :             return Ok(summary);
     594              :         }
     595              :         BlobDataParseResult::Incorrect {
     596            0 :             errors,
     597            0 :             s3_layers: _,
     598            0 :         } => {
     599            0 :             // Our primary purpose isn't to report on bad data, but we log it rather than skipping silently
     600            0 :             tracing::warn!("Skipping timeline {ttid}, bad metadata: {errors:?}");
     601            0 :             return Ok(summary);
     602              :         }
     603              :     };
     604              : 
     605            0 :     if let Some(tenant_manifest_info) = &*tenant_manifest_info {
     606              :         // TODO: this is O(n^2) in the number of offloaded timelines. Do a hashmap lookup instead.
     607            0 :         let maybe_offloaded = tenant_manifest_info
     608            0 :             .manifest
     609            0 :             .offloaded_timelines
     610            0 :             .iter()
     611            0 :             .find(|offloaded_timeline| offloaded_timeline.timeline_id == ttid.timeline_id);
     612            0 :         if let Some(offloaded) = maybe_offloaded {
     613            0 :             let warnings = validate_index_part_with_offloaded(index_part, offloaded);
     614            0 :             let warn = if warnings.is_empty() {
     615            0 :                 false
     616              :             } else {
     617              :                 // Verify that the manifest hasn't changed. If it has, a potential racing change could have been cause for our troubles.
     618            0 :                 match list_tenant_manifests(remote_client, ttid.tenant_shard_id, target).await? {
     619              :                     ListTenantManifestResult::WithErrors {
     620            0 :                         errors,
     621              :                         unknown_keys: _,
     622              :                     } => {
     623            0 :                         for (_key, error) in errors {
     624            0 :                             tracing::warn!(%ttid, "list_tenant_manifests in gc_timeline: {error}");
     625              :                         }
     626            0 :                         true
     627              :                     }
     628              :                     ListTenantManifestResult::NoErrors {
     629            0 :                         latest_generation,
     630              :                         manifests: _,
     631              :                     } => {
     632            0 :                         if let Some(new_latest_gen) = latest_generation {
     633            0 :                             let manifest_unchanged = (
     634            0 :                                 new_latest_gen.generation,
     635            0 :                                 new_latest_gen.listing_object.last_modified,
     636            0 :                             ) == (
     637            0 :                                 tenant_manifest_info.generation,
     638            0 :                                 tenant_manifest_info.listing_object.last_modified,
     639            0 :                             );
     640            0 :                             if !manifest_unchanged {
     641            0 :                                 tracing::debug!(%ttid, "tenant manifest changed since it was loaded, suppressing {} warnings", warnings.len());
     642            0 :                             }
     643            0 :                             manifest_unchanged
     644              :                         } else {
     645              :                             // The latest generation is gone. This timeline is in the process of being deleted?
     646            0 :                             false
     647              :                         }
     648              :                     }
     649              :                 }
     650              :             };
     651            0 :             if warn {
     652            0 :                 for warning in warnings {
     653            0 :                     tracing::warn!(%ttid, "{}", warning);
     654              :                 }
     655            0 :             }
     656            0 :         }
     657            0 :     }
     658              : 
     659            0 :     accumulator.lock().unwrap().update(ttid, index_part);
     660              : 
     661            0 :     for key in candidates {
     662            0 :         maybe_delete_index(remote_client, min_age, latest_gen, &key, mode, &mut summary)
     663            0 :             .instrument(info_span!("maybe_delete_index", %ttid, ?latest_gen, %key.key))
     664            0 :             .await;
     665              :     }
     666              : 
     667            0 :     Ok(summary)
     668            0 : }
     669              : 
     670            0 : fn validate_index_part_with_offloaded(
     671            0 :     index_part: &IndexPart,
     672            0 :     offloaded: &OffloadedTimelineManifest,
     673            0 : ) -> Vec<String> {
     674            0 :     let mut warnings = Vec::new();
     675            0 :     if let Some(archived_at_index_part) = index_part.archived_at {
     676            0 :         if archived_at_index_part
     677            0 :             .signed_duration_since(offloaded.archived_at)
     678            0 :             .num_seconds()
     679            0 :             != 0
     680            0 :         {
     681            0 :             warnings.push(format!(
     682            0 :                 "index-part archived_at={} differs from manifest archived_at={}",
     683            0 :                 archived_at_index_part, offloaded.archived_at
     684            0 :             ));
     685            0 :         }
     686            0 :     } else {
     687            0 :         warnings.push("Timeline offloaded in manifest but not archived in index-part".to_string());
     688            0 :     }
     689            0 :     if index_part.metadata.ancestor_timeline() != offloaded.ancestor_timeline_id {
     690            0 :         warnings.push(format!(
     691            0 :             "index-part ancestor={:?} differs from manifest ancestor={:?}",
     692            0 :             index_part.metadata.ancestor_timeline(),
     693            0 :             offloaded.ancestor_timeline_id
     694            0 :         ));
     695            0 :     }
     696            0 :     warnings
     697            0 : }
     698              : 
     699              : /// Physical garbage collection: removing unused S3 objects.
     700              : ///
     701              : /// This is distinct from the garbage collection done inside the pageserver, which operates at a higher level
     702              : /// (keys, layers).  This type of garbage collection is about removing:
     703              : /// - Objects that were uploaded but never referenced in the remote index (e.g. because of a shutdown between
     704              : ///   uploading a layer and uploading an index)
     705              : /// - Index objects and tenant manifests from historic generations
     706              : ///
     707              : /// This type of GC is not necessary for correctness: rather it serves to reduce wasted storage capacity, and
     708              : /// make sure that object listings don't get slowed down by large numbers of garbage objects.
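                      : ///
                      : /// A minimal usage sketch (argument values are illustrative, not recommendations;
                      : /// `bucket_config` and the `serde_json` dependency are assumed): a dry-run pass over
                      : /// all tenants, considering only objects older than one week, with ancestor GC
                      : /// disabled because no controller client is given.
                      : ///
                      : /// ```ignore
                      : /// let summary = pageserver_physical_gc(
                      : ///     &bucket_config,
                      : ///     None,
                      : ///     Vec::new(),
                      : ///     Duration::from_secs(7 * 24 * 60 * 60),
                      : ///     GcMode::DryRun,
                      : /// )
                      : /// .await?;
                      : /// println!("{}", serde_json::to_string(&summary)?);
                      : /// ```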
     709            0 : pub async fn pageserver_physical_gc(
     710            0 :     bucket_config: &BucketConfig,
     711            0 :     controller_client: Option<&control_api::Client>,
     712            0 :     tenant_shard_ids: Vec<TenantShardId>,
     713            0 :     min_age: Duration,
     714            0 :     mode: GcMode,
     715            0 : ) -> anyhow::Result<GcSummary> {
     716            0 :     let (remote_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
     717              : 
     718            0 :     let remote_client = Arc::new(remote_client);
     719            0 :     let tenants = if tenant_shard_ids.is_empty() {
     720            0 :         futures::future::Either::Left(stream_tenants(&remote_client, &target))
     721              :     } else {
     722            0 :         futures::future::Either::Right(futures::stream::iter(tenant_shard_ids.into_iter().map(Ok)))
     723              :     };
     724              : 
     725              :     // How many tenants to process in parallel.  We need to be mindful of pageservers
     726              :     // accessing the same per-tenant prefixes, so use a lower setting than pageservers do.
     727              :     const CONCURRENCY: usize = 32;
     728              : 
     729              :     // Accumulate information about each tenant for the cross-shard GC step we'll do at the end
     730            0 :     let accumulator = Arc::new(std::sync::Mutex::new(TenantRefAccumulator::default()));
     731              : 
     732              :     // Generate a stream of TenantTimelineId
     733              :     enum GcSummaryOrContent<T> {
     734              :         Content(T),
     735              :         GcSummary(GcSummary),
     736              :     }
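                      :     // Each tenant contributes one flattened stream: its timelines (each paired with the
                      :     // tenant's manifest info) followed by a trailing GcSummary from manifest GC, so both
                      :     // kinds of work share the `try_buffered` concurrency budget below.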
     737            0 :     let timelines = tenants.map_ok(|tenant_shard_id| {
     738            0 :         let target_ref = &target;
     739            0 :         let remote_client_ref = &remote_client;
     740            0 :         async move {
     741            0 :             let gc_manifest_result = gc_tenant_manifests(
     742            0 :                 remote_client_ref,
     743            0 :                 min_age,
     744            0 :                 target_ref,
     745            0 :                 mode,
     746            0 :                 tenant_shard_id,
     747            0 :             )
     748            0 :             .await;
     749            0 :             let (summary_from_manifest, tenant_manifest_opt) = match gc_manifest_result {
     750            0 :                 Ok((gc_summary, tenant_manifest)) => (gc_summary, tenant_manifest),
     751            0 :                 Err(e) => {
     752            0 :                     tracing::warn!(%tenant_shard_id, "Error in gc_tenant_manifests: {e}");
     753            0 :                     (GcSummary::default(), None)
     754              :                 }
     755              :             };
     756            0 :             let tenant_manifest_arc = Arc::new(tenant_manifest_opt);
     757            0 :             let summary_from_manifest = Ok(GcSummaryOrContent::<(_, _)>::GcSummary(
     758            0 :                 summary_from_manifest,
     759            0 :             ));
     760            0 :             stream_tenant_timelines(remote_client_ref, target_ref, tenant_shard_id)
     761            0 :                 .await
     762            0 :                 .map(|stream| {
     763            0 :                     stream
     764            0 :                         .zip(futures::stream::iter(std::iter::repeat(
     765            0 :                             tenant_manifest_arc,
     766            0 :                         )))
     767            0 :                         .map(|(ttid_res, tenant_manifest_arc)| {
     768            0 :                             ttid_res.map(move |ttid| {
     769            0 :                                 GcSummaryOrContent::Content((ttid, tenant_manifest_arc))
     770            0 :                             })
     771            0 :                         })
     772            0 :                         .chain(futures::stream::iter([summary_from_manifest].into_iter()))
     773            0 :                 })
     774            0 :         }
     775            0 :     });
     776            0 :     let timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
     777            0 :     let timelines = timelines.try_flatten();
     778            0 : 
     779            0 :     let mut summary = GcSummary::default();
     780            0 : 
     781            0 :     // Drain futures for per-shard GC, populating accumulator as a side effect
     782            0 :     {
     783            0 :         let timelines = timelines.map_ok(|summary_or_ttid| match summary_or_ttid {
     784            0 :             GcSummaryOrContent::Content((ttid, tenant_manifest_arc)) => {
     785            0 :                 futures::future::Either::Left(gc_timeline(
     786            0 :                     &remote_client,
     787            0 :                     &min_age,
     788            0 :                     &target,
     789            0 :                     mode,
     790            0 :                     ttid,
     791            0 :                     &accumulator,
     792            0 :                     tenant_manifest_arc,
     793            0 :                 ))
     794              :             }
     795            0 :             GcSummaryOrContent::GcSummary(gc_summary) => {
     796            0 :                 futures::future::Either::Right(futures::future::ok(gc_summary))
     797              :             }
     798            0 :         });
     799            0 :         let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
     800              : 
     801            0 :         while let Some(i) = timelines.next().await {
     802            0 :             summary.merge(i?);
     803              :         }
     804              :     }
     805              : 
     806              :     // Execute cross-shard GC, using the accumulator's full view of all the shards built in the per-shard GC
     807            0 :     let Some(client) = controller_client else {
     808            0 :         tracing::info!("Skipping ancestor layer GC, because no `--controller-api` was specified");
     809            0 :         return Ok(summary);
     810              :     };
     811              : 
     812            0 :     let (ancestor_shards, ancestor_refs) = Arc::into_inner(accumulator)
     813            0 :         .unwrap()
     814            0 :         .into_inner()
     815            0 :         .unwrap()
     816            0 :         .into_gc_ancestors(client, &mut summary)
     817            0 :         .await;
     818              : 
     819            0 :     for ancestor_shard in ancestor_shards {
     820            0 :         gc_ancestor(
     821            0 :             &remote_client,
     822            0 :             &target,
     823            0 :             &min_age,
     824            0 :             ancestor_shard,
     825            0 :             &ancestor_refs,
     826            0 :             mode,
     827            0 :             &mut summary,
     828            0 :         )
     829            0 :         .instrument(info_span!("gc_ancestor", %ancestor_shard))
     830            0 :         .await?;
     831              :     }
     832              : 
     833            0 :     Ok(summary)
     834            0 : }
        

Generated by: LCOV version 2.1-beta