LCOV - code coverage report
Current view: top level - storage_scrubber/src - scan_pageserver_metadata.rs (source / functions) Coverage Total Hit
Test: 07bee600374ccd486c69370d0972d9035964fe68.info Lines: 0.0 % 238 0
Test Date: 2025-02-20 13:11:02 Functions: 0.0 % 18 0

            Line data    Source code
       1              : use std::collections::{HashMap, HashSet};
       2              : 
       3              : use crate::checks::{
       4              :     branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult,
       5              :     RemoteTimelineBlobData, TenantObjectListing, TimelineAnalysis,
       6              : };
       7              : use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
       8              : use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
       9              : use futures_util::{StreamExt, TryStreamExt};
      10              : use pageserver::tenant::remote_timeline_client::remote_layer_path;
      11              : use pageserver_api::controller_api::MetadataHealthUpdateRequest;
      12              : use pageserver_api::shard::TenantShardId;
      13              : use remote_storage::GenericRemoteStorage;
      14              : use serde::Serialize;
      15              : use tracing::{info_span, Instrument};
      16              : use utils::id::TenantId;
      17              : use utils::shard::ShardCount;
      18              : 
      19              : #[derive(Serialize, Default)]
      20              : pub struct MetadataSummary {
      21              :     tenant_count: usize,
      22              :     timeline_count: usize,
      23              :     timeline_shard_count: usize,
      24              :     /// Tenant-shard timeline (key) mapping to errors. The key has to be a string because it will be serialized to a JSON.
      25              :     /// The key is generated using `TenantShardTimelineId::to_string()`.
      26              :     with_errors: HashMap<String, Vec<String>>,
      27              :     /// Tenant-shard timeline (key) mapping to warnings. The key has to be a string because it will be serialized to a JSON.
      28              :     /// The key is generated using `TenantShardTimelineId::to_string()`.
      29              :     with_warnings: HashMap<String, Vec<String>>,
      30              :     with_orphans: HashSet<TenantShardTimelineId>,
      31              :     indices_by_version: HashMap<usize, usize>,
      32              : 
      33              :     #[serde(skip)]
      34              :     pub(crate) healthy_tenant_shards: HashSet<TenantShardId>,
      35              :     #[serde(skip)]
      36              :     pub(crate) unhealthy_tenant_shards: HashSet<TenantShardId>,
      37              : }
      38              : 
      39              : impl MetadataSummary {
      40            0 :     fn new() -> Self {
      41            0 :         Self::default()
      42            0 :     }
      43              : 
      44            0 :     fn update_data(&mut self, data: &RemoteTimelineBlobData) {
      45            0 :         self.timeline_shard_count += 1;
      46              :         if let BlobDataParseResult::Parsed {
      47            0 :             index_part,
      48              :             index_part_generation: _,
      49              :             s3_layers: _,
      50              :             index_part_last_modified_time: _,
      51              :             index_part_snapshot_time: _,
      52            0 :         } = &data.blob_data
      53            0 :         {
      54            0 :             *self
      55            0 :                 .indices_by_version
      56            0 :                 .entry(index_part.version())
      57            0 :                 .or_insert(0) += 1;
      58            0 :         }
      59            0 :     }
      60              : 
      61            0 :     fn update_analysis(
      62            0 :         &mut self,
      63            0 :         id: &TenantShardTimelineId,
      64            0 :         analysis: &TimelineAnalysis,
      65            0 :         verbose: bool,
      66            0 :     ) {
      67            0 :         if analysis.is_healthy() {
      68            0 :             self.healthy_tenant_shards.insert(id.tenant_shard_id);
      69            0 :         } else {
      70            0 :             self.healthy_tenant_shards.remove(&id.tenant_shard_id);
      71            0 :             self.unhealthy_tenant_shards.insert(id.tenant_shard_id);
      72            0 :         }
      73              : 
      74            0 :         if !analysis.errors.is_empty() {
      75            0 :             let entry = self.with_errors.entry(id.to_string()).or_default();
      76            0 :             if verbose {
      77            0 :                 entry.extend(analysis.errors.iter().cloned());
      78            0 :             }
      79            0 :         }
      80              : 
      81            0 :         if !analysis.warnings.is_empty() {
      82            0 :             let entry = self.with_warnings.entry(id.to_string()).or_default();
      83            0 :             if verbose {
      84            0 :                 entry.extend(analysis.warnings.iter().cloned());
      85            0 :             }
      86            0 :         }
      87            0 :     }
      88              : 
      89            0 :     fn notify_timeline_orphan(&mut self, ttid: &TenantShardTimelineId) {
      90            0 :         self.with_orphans.insert(*ttid);
      91            0 :     }
      92              : 
      93              :     /// Long-form output for printing at end of a scan
      94            0 :     pub fn summary_string(&self) -> String {
      95            0 :         let version_summary: String = itertools::join(
      96            0 :             self.indices_by_version
      97            0 :                 .iter()
      98            0 :                 .map(|(k, v)| format!("{k}: {v}")),
      99            0 :             ", ",
     100            0 :         );
     101            0 : 
     102            0 :         format!(
     103            0 :             "Tenants: {}
     104            0 : Timelines: {}
     105            0 : Timeline-shards: {}
     106            0 : With errors: {}
     107            0 : With warnings: {}
     108            0 : With orphan layers: {}
     109            0 : Index versions: {version_summary}
     110            0 : ",
     111            0 :             self.tenant_count,
     112            0 :             self.timeline_count,
     113            0 :             self.timeline_shard_count,
     114            0 :             self.with_errors.len(),
     115            0 :             self.with_warnings.len(),
     116            0 :             self.with_orphans.len(),
     117            0 :         )
     118            0 :     }
     119              : 
     120            0 :     pub fn is_fatal(&self) -> bool {
     121            0 :         !self.with_errors.is_empty()
     122            0 :     }
     123              : 
     124            0 :     pub fn is_empty(&self) -> bool {
     125            0 :         self.timeline_shard_count == 0
     126            0 :     }
     127              : 
     128            0 :     pub fn build_health_update_request(&self) -> MetadataHealthUpdateRequest {
     129            0 :         MetadataHealthUpdateRequest {
     130            0 :             healthy_tenant_shards: self.healthy_tenant_shards.clone(),
     131            0 :             unhealthy_tenant_shards: self.unhealthy_tenant_shards.clone(),
     132            0 :         }
     133            0 :     }
     134              : }
     135              : 
     136              : /// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.
     137            0 : pub async fn scan_pageserver_metadata(
     138            0 :     bucket_config: BucketConfig,
     139            0 :     tenant_ids: Vec<TenantShardId>,
     140            0 :     verbose: bool,
     141            0 : ) -> anyhow::Result<MetadataSummary> {
     142            0 :     let (remote_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;
     143              : 
     144            0 :     let tenants = if tenant_ids.is_empty() {
     145            0 :         futures::future::Either::Left(stream_tenants(&remote_client, &target))
     146              :     } else {
     147            0 :         futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
     148              :     };
     149              : 
     150              :     // How many tenants to process in parallel.  We need to be mindful of pageservers
     151              :     // accessing the same per tenant prefixes, so use a lower setting than pageservers.
     152              :     const CONCURRENCY: usize = 32;
     153              : 
     154              :     // Generate a stream of TenantTimelineId
     155            0 :     let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
     156            0 :     let timelines = timelines.try_buffered(CONCURRENCY);
     157            0 :     let timelines = timelines.try_flatten();
     158              : 
     159              :     // Generate a stream of S3TimelineBlobData
     160            0 :     async fn report_on_timeline(
     161            0 :         remote_client: &GenericRemoteStorage,
     162            0 :         target: &RootTarget,
     163            0 :         ttid: TenantShardTimelineId,
     164            0 :     ) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> {
     165            0 :         let data = list_timeline_blobs(remote_client, ttid, target).await?;
     166            0 :         Ok((ttid, data))
     167            0 :     }
     168            0 :     let timelines = timelines.map_ok(|ttid| report_on_timeline(&remote_client, &target, ttid));
     169            0 :     let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
     170            0 : 
     171            0 :     // We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different
     172            0 :     // shards in the same tenant might refer to one anothers' keys if a shard split has happened.
     173            0 : 
     174            0 :     let mut tenant_id = None;
     175            0 :     let mut tenant_objects = TenantObjectListing::default();
     176            0 :     let mut tenant_timeline_results = Vec::new();
     177              : 
     178            0 :     async fn analyze_tenant(
     179            0 :         remote_client: &GenericRemoteStorage,
     180            0 :         tenant_id: TenantId,
     181            0 :         summary: &mut MetadataSummary,
     182            0 :         mut tenant_objects: TenantObjectListing,
     183            0 :         timelines: Vec<(TenantShardTimelineId, RemoteTimelineBlobData)>,
     184            0 :         highest_shard_count: ShardCount,
     185            0 :         verbose: bool,
     186            0 :     ) {
     187            0 :         summary.tenant_count += 1;
     188            0 : 
     189            0 :         let mut timeline_ids = HashSet::new();
     190            0 :         let mut timeline_generations = HashMap::new();
     191            0 :         for (ttid, data) in timelines {
     192            0 :             async {
     193            0 :                 if ttid.tenant_shard_id.shard_count == highest_shard_count {
     194              :                     // Only analyze `TenantShardId`s with highest shard count.
     195              : 
     196              :                     // Stash the generation of each timeline, for later use identifying orphan layers
     197              :                     if let BlobDataParseResult::Parsed {
     198            0 :                         index_part,
     199            0 :                         index_part_generation,
     200              :                         s3_layers: _,
     201              :                         index_part_last_modified_time: _,
     202              :                         index_part_snapshot_time: _,
     203            0 :                     } = &data.blob_data
     204              :                     {
     205            0 :                         if index_part.deleted_at.is_some() {
     206              :                             // skip deleted timeline.
     207            0 :                             tracing::info!(
     208            0 :                                 "Skip analysis of {} b/c timeline is already deleted",
     209              :                                 ttid
     210              :                             );
     211            0 :                             return;
     212            0 :                         }
     213            0 :                         timeline_generations.insert(ttid, *index_part_generation);
     214            0 :                     }
     215              : 
     216              :                     // Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
     217              :                     // reference counts for layers across the tenant.
     218            0 :                     let analysis = branch_cleanup_and_check_errors(
     219            0 :                         remote_client,
     220            0 :                         &ttid,
     221            0 :                         &mut tenant_objects,
     222            0 :                         None,
     223            0 :                         None,
     224            0 :                         Some(data),
     225            0 :                     )
     226            0 :                     .await;
     227            0 :                     summary.update_analysis(&ttid, &analysis, verbose);
     228            0 : 
     229            0 :                     timeline_ids.insert(ttid.timeline_id);
     230              :                 } else {
     231            0 :                     tracing::info!(
     232            0 :                         "Skip analysis of {} b/c a lower shard count than {}",
     233              :                         ttid,
     234              :                         highest_shard_count.0,
     235              :                     );
     236              :                 }
     237            0 :             }
     238              :             .instrument(
     239            0 :                 info_span!("analyze-timeline", shard = %ttid.tenant_shard_id.shard_slug(), timeline = %ttid.timeline_id),
     240              :             )
     241            0 :             .await
     242              :         }
     243              : 
     244            0 :         summary.timeline_count += timeline_ids.len();
     245              : 
     246              :         // Identifying orphan layers must be done on a tenant-wide basis, because individual
     247              :         // shards' layers may be referenced by other shards.
     248              :         //
     249              :         // Orphan layers are not a corruption, and not an indication of a problem.  They are just
     250              :         // consuming some space in remote storage, and may be cleaned up at leisure.
     251            0 :         for (shard_index, timeline_id, layer_file, generation) in tenant_objects.get_orphans() {
     252            0 :             let ttid = TenantShardTimelineId {
     253            0 :                 tenant_shard_id: TenantShardId {
     254            0 :                     tenant_id,
     255            0 :                     shard_count: shard_index.shard_count,
     256            0 :                     shard_number: shard_index.shard_number,
     257            0 :                 },
     258            0 :                 timeline_id,
     259            0 :             };
     260              : 
     261            0 :             if let Some(timeline_generation) = timeline_generations.get(&ttid) {
     262            0 :                 if &generation >= timeline_generation {
     263              :                     // Candidate orphan layer is in the current or future generation relative
     264              :                     // to the index we read for this timeline shard, so its absence from the index
     265              :                     // doesn't make it an orphan: more likely, it is a case where the layer was
     266              :                     // uploaded, but the index referencing the layer wasn't written yet.
     267            0 :                     continue;
     268            0 :                 }
     269            0 :             }
     270              : 
     271            0 :             let orphan_path = remote_layer_path(
     272            0 :                 &tenant_id,
     273            0 :                 &timeline_id,
     274            0 :                 shard_index,
     275            0 :                 &layer_file,
     276            0 :                 generation,
     277            0 :             );
     278            0 : 
     279            0 :             tracing::info!("Orphan layer detected: {orphan_path}");
     280              : 
     281            0 :             summary.notify_timeline_orphan(&ttid);
     282              :         }
     283            0 :     }
     284              : 
     285              :     // Iterate through  all the timeline results.  These are in key-order, so
     286              :     // all results for the same tenant will be adjacent.  We accumulate these,
     287              :     // and then call `analyze_tenant` to flush, when we see the next tenant ID.
     288            0 :     let mut summary = MetadataSummary::new();
     289            0 :     let mut highest_shard_count = ShardCount::MIN;
     290            0 :     while let Some(i) = timelines.next().await {
     291            0 :         let (ttid, data) = i?;
     292            0 :         summary.update_data(&data);
     293            0 : 
     294            0 :         match tenant_id {
     295            0 :             Some(prev_tenant_id) => {
     296            0 :                 if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
     297              :                     // New tenant: analyze this tenant's timelines, clear accumulated tenant_timeline_results
     298            0 :                     let tenant_objects = std::mem::take(&mut tenant_objects);
     299            0 :                     let timelines = std::mem::take(&mut tenant_timeline_results);
     300            0 :                     analyze_tenant(
     301            0 :                         &remote_client,
     302            0 :                         prev_tenant_id,
     303            0 :                         &mut summary,
     304            0 :                         tenant_objects,
     305            0 :                         timelines,
     306            0 :                         highest_shard_count,
     307            0 :                         verbose,
     308            0 :                     )
     309            0 :                     .instrument(info_span!("analyze-tenant", tenant = %prev_tenant_id))
     310            0 :                     .await;
     311            0 :                     tenant_id = Some(ttid.tenant_shard_id.tenant_id);
     312            0 :                     highest_shard_count = ttid.tenant_shard_id.shard_count;
     313            0 :                 } else {
     314            0 :                     highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
     315            0 :                 }
     316              :             }
     317            0 :             None => {
     318            0 :                 tenant_id = Some(ttid.tenant_shard_id.tenant_id);
     319            0 :                 highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
     320            0 :             }
     321              :         }
     322              : 
     323            0 :         match &data.blob_data {
     324              :             BlobDataParseResult::Parsed {
     325              :                 index_part: _,
     326            0 :                 index_part_generation: _index_part_generation,
     327            0 :                 s3_layers,
     328            0 :                 index_part_last_modified_time: _,
     329            0 :                 index_part_snapshot_time: _,
     330            0 :             } => {
     331            0 :                 tenant_objects.push(ttid, s3_layers.clone());
     332            0 :             }
     333            0 :             BlobDataParseResult::Relic => (),
     334              :             BlobDataParseResult::Incorrect {
     335              :                 errors: _,
     336            0 :                 s3_layers,
     337            0 :             } => {
     338            0 :                 tenant_objects.push(ttid, s3_layers.clone());
     339            0 :             }
     340              :         }
     341            0 :         tenant_timeline_results.push((ttid, data));
     342              :     }
     343              : 
     344            0 :     if !tenant_timeline_results.is_empty() {
     345            0 :         let tenant_id = tenant_id.expect("Must be set if results are present");
     346            0 :         analyze_tenant(
     347            0 :             &remote_client,
     348            0 :             tenant_id,
     349            0 :             &mut summary,
     350            0 :             tenant_objects,
     351            0 :             tenant_timeline_results,
     352            0 :             highest_shard_count,
     353            0 :             verbose,
     354            0 :         )
     355            0 :         .instrument(info_span!("analyze-tenant", tenant = %tenant_id))
     356            0 :         .await;
     357            0 :     }
     358              : 
     359            0 :     Ok(summary)
     360            0 : }
        

Generated by: LCOV version 2.1-beta