LCOV - code coverage report
Current view: top level - storage_scrubber/src - pageserver_physical_gc.rs (source / functions) Coverage Total Hit
Test: 050dd70dd490b28fffe527eae9fb8a1222b5c59c.info Lines: 0.0 % 146 0
Test Date: 2024-06-25 21:28:46 Functions: 0.0 % 11 0

            Line data    Source code
       1              : use std::time::{Duration, UNIX_EPOCH};
       2              : 
       3              : use crate::checks::{list_timeline_blobs, BlobDataParseResult};
       4              : use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
       5              : use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
       6              : use aws_sdk_s3::Client;
       7              : use futures_util::{StreamExt, TryStreamExt};
       8              : use pageserver::tenant::remote_timeline_client::parse_remote_index_path;
       9              : use pageserver::tenant::IndexPart;
      10              : use pageserver_api::shard::TenantShardId;
      11              : use remote_storage::RemotePath;
      12              : use serde::Serialize;
      13              : use tracing::{info_span, Instrument};
      14              : use utils::generation::Generation;
      15              : 
      16              : #[derive(Serialize, Default)]
      17              : pub struct GcSummary {
      18              :     indices_deleted: usize,
      19              :     remote_storage_errors: usize,
      20              : }
      21              : 
      22            0 : #[derive(clap::ValueEnum, Debug, Clone, Copy)]
      23              : pub enum GcMode {
      24              :     // Delete nothing
      25              :     DryRun,
      26              : 
      27              :     // Enable only removing old-generation indices
      28              :     IndicesOnly,
      29              :     // Enable all forms of GC
      30              :     // TODO: this will be used when shard split ancestor layer deletion is added
      31              :     // All,
      32              : }
      33              : 
      34              : impl std::fmt::Display for GcMode {
      35            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      36            0 :         match self {
      37            0 :             GcMode::DryRun => write!(f, "dry-run"),
      38            0 :             GcMode::IndicesOnly => write!(f, "indices-only"),
      39              :         }
      40            0 :     }
      41              : }
      42              : 
      43            0 : async fn maybe_delete_index(
      44            0 :     s3_client: &Client,
      45            0 :     bucket_config: &BucketConfig,
      46            0 :     min_age: &Duration,
      47            0 :     latest_gen: Generation,
      48            0 :     key: &str,
      49            0 :     mode: GcMode,
      50            0 :     summary: &mut GcSummary,
      51            0 : ) {
      52            0 :     // Validation: we will only delete things that parse cleanly
      53            0 :     let basename = key.rsplit_once('/').unwrap().1;
      54            0 :     let candidate_generation =
      55            0 :         match parse_remote_index_path(RemotePath::from_string(basename).unwrap()) {
      56            0 :             Some(g) => g,
      57              :             None => {
      58            0 :                 if basename == IndexPart::FILE_NAME {
      59              :                     // A legacy pre-generation index
      60            0 :                     Generation::none()
      61              :                 } else {
      62              :                     // A strange key: we will not delete this because we don't understand it.
      63            0 :                     tracing::warn!("Bad index key");
      64            0 :                     return;
      65              :                 }
      66              :             }
      67              :         };
      68              : 
      69              :     // Validation: we will only delete indices more than one generation old, to avoid interfering
      70              :     // in typical migrations, even if they are very long running.
      71            0 :     if candidate_generation >= latest_gen {
      72              :         // This shouldn't happen: when we loaded metadata, it should have selected the latest
      73              :         // generation already, and only populated [`S3TimelineBlobData::unused_index_keys`]
      74              :         // with older generations.
      75            0 :         tracing::warn!("Deletion candidate is >= latest generation, this is a bug!");
      76            0 :         return;
      77            0 :     } else if candidate_generation.next() == latest_gen {
      78              :         // Skip deleting the latest-1th generation's index.
      79            0 :         return;
      80            0 :     }
      81              : 
      82              :     // Validation: we will only delete indices after one week, so that during incidents we will have
      83              :     // easy access to recent indices.
      84            0 :     let age: Duration = match s3_client
      85            0 :         .head_object()
      86            0 :         .bucket(&bucket_config.bucket)
      87            0 :         .key(key)
      88            0 :         .send()
      89            0 :         .await
      90              :     {
      91            0 :         Ok(response) => match response.last_modified {
      92              :             None => {
      93            0 :                 tracing::warn!("Missing last_modified");
      94            0 :                 summary.remote_storage_errors += 1;
      95            0 :                 return;
      96              :             }
      97            0 :             Some(last_modified) => {
      98            0 :                 let last_modified =
      99            0 :                     UNIX_EPOCH + Duration::from_secs_f64(last_modified.as_secs_f64());
     100            0 :                 match last_modified.elapsed() {
     101            0 :                     Ok(e) => e,
     102              :                     Err(_) => {
     103            0 :                         tracing::warn!("Bad last_modified time: {last_modified:?}");
     104            0 :                         return;
     105              :                     }
     106              :                 }
     107              :             }
     108              :         },
     109            0 :         Err(e) => {
     110            0 :             tracing::warn!("Failed to HEAD {key}: {e}");
     111            0 :             summary.remote_storage_errors += 1;
     112            0 :             return;
     113              :         }
     114              :     };
     115            0 :     if &age < min_age {
     116            0 :         tracing::info!(
     117            0 :             "Skipping young object {} < {}",
     118            0 :             age.as_secs_f64(),
     119            0 :             min_age.as_secs_f64()
     120              :         );
     121            0 :         return;
     122            0 :     }
     123              : 
     124            0 :     if matches!(mode, GcMode::DryRun) {
     125            0 :         tracing::info!("Dry run: would delete this key");
     126            0 :         return;
     127            0 :     }
     128            0 : 
     129            0 :     // All validations passed: erase the object
     130            0 :     match s3_client
     131            0 :         .delete_object()
     132            0 :         .bucket(&bucket_config.bucket)
     133            0 :         .key(key)
     134            0 :         .send()
     135            0 :         .await
     136              :     {
     137              :         Ok(_) => {
     138            0 :             tracing::info!("Successfully deleted index");
     139            0 :             summary.indices_deleted += 1;
     140              :         }
     141            0 :         Err(e) => {
     142            0 :             tracing::warn!("Failed to delete index: {e}");
     143            0 :             summary.remote_storage_errors += 1;
     144              :         }
     145              :     }
     146            0 : }
     147              : 
     148              : /// Physical garbage collection: removing unused S3 objects.  This is distinct from the garbage collection
     149              : /// done inside the pageserver, which operates at a higher level (keys, layers).  This type of garbage collection
     150              : /// is about removing:
     151              : /// - Objects that were uploaded but never referenced in the remote index (e.g. because of a shutdown between
     152              : ///   uploading a layer and uploading an index)
     153              : /// - Index objects from historic generations
     154              : ///
     155              : /// This type of GC is not necessary for correctness: rather it serves to reduce wasted storage capacity, and
     156              : /// make sure that object listings don't get slowed down by large numbers of garbage objects.
     157            0 : pub async fn pageserver_physical_gc(
     158            0 :     bucket_config: BucketConfig,
     159            0 :     tenant_ids: Vec<TenantShardId>,
     160            0 :     min_age: Duration,
     161            0 :     mode: GcMode,
     162            0 : ) -> anyhow::Result<GcSummary> {
     163            0 :     let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
     164              : 
     165            0 :     let tenants = if tenant_ids.is_empty() {
     166            0 :         futures::future::Either::Left(stream_tenants(&s3_client, &target))
     167              :     } else {
     168            0 :         futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
     169              :     };
     170              : 
     171              :     // How many tenants to process in parallel.  We need to be mindful of pageservers
     172              :     // accessing the same per tenant prefixes, so use a lower setting than pageservers.
     173              :     const CONCURRENCY: usize = 32;
     174              : 
     175              :     // Generate a stream of TenantTimelineId
     176            0 :     let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
     177            0 :     let timelines = timelines.try_buffered(CONCURRENCY);
     178            0 :     let timelines = timelines.try_flatten();
     179            0 : 
     180            0 :     // Generate a stream of S3TimelineBlobData
     181            0 :     async fn gc_timeline(
     182            0 :         s3_client: &Client,
     183            0 :         bucket_config: &BucketConfig,
     184            0 :         min_age: &Duration,
     185            0 :         target: &RootTarget,
     186            0 :         mode: GcMode,
     187            0 :         ttid: TenantShardTimelineId,
     188            0 :     ) -> anyhow::Result<GcSummary> {
     189            0 :         let mut summary = GcSummary::default();
     190            0 :         let data = list_timeline_blobs(s3_client, ttid, target).await?;
     191            0 : 
     192            0 :         let (latest_gen, candidates) = match &data.blob_data {
     193            0 :             BlobDataParseResult::Parsed {
     194            0 :                 index_part: _index_part,
     195            0 :                 index_part_generation,
     196            0 :                 s3_layers: _s3_layers,
     197            0 :             } => (*index_part_generation, data.unused_index_keys),
     198            0 :             BlobDataParseResult::Relic => {
     199            0 :                 // Post-deletion tenant location: don't try and GC it.
     200            0 :                 return Ok(summary);
     201            0 :             }
     202            0 :             BlobDataParseResult::Incorrect(reasons) => {
     203            0 :                 // Our primary purpose isn't to report on bad data, but log this rather than skipping silently
     204            0 :                 tracing::warn!("Skipping timeline {ttid}, bad metadata: {reasons:?}");
     205            0 :                 return Ok(summary);
     206            0 :             }
     207            0 :         };
     208            0 : 
     209            0 :         for key in candidates {
     210            0 :             maybe_delete_index(
     211            0 :                 s3_client,
     212            0 :                 bucket_config,
     213            0 :                 min_age,
     214            0 :                 latest_gen,
     215            0 :                 &key,
     216            0 :                 mode,
     217            0 :                 &mut summary,
     218            0 :             )
     219            0 :             .instrument(info_span!("maybe_delete_index", %ttid, ?latest_gen, key))
     220            0 :             .await;
     221            0 :         }
     222            0 : 
     223            0 :         Ok(summary)
     224            0 :     }
     225            0 :     let timelines = timelines
     226            0 :         .map_ok(|ttid| gc_timeline(&s3_client, &bucket_config, &min_age, &target, mode, ttid));
     227            0 :     let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
     228            0 : 
     229            0 :     let mut summary = GcSummary::default();
     230              : 
     231            0 :     while let Some(i) = timelines.next().await {
     232            0 :         let tl_summary = i?;
     233              : 
     234            0 :         summary.indices_deleted += tl_summary.indices_deleted;
     235            0 :         summary.remote_storage_errors += tl_summary.remote_storage_errors;
     236              :     }
     237              : 
     238            0 :     Ok(summary)
     239            0 : }
        

Generated by: LCOV version 2.1-beta