LCOV - code coverage report
Current view: top level - storage_scrubber/src - scan_pageserver_metadata.rs (source / functions) Coverage Total Hit
Test: a2f0f8a80fbf1089336086fa360ce27fa555cb1a.info Lines: 0.0 % 223 0
Test Date: 2024-11-20 17:59:39 Functions: 0.0 % 18 0

            Line data    Source code
       1              : use std::collections::{HashMap, HashSet};
       2              : 
       3              : use crate::checks::{
       4              :     branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult,
       5              :     RemoteTimelineBlobData, TenantObjectListing, TimelineAnalysis,
       6              : };
       7              : use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
       8              : use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
       9              : use futures_util::{StreamExt, TryStreamExt};
      10              : use pageserver::tenant::remote_timeline_client::remote_layer_path;
      11              : use pageserver_api::controller_api::MetadataHealthUpdateRequest;
      12              : use pageserver_api::shard::TenantShardId;
      13              : use remote_storage::GenericRemoteStorage;
      14              : use serde::Serialize;
      15              : use tracing::{info_span, Instrument};
      16              : use utils::id::TenantId;
      17              : use utils::shard::ShardCount;
      18              : 
      19              : #[derive(Serialize, Default)]
      20              : pub struct MetadataSummary {
      21              :     tenant_count: usize,
      22              :     timeline_count: usize,
      23              :     timeline_shard_count: usize,
      24              :     with_errors: HashSet<TenantShardTimelineId>,
      25              :     with_warnings: HashSet<TenantShardTimelineId>,
      26              :     with_orphans: HashSet<TenantShardTimelineId>,
      27              :     indices_by_version: HashMap<usize, usize>,
      28              : 
      29              :     #[serde(skip)]
      30              :     pub(crate) healthy_tenant_shards: HashSet<TenantShardId>,
      31              :     #[serde(skip)]
      32              :     pub(crate) unhealthy_tenant_shards: HashSet<TenantShardId>,
      33              : }
      34              : 
      35              : impl MetadataSummary {
      36            0 :     fn new() -> Self {
      37            0 :         Self::default()
      38            0 :     }
      39              : 
      40            0 :     fn update_data(&mut self, data: &RemoteTimelineBlobData) {
      41            0 :         self.timeline_shard_count += 1;
      42              :         if let BlobDataParseResult::Parsed {
      43            0 :             index_part,
      44              :             index_part_generation: _,
      45              :             s3_layers: _,
      46            0 :         } = &data.blob_data
      47            0 :         {
      48            0 :             *self
      49            0 :                 .indices_by_version
      50            0 :                 .entry(index_part.version())
      51            0 :                 .or_insert(0) += 1;
      52            0 :         }
      53            0 :     }
      54              : 
      55            0 :     fn update_analysis(&mut self, id: &TenantShardTimelineId, analysis: &TimelineAnalysis) {
      56            0 :         if analysis.is_healthy() {
      57            0 :             self.healthy_tenant_shards.insert(id.tenant_shard_id);
      58            0 :         } else {
      59            0 :             self.healthy_tenant_shards.remove(&id.tenant_shard_id);
      60            0 :             self.unhealthy_tenant_shards.insert(id.tenant_shard_id);
      61            0 :         }
      62              : 
      63            0 :         if !analysis.errors.is_empty() {
      64            0 :             self.with_errors.insert(*id);
      65            0 :         }
      66              : 
      67            0 :         if !analysis.warnings.is_empty() {
      68            0 :             self.with_warnings.insert(*id);
      69            0 :         }
      70            0 :     }
      71              : 
      72            0 :     fn notify_timeline_orphan(&mut self, ttid: &TenantShardTimelineId) {
      73            0 :         self.with_orphans.insert(*ttid);
      74            0 :     }
      75              : 
      76              :     /// Long-form output for printing at end of a scan
      77            0 :     pub fn summary_string(&self) -> String {
      78            0 :         let version_summary: String = itertools::join(
      79            0 :             self.indices_by_version
      80            0 :                 .iter()
      81            0 :                 .map(|(k, v)| format!("{k}: {v}")),
      82            0 :             ", ",
      83            0 :         );
      84            0 : 
      85            0 :         format!(
      86            0 :             "Tenants: {}
      87            0 : Timelines: {}
      88            0 : Timeline-shards: {}
      89            0 : With errors: {}
      90            0 : With warnings: {}
      91            0 : With orphan layers: {}
      92            0 : Index versions: {version_summary}
      93            0 : ",
      94            0 :             self.tenant_count,
      95            0 :             self.timeline_count,
      96            0 :             self.timeline_shard_count,
      97            0 :             self.with_errors.len(),
      98            0 :             self.with_warnings.len(),
      99            0 :             self.with_orphans.len(),
     100            0 :         )
     101            0 :     }
     102              : 
     103            0 :     pub fn is_fatal(&self) -> bool {
     104            0 :         !self.with_errors.is_empty()
     105            0 :     }
     106              : 
     107            0 :     pub fn is_empty(&self) -> bool {
     108            0 :         self.timeline_shard_count == 0
     109            0 :     }
     110              : 
     111            0 :     pub fn build_health_update_request(&self) -> MetadataHealthUpdateRequest {
     112            0 :         MetadataHealthUpdateRequest {
     113            0 :             healthy_tenant_shards: self.healthy_tenant_shards.clone(),
     114            0 :             unhealthy_tenant_shards: self.unhealthy_tenant_shards.clone(),
     115            0 :         }
     116            0 :     }
     117              : }
     118              : 
     119              : /// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.
     120            0 : pub async fn scan_pageserver_metadata(
     121            0 :     bucket_config: BucketConfig,
     122            0 :     tenant_ids: Vec<TenantShardId>,
     123            0 : ) -> anyhow::Result<MetadataSummary> {
     124            0 :     let (remote_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;
     125              : 
     126            0 :     let tenants = if tenant_ids.is_empty() {
     127            0 :         futures::future::Either::Left(stream_tenants(&remote_client, &target))
     128              :     } else {
     129            0 :         futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
     130              :     };
     131              : 
     132              :     // How many tenants to process in parallel.  We need to be mindful of pageservers
     133              :     // accessing the same per-tenant prefixes, so use a lower setting than pageservers.
     134              :     const CONCURRENCY: usize = 32;
     135              : 
     136              :     // Generate a stream of TenantShardTimelineId
     137            0 :     let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
     138            0 :     let timelines = timelines.try_buffered(CONCURRENCY);
     139            0 :     let timelines = timelines.try_flatten();
     140              : 
     141              :     // Generate a stream of (TenantShardTimelineId, RemoteTimelineBlobData)
     142            0 :     async fn report_on_timeline(
     143            0 :         remote_client: &GenericRemoteStorage,
     144            0 :         target: &RootTarget,
     145            0 :         ttid: TenantShardTimelineId,
     146            0 :     ) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> {
     147            0 :         let data = list_timeline_blobs(remote_client, ttid, target).await?;
     148            0 :         Ok((ttid, data))
     149            0 :     }
     150            0 :     let timelines = timelines.map_ok(|ttid| report_on_timeline(&remote_client, &target, ttid));
     151            0 :     let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
     152            0 : 
     153            0 :     // We must gather all the TenantShardTimelineId->RemoteTimelineBlobData for each tenant, because different
     154            0 :     // shards in the same tenant might refer to one another's keys if a shard split has happened.
     155            0 : 
     156            0 :     let mut tenant_id = None;
     157            0 :     let mut tenant_objects = TenantObjectListing::default();
     158            0 :     let mut tenant_timeline_results = Vec::new();
     159              : 
     160            0 :     async fn analyze_tenant(
     161            0 :         remote_client: &GenericRemoteStorage,
     162            0 :         tenant_id: TenantId,
     163            0 :         summary: &mut MetadataSummary,
     164            0 :         mut tenant_objects: TenantObjectListing,
     165            0 :         timelines: Vec<(TenantShardTimelineId, RemoteTimelineBlobData)>,
     166            0 :         highest_shard_count: ShardCount,
     167            0 :     ) {
     168            0 :         summary.tenant_count += 1;
     169            0 : 
     170            0 :         let mut timeline_ids = HashSet::new();
     171            0 :         let mut timeline_generations = HashMap::new();
     172            0 :         for (ttid, data) in timelines {
     173            0 :             async {
     174            0 :                 if ttid.tenant_shard_id.shard_count == highest_shard_count {
     175              :                     // Only analyze `TenantShardId`s with the highest shard count.
     176              : 
     177              :                     // Stash the generation of each timeline, for later use identifying orphan layers
     178              :                     if let BlobDataParseResult::Parsed {
     179            0 :                         index_part,
     180            0 :                         index_part_generation,
     181            0 :                         s3_layers: _s3_layers,
     182            0 :                     } = &data.blob_data
     183              :                     {
     184            0 :                         if index_part.deleted_at.is_some() {
     185              :                             // skip deleted timeline.
     186            0 :                             tracing::info!(
     187            0 :                                 "Skip analysis of {} b/c timeline is already deleted",
     188              :                                 ttid
     189              :                             );
     190            0 :                             return;
     191            0 :                         }
     192            0 :                         timeline_generations.insert(ttid, *index_part_generation);
     193            0 :                     }
     194              : 
     195              :                     // Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
     196              :                     // reference counts for layers across the tenant.
     197            0 :                     let analysis = branch_cleanup_and_check_errors(
     198            0 :                         remote_client,
     199            0 :                         &ttid,
     200            0 :                         &mut tenant_objects,
     201            0 :                         None,
     202            0 :                         None,
     203            0 :                         Some(data),
     204            0 :                     )
     205            0 :                     .await;
     206            0 :                     summary.update_analysis(&ttid, &analysis);
     207            0 : 
     208            0 :                     timeline_ids.insert(ttid.timeline_id);
     209              :                 } else {
     210            0 :                     tracing::info!(
     211            0 :                         "Skip analysis of {} b/c a lower shard count than {}",
     212              :                         ttid,
     213              :                         highest_shard_count.0,
     214              :                     );
     215              :                 }
     216            0 :             }
     217              :             .instrument(
     218            0 :                 info_span!("analyze-timeline", shard = %ttid.tenant_shard_id.shard_slug(), timeline = %ttid.timeline_id),
     219              :             )
     220            0 :             .await
     221              :         }
     222              : 
     223            0 :         summary.timeline_count += timeline_ids.len();
     224              : 
     225              :         // Identifying orphan layers must be done on a tenant-wide basis, because individual
     226              :         // shards' layers may be referenced by other shards.
     227              :         //
     228              :         // Orphan layers are not a corruption, and not an indication of a problem.  They are just
     229              :         // consuming some space in remote storage, and may be cleaned up at leisure.
     230            0 :         for (shard_index, timeline_id, layer_file, generation) in tenant_objects.get_orphans() {
     231            0 :             let ttid = TenantShardTimelineId {
     232            0 :                 tenant_shard_id: TenantShardId {
     233            0 :                     tenant_id,
     234            0 :                     shard_count: shard_index.shard_count,
     235            0 :                     shard_number: shard_index.shard_number,
     236            0 :                 },
     237            0 :                 timeline_id,
     238            0 :             };
     239              : 
     240            0 :             if let Some(timeline_generation) = timeline_generations.get(&ttid) {
     241            0 :                 if &generation >= timeline_generation {
     242              :                     // Candidate orphan layer is in the current or future generation relative
     243              :                     // to the index we read for this timeline shard, so its absence from the index
     244              :                     // doesn't make it an orphan: more likely, it is a case where the layer was
     245              :                     // uploaded, but the index referencing the layer wasn't written yet.
     246            0 :                     continue;
     247            0 :                 }
     248            0 :             }
     249              : 
     250            0 :             let orphan_path = remote_layer_path(
     251            0 :                 &tenant_id,
     252            0 :                 &timeline_id,
     253            0 :                 shard_index,
     254            0 :                 &layer_file,
     255            0 :                 generation,
     256            0 :             );
     257            0 : 
     258            0 :             tracing::info!("Orphan layer detected: {orphan_path}");
     259              : 
     260            0 :             summary.notify_timeline_orphan(&ttid);
     261              :         }
     262            0 :     }
     263              : 
     264              :     // Iterate through all the timeline results.  These are in key-order, so
     265              :     // all results for the same tenant will be adjacent.  We accumulate these,
     266              :     // and then call `analyze_tenant` to flush, when we see the next tenant ID.
     267            0 :     let mut summary = MetadataSummary::new();
     268            0 :     let mut highest_shard_count = ShardCount::MIN;
     269            0 :     while let Some(i) = timelines.next().await {
     270            0 :         let (ttid, data) = i?;
     271            0 :         summary.update_data(&data);
     272            0 : 
     273            0 :         match tenant_id {
     274            0 :             None => {
     275            0 :                 tenant_id = Some(ttid.tenant_shard_id.tenant_id);
     276            0 :                 highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
     277            0 :             }
     278            0 :             Some(prev_tenant_id) => {
     279            0 :                 if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
     280              :                     // New tenant: analyze the previous tenant's timelines, clear accumulated tenant_objects and tenant_timeline_results
     281            0 :                     let tenant_objects = std::mem::take(&mut tenant_objects);
     282            0 :                     let timelines = std::mem::take(&mut tenant_timeline_results);
     283            0 :                     analyze_tenant(
     284            0 :                         &remote_client,
     285            0 :                         prev_tenant_id,
     286            0 :                         &mut summary,
     287            0 :                         tenant_objects,
     288            0 :                         timelines,
     289            0 :                         highest_shard_count,
     290            0 :                     )
     291            0 :                     .instrument(info_span!("analyze-tenant", tenant = %prev_tenant_id))
     292            0 :                     .await;
     293            0 :                     tenant_id = Some(ttid.tenant_shard_id.tenant_id);
     294            0 :                     highest_shard_count = ttid.tenant_shard_id.shard_count;
     295            0 :                 } else {
     296            0 :                     highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
     297            0 :                 }
     298              :             }
     299              :         }
     300              : 
     301            0 :         match &data.blob_data {
     302              :             BlobDataParseResult::Parsed {
     303            0 :                 index_part: _index_part,
     304            0 :                 index_part_generation: _index_part_generation,
     305            0 :                 s3_layers,
     306            0 :             } => {
     307            0 :                 tenant_objects.push(ttid, s3_layers.clone());
     308            0 :             }
     309            0 :             BlobDataParseResult::Relic => (),
     310              :             BlobDataParseResult::Incorrect {
     311              :                 errors: _,
     312            0 :                 s3_layers,
     313            0 :             } => {
     314            0 :                 tenant_objects.push(ttid, s3_layers.clone());
     315            0 :             }
     316              :         }
     317            0 :         tenant_timeline_results.push((ttid, data));
     318              :     }
     319              : 
     320            0 :     if !tenant_timeline_results.is_empty() {
     321            0 :         let tenant_id = tenant_id.expect("Must be set if results are present");
     322            0 :         analyze_tenant(
     323            0 :             &remote_client,
     324            0 :             tenant_id,
     325            0 :             &mut summary,
     326            0 :             tenant_objects,
     327            0 :             tenant_timeline_results,
     328            0 :             highest_shard_count,
     329            0 :         )
     330            0 :         .instrument(info_span!("analyze-tenant", tenant = %tenant_id))
     331            0 :         .await;
     332            0 :     }
     333              : 
     334            0 :     Ok(summary)
     335            0 : }
        

Generated by: LCOV version 2.1-beta