LCOV - code coverage report
Current view: top level - storage_scrubber/src - scan_pageserver_metadata.rs (source / functions)
Test:         1b0a6a0c05cee5a7de360813c8034804e105ce1c.info
Test Date:    2025-03-12 00:01:28

                  Coverage    Total    Hit
    Lines:           0.0 %      238      0
    Functions:       0.0 %       18      0

            Line data    Source code
       1              : use std::collections::{HashMap, HashSet};
       2              : 
       3              : use futures_util::{StreamExt, TryStreamExt};
       4              : use pageserver::tenant::remote_timeline_client::remote_layer_path;
       5              : use pageserver_api::controller_api::MetadataHealthUpdateRequest;
       6              : use pageserver_api::shard::TenantShardId;
       7              : use remote_storage::GenericRemoteStorage;
       8              : use serde::Serialize;
       9              : use tracing::{Instrument, info_span};
      10              : use utils::id::TenantId;
      11              : use utils::shard::ShardCount;
      12              : 
      13              : use crate::checks::{
      14              :     BlobDataParseResult, RemoteTimelineBlobData, TenantObjectListing, TimelineAnalysis,
      15              :     branch_cleanup_and_check_errors, list_timeline_blobs,
      16              : };
      17              : use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
      18              : use crate::{BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, init_remote};
      19              : 
      20              : #[derive(Serialize, Default)]
      21              : pub struct MetadataSummary {
      22              :     tenant_count: usize,
      23              :     timeline_count: usize,
      24              :     timeline_shard_count: usize,
       25              :     /// Tenant-shard timeline (key) mapping to errors. The key has to be a string because it will be serialized to JSON.
      26              :     /// The key is generated using `TenantShardTimelineId::to_string()`.
      27              :     with_errors: HashMap<String, Vec<String>>,
       28              :     /// Tenant-shard timeline (key) mapping to warnings. The key has to be a string because it will be serialized to JSON.
      29              :     /// The key is generated using `TenantShardTimelineId::to_string()`.
      30              :     with_warnings: HashMap<String, Vec<String>>,
      31              :     with_orphans: HashSet<TenantShardTimelineId>,
      32              :     indices_by_version: HashMap<usize, usize>,
      33              : 
      34              :     #[serde(skip)]
      35              :     pub(crate) healthy_tenant_shards: HashSet<TenantShardId>,
      36              :     #[serde(skip)]
      37              :     pub(crate) unhealthy_tenant_shards: HashSet<TenantShardId>,
      38              : }
      39              : 
      40              : impl MetadataSummary {
      41            0 :     fn new() -> Self {
      42            0 :         Self::default()
      43            0 :     }
      44              : 
      45            0 :     fn update_data(&mut self, data: &RemoteTimelineBlobData) {
      46            0 :         self.timeline_shard_count += 1;
      47              :         if let BlobDataParseResult::Parsed {
      48            0 :             index_part,
      49              :             index_part_generation: _,
      50              :             s3_layers: _,
      51              :             index_part_last_modified_time: _,
      52              :             index_part_snapshot_time: _,
      53            0 :         } = &data.blob_data
      54            0 :         {
      55            0 :             *self
      56            0 :                 .indices_by_version
      57            0 :                 .entry(index_part.version())
      58            0 :                 .or_insert(0) += 1;
      59            0 :         }
      60            0 :     }
      61              : 
      62            0 :     fn update_analysis(
      63            0 :         &mut self,
      64            0 :         id: &TenantShardTimelineId,
      65            0 :         analysis: &TimelineAnalysis,
      66            0 :         verbose: bool,
      67            0 :     ) {
      68            0 :         if analysis.is_healthy() {
      69            0 :             self.healthy_tenant_shards.insert(id.tenant_shard_id);
      70            0 :         } else {
      71            0 :             self.healthy_tenant_shards.remove(&id.tenant_shard_id);
      72            0 :             self.unhealthy_tenant_shards.insert(id.tenant_shard_id);
      73            0 :         }
      74              : 
      75            0 :         if !analysis.errors.is_empty() {
      76            0 :             let entry = self.with_errors.entry(id.to_string()).or_default();
      77            0 :             if verbose {
      78            0 :                 entry.extend(analysis.errors.iter().cloned());
      79            0 :             }
      80            0 :         }
      81              : 
      82            0 :         if !analysis.warnings.is_empty() {
      83            0 :             let entry = self.with_warnings.entry(id.to_string()).or_default();
      84            0 :             if verbose {
      85            0 :                 entry.extend(analysis.warnings.iter().cloned());
      86            0 :             }
      87            0 :         }
      88            0 :     }
      89              : 
      90            0 :     fn notify_timeline_orphan(&mut self, ttid: &TenantShardTimelineId) {
      91            0 :         self.with_orphans.insert(*ttid);
      92            0 :     }
      93              : 
       94              :     /// Long-form output for printing at the end of a scan
      95            0 :     pub fn summary_string(&self) -> String {
      96            0 :         let version_summary: String = itertools::join(
      97            0 :             self.indices_by_version
      98            0 :                 .iter()
      99            0 :                 .map(|(k, v)| format!("{k}: {v}")),
     100            0 :             ", ",
     101            0 :         );
     102            0 : 
     103            0 :         format!(
     104            0 :             "Tenants: {}
     105            0 : Timelines: {}
     106            0 : Timeline-shards: {}
     107            0 : With errors: {}
     108            0 : With warnings: {}
     109            0 : With orphan layers: {}
     110            0 : Index versions: {version_summary}
     111            0 : ",
     112            0 :             self.tenant_count,
     113            0 :             self.timeline_count,
     114            0 :             self.timeline_shard_count,
     115            0 :             self.with_errors.len(),
     116            0 :             self.with_warnings.len(),
     117            0 :             self.with_orphans.len(),
     118            0 :         )
     119            0 :     }
     120              : 
     121            0 :     pub fn is_fatal(&self) -> bool {
     122            0 :         !self.with_errors.is_empty()
     123            0 :     }
     124              : 
     125            0 :     pub fn is_empty(&self) -> bool {
     126            0 :         self.timeline_shard_count == 0
     127            0 :     }
     128              : 
     129            0 :     pub fn build_health_update_request(&self) -> MetadataHealthUpdateRequest {
     130            0 :         MetadataHealthUpdateRequest {
     131            0 :             healthy_tenant_shards: self.healthy_tenant_shards.clone(),
     132            0 :             unhealthy_tenant_shards: self.unhealthy_tenant_shards.clone(),
     133            0 :         }
     134            0 :     }
     135              : }
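// --- Editorial illustration: not part of the measured file above, so it carries no line/coverage data. ---
// A hypothetical consumer of `MetadataSummary` might use the API defined above roughly like this,
// assuming a `summary: MetadataSummary` produced by a completed scan:
//
//     println!("{}", summary.summary_string());
//     if !summary.is_empty() {
//         // `MetadataHealthUpdateRequest` is defined in `pageserver_api::controller_api`;
//         // delivering it (e.g. to a controller endpoint) is outside this file's scope.
//         let _health_update = summary.build_health_update_request();
//     }
//     if summary.is_fatal() {
//         // At least one timeline-shard reported errors.
//     }
// ---------------------------------------------------------------------------------------------------------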
     136              : 
     137              : /// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.
     138            0 : pub async fn scan_pageserver_metadata(
     139            0 :     bucket_config: BucketConfig,
     140            0 :     tenant_ids: Vec<TenantShardId>,
     141            0 :     verbose: bool,
     142            0 : ) -> anyhow::Result<MetadataSummary> {
     143            0 :     let (remote_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;
     144              : 
     145            0 :     let tenants = if tenant_ids.is_empty() {
     146            0 :         futures::future::Either::Left(stream_tenants(&remote_client, &target))
     147              :     } else {
     148            0 :         futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
     149              :     };
     150              : 
     151              :     // How many tenants to process in parallel.  We need to be mindful of pageservers
      152              :     // accessing the same per-tenant prefixes, so use a lower setting than the pageservers do.
     153              :     const CONCURRENCY: usize = 32;
     154              : 
      155              :     // Generate a stream of TenantShardTimelineId
     156            0 :     let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
     157            0 :     let timelines = timelines.try_buffered(CONCURRENCY);
     158            0 :     let timelines = timelines.try_flatten();
     159              : 
      160              :     // Generate a stream of RemoteTimelineBlobData
     161            0 :     async fn report_on_timeline(
     162            0 :         remote_client: &GenericRemoteStorage,
     163            0 :         target: &RootTarget,
     164            0 :         ttid: TenantShardTimelineId,
     165            0 :     ) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> {
     166            0 :         let data = list_timeline_blobs(remote_client, ttid, target).await?;
     167            0 :         Ok((ttid, data))
     168            0 :     }
     169            0 :     let timelines = timelines.map_ok(|ttid| report_on_timeline(&remote_client, &target, ttid));
     170            0 :     let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
     171            0 : 
      172            0 :     // We must gather all the TenantShardTimelineId -> RemoteTimelineBlobData pairs for each tenant, because
      173            0 :     // different shards in the same tenant might refer to one another's keys if a shard split has happened.
     174            0 : 
     175            0 :     let mut tenant_id = None;
     176            0 :     let mut tenant_objects = TenantObjectListing::default();
     177            0 :     let mut tenant_timeline_results = Vec::new();
     178              : 
     179            0 :     async fn analyze_tenant(
     180            0 :         remote_client: &GenericRemoteStorage,
     181            0 :         tenant_id: TenantId,
     182            0 :         summary: &mut MetadataSummary,
     183            0 :         mut tenant_objects: TenantObjectListing,
     184            0 :         timelines: Vec<(TenantShardTimelineId, RemoteTimelineBlobData)>,
     185            0 :         highest_shard_count: ShardCount,
     186            0 :         verbose: bool,
     187            0 :     ) {
     188            0 :         summary.tenant_count += 1;
     189            0 : 
     190            0 :         let mut timeline_ids = HashSet::new();
     191            0 :         let mut timeline_generations = HashMap::new();
     192            0 :         for (ttid, data) in timelines {
     193            0 :             async {
     194            0 :                 if ttid.tenant_shard_id.shard_count == highest_shard_count {
      195              :                     // Only analyze `TenantShardId`s with the highest shard count.
     196              : 
     197              :                     // Stash the generation of each timeline, for later use identifying orphan layers
     198              :                     if let BlobDataParseResult::Parsed {
     199            0 :                         index_part,
     200            0 :                         index_part_generation,
     201              :                         s3_layers: _,
     202              :                         index_part_last_modified_time: _,
     203              :                         index_part_snapshot_time: _,
     204            0 :                     } = &data.blob_data
     205              :                     {
     206            0 :                         if index_part.deleted_at.is_some() {
     207              :                             // skip deleted timeline.
     208            0 :                             tracing::info!(
     209            0 :                                 "Skip analysis of {} b/c timeline is already deleted",
     210              :                                 ttid
     211              :                             );
     212            0 :                             return;
     213            0 :                         }
     214            0 :                         timeline_generations.insert(ttid, *index_part_generation);
     215            0 :                     }
     216              : 
     217              :                     // Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
     218              :                     // reference counts for layers across the tenant.
     219            0 :                     let analysis = branch_cleanup_and_check_errors(
     220            0 :                         remote_client,
     221            0 :                         &ttid,
     222            0 :                         &mut tenant_objects,
     223            0 :                         None,
     224            0 :                         None,
     225            0 :                         Some(data),
     226            0 :                     )
     227            0 :                     .await;
     228            0 :                     summary.update_analysis(&ttid, &analysis, verbose);
     229            0 : 
     230            0 :                     timeline_ids.insert(ttid.timeline_id);
     231              :                 } else {
     232            0 :                     tracing::info!(
      233            0 :                         "Skip analysis of {} b/c it has a lower shard count than {}",
     234              :                         ttid,
     235              :                         highest_shard_count.0,
     236              :                     );
     237              :                 }
     238            0 :             }
     239              :             .instrument(
     240            0 :                 info_span!("analyze-timeline", shard = %ttid.tenant_shard_id.shard_slug(), timeline = %ttid.timeline_id),
     241              :             )
     242            0 :             .await
     243              :         }
     244              : 
     245            0 :         summary.timeline_count += timeline_ids.len();
     246              : 
     247              :         // Identifying orphan layers must be done on a tenant-wide basis, because individual
     248              :         // shards' layers may be referenced by other shards.
     249              :         //
      250              :         // Orphan layers are not corruption, and not an indication of a problem.  They just
      251              :         // consume some space in remote storage, and may be cleaned up at leisure.
     252            0 :         for (shard_index, timeline_id, layer_file, generation) in tenant_objects.get_orphans() {
     253            0 :             let ttid = TenantShardTimelineId {
     254            0 :                 tenant_shard_id: TenantShardId {
     255            0 :                     tenant_id,
     256            0 :                     shard_count: shard_index.shard_count,
     257            0 :                     shard_number: shard_index.shard_number,
     258            0 :                 },
     259            0 :                 timeline_id,
     260            0 :             };
     261              : 
     262            0 :             if let Some(timeline_generation) = timeline_generations.get(&ttid) {
     263            0 :                 if &generation >= timeline_generation {
     264              :                     // Candidate orphan layer is in the current or future generation relative
     265              :                     // to the index we read for this timeline shard, so its absence from the index
     266              :                     // doesn't make it an orphan: more likely, it is a case where the layer was
     267              :                     // uploaded, but the index referencing the layer wasn't written yet.
     268            0 :                     continue;
     269            0 :                 }
     270            0 :             }
     271              : 
     272            0 :             let orphan_path = remote_layer_path(
     273            0 :                 &tenant_id,
     274            0 :                 &timeline_id,
     275            0 :                 shard_index,
     276            0 :                 &layer_file,
     277            0 :                 generation,
     278            0 :             );
     279            0 : 
     280            0 :             tracing::info!("Orphan layer detected: {orphan_path}");
     281              : 
     282            0 :             summary.notify_timeline_orphan(&ttid);
     283              :         }
     284            0 :     }
     285              : 
      286              :     // Iterate through all the timeline results.  These are in key order, so
      287              :     // all results for the same tenant will be adjacent.  We accumulate these,
      288              :     // and then call `analyze_tenant` to flush them when we see the next tenant ID.
     289            0 :     let mut summary = MetadataSummary::new();
     290            0 :     let mut highest_shard_count = ShardCount::MIN;
     291            0 :     while let Some(i) = timelines.next().await {
     292            0 :         let (ttid, data) = i?;
     293            0 :         summary.update_data(&data);
     294            0 : 
     295            0 :         match tenant_id {
     296            0 :             Some(prev_tenant_id) => {
     297            0 :                 if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
      298              :                     // New tenant ID seen: analyze the previous tenant's accumulated timelines, then reset the accumulators
     299            0 :                     let tenant_objects = std::mem::take(&mut tenant_objects);
     300            0 :                     let timelines = std::mem::take(&mut tenant_timeline_results);
     301            0 :                     analyze_tenant(
     302            0 :                         &remote_client,
     303            0 :                         prev_tenant_id,
     304            0 :                         &mut summary,
     305            0 :                         tenant_objects,
     306            0 :                         timelines,
     307            0 :                         highest_shard_count,
     308            0 :                         verbose,
     309            0 :                     )
     310            0 :                     .instrument(info_span!("analyze-tenant", tenant = %prev_tenant_id))
     311            0 :                     .await;
     312            0 :                     tenant_id = Some(ttid.tenant_shard_id.tenant_id);
     313            0 :                     highest_shard_count = ttid.tenant_shard_id.shard_count;
     314            0 :                 } else {
     315            0 :                     highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
     316            0 :                 }
     317              :             }
     318            0 :             None => {
     319            0 :                 tenant_id = Some(ttid.tenant_shard_id.tenant_id);
     320            0 :                 highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
     321            0 :             }
     322              :         }
     323              : 
     324            0 :         match &data.blob_data {
     325              :             BlobDataParseResult::Parsed {
     326              :                 index_part: _,
     327            0 :                 index_part_generation: _index_part_generation,
     328            0 :                 s3_layers,
     329            0 :                 index_part_last_modified_time: _,
     330            0 :                 index_part_snapshot_time: _,
     331            0 :             } => {
     332            0 :                 tenant_objects.push(ttid, s3_layers.clone());
     333            0 :             }
     334            0 :             BlobDataParseResult::Relic => (),
     335              :             BlobDataParseResult::Incorrect {
     336              :                 errors: _,
     337            0 :                 s3_layers,
     338            0 :             } => {
     339            0 :                 tenant_objects.push(ttid, s3_layers.clone());
     340            0 :             }
     341              :         }
     342            0 :         tenant_timeline_results.push((ttid, data));
     343              :     }
     344              : 
     345            0 :     if !tenant_timeline_results.is_empty() {
     346            0 :         let tenant_id = tenant_id.expect("Must be set if results are present");
     347            0 :         analyze_tenant(
     348            0 :             &remote_client,
     349            0 :             tenant_id,
     350            0 :             &mut summary,
     351            0 :             tenant_objects,
     352            0 :             tenant_timeline_results,
     353            0 :             highest_shard_count,
     354            0 :             verbose,
     355            0 :         )
     356            0 :         .instrument(info_span!("analyze-tenant", tenant = %tenant_id))
     357            0 :         .await;
     358            0 :     }
     359              : 
     360            0 :     Ok(summary)
     361            0 : }
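
Usage sketch (editorial, not part of the measured source above): a caller could drive this scanner
roughly as follows, assuming an async context returning `anyhow::Result` and a `BucketConfig` already
obtained from the scrubber's configuration; the `bucket_config` binding, the empty tenant-shard list,
and the error message are illustrative.

    // Hypothetical caller: scan every tenant in the bucket (empty shard list), non-verbose.
    let summary = scan_pageserver_metadata(bucket_config, Vec::new(), false).await?;
    println!("{}", summary.summary_string());
    if summary.is_fatal() {
        anyhow::bail!("pageserver metadata scan reported errors");
    }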
        

Generated by: LCOV version 2.1-beta