LCOV - code coverage report
Current view: top level - storage_scrubber/src - scan_pageserver_metadata.rs (source / functions) Coverage Total Hit
Test: 4f58e98c51285c7fa348e0b410c88a10caf68ad2.info Lines: 0.0 % 238 0
Test Date: 2025-01-07 20:58:07 Functions: 0.0 % 18 0

            Line data    Source code
       1              : use std::collections::{HashMap, HashSet};
       2              : 
       3              : use crate::checks::{
       4              :     branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult,
       5              :     RemoteTimelineBlobData, TenantObjectListing, TimelineAnalysis,
       6              : };
       7              : use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
       8              : use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
       9              : use futures_util::{StreamExt, TryStreamExt};
      10              : use pageserver::tenant::remote_timeline_client::remote_layer_path;
      11              : use pageserver_api::controller_api::MetadataHealthUpdateRequest;
      12              : use pageserver_api::shard::TenantShardId;
      13              : use remote_storage::GenericRemoteStorage;
      14              : use serde::Serialize;
      15              : use tracing::{info_span, Instrument};
      16              : use utils::id::TenantId;
      17              : use utils::shard::ShardCount;
      18              : 
      19              : #[derive(Serialize, Default)]
      20              : pub struct MetadataSummary {
      21              :     tenant_count: usize,
      22              :     timeline_count: usize,
      23              :     timeline_shard_count: usize,
      24              :     /// Tenant-shard timeline (key) mapping to errors. The key has to be a string because it will be serialized to a JSON.
      25              :     /// The key is generated using `TenantShardTimelineId::to_string()`.
      26              :     with_errors: HashMap<String, Vec<String>>,
      27              :     /// Tenant-shard timeline (key) mapping to warnings. The key has to be a string because it will be serialized to a JSON.
      28              :     /// The key is generated using `TenantShardTimelineId::to_string()`.
      29              :     with_warnings: HashMap<String, Vec<String>>,
      30              :     with_orphans: HashSet<TenantShardTimelineId>,
      31              :     indices_by_version: HashMap<usize, usize>,
      32              : 
      33              :     #[serde(skip)]
      34              :     pub(crate) healthy_tenant_shards: HashSet<TenantShardId>,
      35              :     #[serde(skip)]
      36              :     pub(crate) unhealthy_tenant_shards: HashSet<TenantShardId>,
      37              : }
      38              : 
      39              : impl MetadataSummary {
      40            0 :     fn new() -> Self {
      41            0 :         Self::default()
      42            0 :     }
      43              : 
      44            0 :     fn update_data(&mut self, data: &RemoteTimelineBlobData) {
      45            0 :         self.timeline_shard_count += 1;
      46              :         if let BlobDataParseResult::Parsed {
      47            0 :             index_part,
      48              :             index_part_generation: _,
      49              :             s3_layers: _,
      50            0 :         } = &data.blob_data
      51            0 :         {
      52            0 :             *self
      53            0 :                 .indices_by_version
      54            0 :                 .entry(index_part.version())
      55            0 :                 .or_insert(0) += 1;
      56            0 :         }
      57            0 :     }
      58              : 
      59            0 :     fn update_analysis(
      60            0 :         &mut self,
      61            0 :         id: &TenantShardTimelineId,
      62            0 :         analysis: &TimelineAnalysis,
      63            0 :         verbose: bool,
      64            0 :     ) {
      65            0 :         if analysis.is_healthy() {
      66            0 :             self.healthy_tenant_shards.insert(id.tenant_shard_id);
      67            0 :         } else {
      68            0 :             self.healthy_tenant_shards.remove(&id.tenant_shard_id);
      69            0 :             self.unhealthy_tenant_shards.insert(id.tenant_shard_id);
      70            0 :         }
      71              : 
      72            0 :         if !analysis.errors.is_empty() {
      73            0 :             let entry = self.with_errors.entry(id.to_string()).or_default();
      74            0 :             if verbose {
      75            0 :                 entry.extend(analysis.errors.iter().cloned());
      76            0 :             }
      77            0 :         }
      78              : 
      79            0 :         if !analysis.warnings.is_empty() {
      80            0 :             let entry = self.with_warnings.entry(id.to_string()).or_default();
      81            0 :             if verbose {
      82            0 :                 entry.extend(analysis.warnings.iter().cloned());
      83            0 :             }
      84            0 :         }
      85            0 :     }
      86              : 
      87            0 :     fn notify_timeline_orphan(&mut self, ttid: &TenantShardTimelineId) {
      88            0 :         self.with_orphans.insert(*ttid);
      89            0 :     }
      90              : 
      91              :     /// Long-form output for printing at end of a scan
      92            0 :     pub fn summary_string(&self) -> String {
      93            0 :         let version_summary: String = itertools::join(
      94            0 :             self.indices_by_version
      95            0 :                 .iter()
      96            0 :                 .map(|(k, v)| format!("{k}: {v}")),
      97            0 :             ", ",
      98            0 :         );
      99            0 : 
     100            0 :         format!(
     101            0 :             "Tenants: {}
     102            0 : Timelines: {}
     103            0 : Timeline-shards: {}
     104            0 : With errors: {}
     105            0 : With warnings: {}
     106            0 : With orphan layers: {}
     107            0 : Index versions: {version_summary}
     108            0 : ",
     109            0 :             self.tenant_count,
     110            0 :             self.timeline_count,
     111            0 :             self.timeline_shard_count,
     112            0 :             self.with_errors.len(),
     113            0 :             self.with_warnings.len(),
     114            0 :             self.with_orphans.len(),
     115            0 :         )
     116            0 :     }
     117              : 
     118            0 :     pub fn is_fatal(&self) -> bool {
     119            0 :         !self.with_errors.is_empty()
     120            0 :     }
     121              : 
     122            0 :     pub fn is_empty(&self) -> bool {
     123            0 :         self.timeline_shard_count == 0
     124            0 :     }
     125              : 
     126            0 :     pub fn build_health_update_request(&self) -> MetadataHealthUpdateRequest {
     127            0 :         MetadataHealthUpdateRequest {
     128            0 :             healthy_tenant_shards: self.healthy_tenant_shards.clone(),
     129            0 :             unhealthy_tenant_shards: self.unhealthy_tenant_shards.clone(),
     130            0 :         }
     131            0 :     }
     132              : }
     133              : 
     134              : /// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.
     135            0 : pub async fn scan_pageserver_metadata(
     136            0 :     bucket_config: BucketConfig,
     137            0 :     tenant_ids: Vec<TenantShardId>,
     138            0 :     verbose: bool,
     139            0 : ) -> anyhow::Result<MetadataSummary> {
     140            0 :     let (remote_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;
     141              : 
     142            0 :     let tenants = if tenant_ids.is_empty() {
     143            0 :         futures::future::Either::Left(stream_tenants(&remote_client, &target))
     144              :     } else {
     145            0 :         futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
     146              :     };
     147              : 
     148              :     // How many tenants to process in parallel.  We need to be mindful of pageservers
     149              :     // accessing the same per tenant prefixes, so use a lower setting than pageservers.
     150              :     const CONCURRENCY: usize = 32;
     151              : 
     152              :     // Generate a stream of TenantTimelineId
     153            0 :     let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
     154            0 :     let timelines = timelines.try_buffered(CONCURRENCY);
     155            0 :     let timelines = timelines.try_flatten();
     156              : 
     157              :     // Generate a stream of S3TimelineBlobData
     158            0 :     async fn report_on_timeline(
     159            0 :         remote_client: &GenericRemoteStorage,
     160            0 :         target: &RootTarget,
     161            0 :         ttid: TenantShardTimelineId,
     162            0 :     ) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> {
     163            0 :         let data = list_timeline_blobs(remote_client, ttid, target).await?;
     164            0 :         Ok((ttid, data))
     165            0 :     }
     166            0 :     let timelines = timelines.map_ok(|ttid| report_on_timeline(&remote_client, &target, ttid));
     167            0 :     let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
     168            0 : 
     169            0 :     // We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different
     170            0 :     // shards in the same tenant might refer to one anothers' keys if a shard split has happened.
     171            0 : 
     172            0 :     let mut tenant_id = None;
     173            0 :     let mut tenant_objects = TenantObjectListing::default();
     174            0 :     let mut tenant_timeline_results = Vec::new();
     175              : 
     176            0 :     async fn analyze_tenant(
     177            0 :         remote_client: &GenericRemoteStorage,
     178            0 :         tenant_id: TenantId,
     179            0 :         summary: &mut MetadataSummary,
     180            0 :         mut tenant_objects: TenantObjectListing,
     181            0 :         timelines: Vec<(TenantShardTimelineId, RemoteTimelineBlobData)>,
     182            0 :         highest_shard_count: ShardCount,
     183            0 :         verbose: bool,
     184            0 :     ) {
     185            0 :         summary.tenant_count += 1;
     186            0 : 
     187            0 :         let mut timeline_ids = HashSet::new();
     188            0 :         let mut timeline_generations = HashMap::new();
     189            0 :         for (ttid, data) in timelines {
     190            0 :             async {
     191            0 :                 if ttid.tenant_shard_id.shard_count == highest_shard_count {
     192              :                     // Only analyze `TenantShardId`s with highest shard count.
     193              : 
     194              :                     // Stash the generation of each timeline, for later use identifying orphan layers
     195              :                     if let BlobDataParseResult::Parsed {
     196            0 :                         index_part,
     197            0 :                         index_part_generation,
     198            0 :                         s3_layers: _s3_layers,
     199            0 :                     } = &data.blob_data
     200              :                     {
     201            0 :                         if index_part.deleted_at.is_some() {
     202              :                             // skip deleted timeline.
     203            0 :                             tracing::info!(
     204            0 :                                 "Skip analysis of {} b/c timeline is already deleted",
     205              :                                 ttid
     206              :                             );
     207            0 :                             return;
     208            0 :                         }
     209            0 :                         timeline_generations.insert(ttid, *index_part_generation);
     210            0 :                     }
     211              : 
     212              :                     // Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
     213              :                     // reference counts for layers across the tenant.
     214            0 :                     let analysis = branch_cleanup_and_check_errors(
     215            0 :                         remote_client,
     216            0 :                         &ttid,
     217            0 :                         &mut tenant_objects,
     218            0 :                         None,
     219            0 :                         None,
     220            0 :                         Some(data),
     221            0 :                     )
     222            0 :                     .await;
     223            0 :                     summary.update_analysis(&ttid, &analysis, verbose);
     224            0 : 
     225            0 :                     timeline_ids.insert(ttid.timeline_id);
     226              :                 } else {
     227            0 :                     tracing::info!(
     228            0 :                         "Skip analysis of {} b/c a lower shard count than {}",
     229              :                         ttid,
     230              :                         highest_shard_count.0,
     231              :                     );
     232              :                 }
     233            0 :             }
     234              :             .instrument(
     235            0 :                 info_span!("analyze-timeline", shard = %ttid.tenant_shard_id.shard_slug(), timeline = %ttid.timeline_id),
     236              :             )
     237            0 :             .await
     238              :         }
     239              : 
     240            0 :         summary.timeline_count += timeline_ids.len();
     241              : 
     242              :         // Identifying orphan layers must be done on a tenant-wide basis, because individual
     243              :         // shards' layers may be referenced by other shards.
     244              :         //
     245              :         // Orphan layers are not a corruption, and not an indication of a problem.  They are just
     246              :         // consuming some space in remote storage, and may be cleaned up at leisure.
     247            0 :         for (shard_index, timeline_id, layer_file, generation) in tenant_objects.get_orphans() {
     248            0 :             let ttid = TenantShardTimelineId {
     249            0 :                 tenant_shard_id: TenantShardId {
     250            0 :                     tenant_id,
     251            0 :                     shard_count: shard_index.shard_count,
     252            0 :                     shard_number: shard_index.shard_number,
     253            0 :                 },
     254            0 :                 timeline_id,
     255            0 :             };
     256              : 
     257            0 :             if let Some(timeline_generation) = timeline_generations.get(&ttid) {
     258            0 :                 if &generation >= timeline_generation {
     259              :                     // Candidate orphan layer is in the current or future generation relative
     260              :                     // to the index we read for this timeline shard, so its absence from the index
     261              :                     // doesn't make it an orphan: more likely, it is a case where the layer was
     262              :                     // uploaded, but the index referencing the layer wasn't written yet.
     263            0 :                     continue;
     264            0 :                 }
     265            0 :             }
     266              : 
     267            0 :             let orphan_path = remote_layer_path(
     268            0 :                 &tenant_id,
     269            0 :                 &timeline_id,
     270            0 :                 shard_index,
     271            0 :                 &layer_file,
     272            0 :                 generation,
     273            0 :             );
     274            0 : 
     275            0 :             tracing::info!("Orphan layer detected: {orphan_path}");
     276              : 
     277            0 :             summary.notify_timeline_orphan(&ttid);
     278              :         }
     279            0 :     }
     280              : 
     281              :     // Iterate through  all the timeline results.  These are in key-order, so
     282              :     // all results for the same tenant will be adjacent.  We accumulate these,
     283              :     // and then call `analyze_tenant` to flush, when we see the next tenant ID.
     284            0 :     let mut summary = MetadataSummary::new();
     285            0 :     let mut highest_shard_count = ShardCount::MIN;
     286            0 :     while let Some(i) = timelines.next().await {
     287            0 :         let (ttid, data) = i?;
     288            0 :         summary.update_data(&data);
     289            0 : 
     290            0 :         match tenant_id {
     291            0 :             Some(prev_tenant_id) => {
     292            0 :                 if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
     293              :                     // New tenant: analyze this tenant's timelines, clear accumulated tenant_timeline_results
     294            0 :                     let tenant_objects = std::mem::take(&mut tenant_objects);
     295            0 :                     let timelines = std::mem::take(&mut tenant_timeline_results);
     296            0 :                     analyze_tenant(
     297            0 :                         &remote_client,
     298            0 :                         prev_tenant_id,
     299            0 :                         &mut summary,
     300            0 :                         tenant_objects,
     301            0 :                         timelines,
     302            0 :                         highest_shard_count,
     303            0 :                         verbose,
     304            0 :                     )
     305            0 :                     .instrument(info_span!("analyze-tenant", tenant = %prev_tenant_id))
     306            0 :                     .await;
     307            0 :                     tenant_id = Some(ttid.tenant_shard_id.tenant_id);
     308            0 :                     highest_shard_count = ttid.tenant_shard_id.shard_count;
     309            0 :                 } else {
     310            0 :                     highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
     311            0 :                 }
     312              :             }
     313            0 :             None => {
     314            0 :                 tenant_id = Some(ttid.tenant_shard_id.tenant_id);
     315            0 :                 highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
     316            0 :             }
     317              :         }
     318              : 
     319            0 :         match &data.blob_data {
     320              :             BlobDataParseResult::Parsed {
     321            0 :                 index_part: _index_part,
     322            0 :                 index_part_generation: _index_part_generation,
     323            0 :                 s3_layers,
     324            0 :             } => {
     325            0 :                 tenant_objects.push(ttid, s3_layers.clone());
     326            0 :             }
     327            0 :             BlobDataParseResult::Relic => (),
     328              :             BlobDataParseResult::Incorrect {
     329              :                 errors: _,
     330            0 :                 s3_layers,
     331            0 :             } => {
     332            0 :                 tenant_objects.push(ttid, s3_layers.clone());
     333            0 :             }
     334              :         }
     335            0 :         tenant_timeline_results.push((ttid, data));
     336              :     }
     337              : 
     338            0 :     if !tenant_timeline_results.is_empty() {
     339            0 :         let tenant_id = tenant_id.expect("Must be set if results are present");
     340            0 :         analyze_tenant(
     341            0 :             &remote_client,
     342            0 :             tenant_id,
     343            0 :             &mut summary,
     344            0 :             tenant_objects,
     345            0 :             tenant_timeline_results,
     346            0 :             highest_shard_count,
     347            0 :             verbose,
     348            0 :         )
     349            0 :         .instrument(info_span!("analyze-tenant", tenant = %tenant_id))
     350            0 :         .await;
     351            0 :     }
     352              : 
     353            0 :     Ok(summary)
     354            0 : }
        

Generated by: LCOV version 2.1-beta