LCOV - code coverage report
Current view: top level - storage_scrubber/src - scan_pageserver_metadata.rs (source / functions) Coverage Total Hit
Test: 2b0730d767f560e20b6748f57465922aa8bb805e.info Lines: 0.0 % 215 0
Test Date: 2024-09-25 14:04:07 Functions: 0.0 % 17 0

            Line data    Source code
       1              : use std::collections::{HashMap, HashSet};
       2              : 
       3              : use crate::checks::{
       4              :     branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult,
       5              :     RemoteTimelineBlobData, TenantObjectListing, TimelineAnalysis,
       6              : };
       7              : use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
       8              : use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
       9              : use futures_util::{StreamExt, TryStreamExt};
      10              : use pageserver::tenant::remote_timeline_client::remote_layer_path;
      11              : use pageserver_api::controller_api::MetadataHealthUpdateRequest;
      12              : use pageserver_api::shard::TenantShardId;
      13              : use remote_storage::GenericRemoteStorage;
      14              : use serde::Serialize;
      15              : use utils::id::TenantId;
      16              : use utils::shard::ShardCount;
      17              : 
      18              : #[derive(Serialize, Default)]
      19              : pub struct MetadataSummary {
      20              :     tenant_count: usize,
      21              :     timeline_count: usize,
      22              :     timeline_shard_count: usize,
      23              :     with_errors: HashSet<TenantShardTimelineId>,
      24              :     with_warnings: HashSet<TenantShardTimelineId>,
      25              :     with_orphans: HashSet<TenantShardTimelineId>,
      26              :     indices_by_version: HashMap<usize, usize>,
      27              : 
      28              :     #[serde(skip)]
      29              :     pub(crate) healthy_tenant_shards: HashSet<TenantShardId>,
      30              :     #[serde(skip)]
      31              :     pub(crate) unhealthy_tenant_shards: HashSet<TenantShardId>,
      32              : }
      33              : 
      34              : impl MetadataSummary {
      35            0 :     fn new() -> Self {
      36            0 :         Self::default()
      37            0 :     }
      38              : 
      39            0 :     fn update_data(&mut self, data: &RemoteTimelineBlobData) {
      40            0 :         self.timeline_shard_count += 1;
      41              :         if let BlobDataParseResult::Parsed {
      42            0 :             index_part,
      43              :             index_part_generation: _,
      44              :             s3_layers: _,
      45            0 :         } = &data.blob_data
      46            0 :         {
      47            0 :             *self
      48            0 :                 .indices_by_version
      49            0 :                 .entry(index_part.version())
      50            0 :                 .or_insert(0) += 1;
      51            0 :         }
      52            0 :     }
      53              : 
      54            0 :     fn update_analysis(&mut self, id: &TenantShardTimelineId, analysis: &TimelineAnalysis) {
      55            0 :         if analysis.is_healthy() {
      56            0 :             self.healthy_tenant_shards.insert(id.tenant_shard_id);
      57            0 :         } else {
      58            0 :             self.healthy_tenant_shards.remove(&id.tenant_shard_id);
      59            0 :             self.unhealthy_tenant_shards.insert(id.tenant_shard_id);
      60            0 :         }
      61              : 
      62            0 :         if !analysis.errors.is_empty() {
      63            0 :             self.with_errors.insert(*id);
      64            0 :         }
      65              : 
      66            0 :         if !analysis.warnings.is_empty() {
      67            0 :             self.with_warnings.insert(*id);
      68            0 :         }
      69            0 :     }
      70              : 
      71            0 :     fn notify_timeline_orphan(&mut self, ttid: &TenantShardTimelineId) {
      72            0 :         self.with_orphans.insert(*ttid);
      73            0 :     }
      74              : 
      75              :     /// Long-form output for printing at end of a scan
      76            0 :     pub fn summary_string(&self) -> String {
      77            0 :         let version_summary: String = itertools::join(
      78            0 :             self.indices_by_version
      79            0 :                 .iter()
      80            0 :                 .map(|(k, v)| format!("{k}: {v}")),
      81            0 :             ", ",
      82            0 :         );
      83            0 : 
      84            0 :         format!(
      85            0 :             "Tenants: {}
      86            0 : Timelines: {}
      87            0 : Timeline-shards: {}
      88            0 : With errors: {}
      89            0 : With warnings: {}
      90            0 : With orphan layers: {}
      91            0 : Index versions: {version_summary}
      92            0 : ",
      93            0 :             self.tenant_count,
      94            0 :             self.timeline_count,
      95            0 :             self.timeline_shard_count,
      96            0 :             self.with_errors.len(),
      97            0 :             self.with_warnings.len(),
      98            0 :             self.with_orphans.len(),
      99            0 :         )
     100            0 :     }
     101              : 
     102            0 :     pub fn is_fatal(&self) -> bool {
     103            0 :         !self.with_errors.is_empty()
     104            0 :     }
     105              : 
     106            0 :     pub fn is_empty(&self) -> bool {
     107            0 :         self.timeline_shard_count == 0
     108            0 :     }
     109              : 
     110            0 :     pub fn build_health_update_request(&self) -> MetadataHealthUpdateRequest {
     111            0 :         MetadataHealthUpdateRequest {
     112            0 :             healthy_tenant_shards: self.healthy_tenant_shards.clone(),
     113            0 :             unhealthy_tenant_shards: self.unhealthy_tenant_shards.clone(),
     114            0 :         }
     115            0 :     }
     116              : }
     117              : 
     118              : /// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.
     119            0 : pub async fn scan_pageserver_metadata(
     120            0 :     bucket_config: BucketConfig,
     121            0 :     tenant_ids: Vec<TenantShardId>,
     122            0 : ) -> anyhow::Result<MetadataSummary> {
     123            0 :     let (remote_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;
     124              : 
     125            0 :     let tenants = if tenant_ids.is_empty() {
     126            0 :         futures::future::Either::Left(stream_tenants(&remote_client, &target))
     127              :     } else {
     128            0 :         futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
     129              :     };
     130              : 
     131              :     // How many tenants to process in parallel.  We need to be mindful of pageservers
     132              :     // accessing the same per-tenant prefixes, so use a lower setting than pageservers.
     133              :     const CONCURRENCY: usize = 32;
     134              : 
     135              :     // Generate a stream of TenantShardTimelineId
     136            0 :     let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
     137            0 :     let timelines = timelines.try_buffered(CONCURRENCY);
     138            0 :     let timelines = timelines.try_flatten();
     139              : 
     140              :     // Generate a stream of (TenantShardTimelineId, RemoteTimelineBlobData)
     141            0 :     async fn report_on_timeline(
     142            0 :         remote_client: &GenericRemoteStorage,
     143            0 :         target: &RootTarget,
     144            0 :         ttid: TenantShardTimelineId,
     145            0 :     ) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> {
     146            0 :         let data = list_timeline_blobs(remote_client, ttid, target).await?;
     147            0 :         Ok((ttid, data))
     148            0 :     }
     149            0 :     let timelines = timelines.map_ok(|ttid| report_on_timeline(&remote_client, &target, ttid));
     150            0 :     let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
     151            0 : 
     152            0 :     // We must gather all the TenantShardTimelineId->RemoteTimelineBlobData for each tenant, because different
     153            0 :     // shards in the same tenant might refer to one another's keys if a shard split has happened.
     154            0 : 
     155            0 :     let mut tenant_id = None;
     156            0 :     let mut tenant_objects = TenantObjectListing::default();
     157            0 :     let mut tenant_timeline_results = Vec::new();
     158              : 
     159            0 :     async fn analyze_tenant(
     160            0 :         remote_client: &GenericRemoteStorage,
     161            0 :         tenant_id: TenantId,
     162            0 :         summary: &mut MetadataSummary,
     163            0 :         mut tenant_objects: TenantObjectListing,
     164            0 :         timelines: Vec<(TenantShardTimelineId, RemoteTimelineBlobData)>,
     165            0 :         highest_shard_count: ShardCount,
     166            0 :     ) {
     167            0 :         summary.tenant_count += 1;
     168            0 : 
     169            0 :         let mut timeline_ids = HashSet::new();
     170            0 :         let mut timeline_generations = HashMap::new();
     171            0 :         for (ttid, data) in timelines {
     172            0 :             if ttid.tenant_shard_id.shard_count == highest_shard_count {
     173              :                 // Only analyze `TenantShardId`s with highest shard count.
     174              : 
     175              :                 // Stash the generation of each timeline, for later use identifying orphan layers
     176              :                 if let BlobDataParseResult::Parsed {
     177            0 :                     index_part,
     178            0 :                     index_part_generation,
     179            0 :                     s3_layers: _s3_layers,
     180            0 :                 } = &data.blob_data
     181              :                 {
     182            0 :                     if index_part.deleted_at.is_some() {
     183              :                         // skip deleted timeline.
     184            0 :                         tracing::info!("Skip analysis of {} b/c timeline is already deleted", ttid);
     185            0 :                         continue;
     186            0 :                     }
     187            0 :                     timeline_generations.insert(ttid, *index_part_generation);
     188            0 :                 }
     189              : 
     190              :                 // Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
     191              :                 // reference counts for layers across the tenant.
     192            0 :                 let analysis = branch_cleanup_and_check_errors(
     193            0 :                     remote_client,
     194            0 :                     &ttid,
     195            0 :                     &mut tenant_objects,
     196            0 :                     None,
     197            0 :                     None,
     198            0 :                     Some(data),
     199            0 :                 )
     200            0 :                 .await;
     201            0 :                 summary.update_analysis(&ttid, &analysis);
     202            0 : 
     203            0 :                 timeline_ids.insert(ttid.timeline_id);
     204              :             } else {
     205            0 :                 tracing::info!(
     206            0 :                     "Skip analysis of {} b/c a lower shard count than {}",
     207              :                     ttid,
     208              :                     highest_shard_count.0,
     209              :                 );
     210              :             }
     211              :         }
     212              : 
     213            0 :         summary.timeline_count += timeline_ids.len();
     214              : 
     215              :         // Identifying orphan layers must be done on a tenant-wide basis, because individual
     216              :         // shards' layers may be referenced by other shards.
     217              :         //
     218              :         // Orphan layers are not a corruption, and not an indication of a problem.  They are just
     219              :         // consuming some space in remote storage, and may be cleaned up at leisure.
     220            0 :         for (shard_index, timeline_id, layer_file, generation) in tenant_objects.get_orphans() {
     221            0 :             let ttid = TenantShardTimelineId {
     222            0 :                 tenant_shard_id: TenantShardId {
     223            0 :                     tenant_id,
     224            0 :                     shard_count: shard_index.shard_count,
     225            0 :                     shard_number: shard_index.shard_number,
     226            0 :                 },
     227            0 :                 timeline_id,
     228            0 :             };
     229              : 
     230            0 :             if let Some(timeline_generation) = timeline_generations.get(&ttid) {
     231            0 :                 if &generation >= timeline_generation {
     232              :                     // Candidate orphan layer is in the current or future generation relative
     233              :                     // to the index we read for this timeline shard, so its absence from the index
     234              :                     // doesn't make it an orphan: more likely, it is a case where the layer was
     235              :                     // uploaded, but the index referencing the layer wasn't written yet.
     236            0 :                     continue;
     237            0 :                 }
     238            0 :             }
     239              : 
     240            0 :             let orphan_path = remote_layer_path(
     241            0 :                 &tenant_id,
     242            0 :                 &timeline_id,
     243            0 :                 shard_index,
     244            0 :                 &layer_file,
     245            0 :                 generation,
     246            0 :             );
     247            0 : 
     248            0 :             tracing::info!("Orphan layer detected: {orphan_path}");
     249              : 
     250            0 :             summary.notify_timeline_orphan(&ttid);
     251              :         }
     252            0 :     }
     253              : 
     254              :     // Iterate through all the timeline results.  These are in key-order, so
     255              :     // all results for the same tenant will be adjacent.  We accumulate these,
     256              :     // and then call `analyze_tenant` to flush, when we see the next tenant ID.
     257            0 :     let mut summary = MetadataSummary::new();
     258            0 :     let mut highest_shard_count = ShardCount::MIN;
     259            0 :     while let Some(i) = timelines.next().await {
     260            0 :         let (ttid, data) = i?;
     261            0 :         summary.update_data(&data);
     262            0 : 
     263            0 :         match tenant_id {
     264            0 :             None => {
     265            0 :                 tenant_id = Some(ttid.tenant_shard_id.tenant_id);
     266            0 :                 highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
     267            0 :             }
     268            0 :             Some(prev_tenant_id) => {
     269            0 :                 if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
     270              :                     // New tenant: analyze this tenant's timelines, clear accumulated tenant_timeline_results
     271            0 :                     let tenant_objects = std::mem::take(&mut tenant_objects);
     272            0 :                     let timelines = std::mem::take(&mut tenant_timeline_results);
     273            0 :                     analyze_tenant(
     274            0 :                         &remote_client,
     275            0 :                         prev_tenant_id,
     276            0 :                         &mut summary,
     277            0 :                         tenant_objects,
     278            0 :                         timelines,
     279            0 :                         highest_shard_count,
     280            0 :                     )
     281            0 :                     .await;
     282            0 :                     tenant_id = Some(ttid.tenant_shard_id.tenant_id);
     283            0 :                     highest_shard_count = ttid.tenant_shard_id.shard_count;
     284            0 :                 } else {
     285            0 :                     highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
     286            0 :                 }
     287              :             }
     288              :         }
     289              : 
     290            0 :         match &data.blob_data {
     291              :             BlobDataParseResult::Parsed {
     292            0 :                 index_part: _index_part,
     293            0 :                 index_part_generation: _index_part_generation,
     294            0 :                 s3_layers,
     295            0 :             } => {
     296            0 :                 tenant_objects.push(ttid, s3_layers.clone());
     297            0 :             }
     298            0 :             BlobDataParseResult::Relic => (),
     299              :             BlobDataParseResult::Incorrect {
     300              :                 errors: _,
     301            0 :                 s3_layers,
     302            0 :             } => {
     303            0 :                 tenant_objects.push(ttid, s3_layers.clone());
     304            0 :             }
     305              :         }
     306            0 :         tenant_timeline_results.push((ttid, data));
     307              :     }
     308              : 
     309            0 :     if !tenant_timeline_results.is_empty() {
     310            0 :         analyze_tenant(
     311            0 :             &remote_client,
     312            0 :             tenant_id.expect("Must be set if results are present"),
     313            0 :             &mut summary,
     314            0 :             tenant_objects,
     315            0 :             tenant_timeline_results,
     316            0 :             highest_shard_count,
     317            0 :         )
     318            0 :         .await;
     319            0 :     }
     320              : 
     321            0 :     Ok(summary)
     322            0 : }
        

Generated by: LCOV version 2.1-beta