LCOV - code coverage report
Current view: top level - storage_scrubber/src - scan_pageserver_metadata.rs (source / functions) Coverage Total Hit
Test: f081ec316c96fa98335efd15ef501745aa4f015d.info Lines: 0.0 % 261 0
Test Date: 2024-06-25 15:11:17 Functions: 0.0 % 20 0

            Line data    Source code
       1              : use std::collections::{HashMap, HashSet};
       2              : 
       3              : use crate::checks::{
       4              :     branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData,
       5              :     TenantObjectListing, TimelineAnalysis,
       6              : };
       7              : use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
       8              : use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
       9              : use aws_sdk_s3::Client;
      10              : use futures_util::{StreamExt, TryStreamExt};
      11              : use histogram::Histogram;
      12              : use pageserver::tenant::remote_timeline_client::remote_layer_path;
      13              : use pageserver::tenant::IndexPart;
      14              : use pageserver_api::shard::TenantShardId;
      15              : use serde::Serialize;
      16              : use utils::id::TenantId;
      17              : 
      18              : #[derive(Serialize)]
      19              : pub struct MetadataSummary {
      20              :     tenant_count: usize,
      21              :     timeline_count: usize,
      22              :     timeline_shard_count: usize,
      23              :     with_errors: HashSet<TenantShardTimelineId>,
      24              :     with_warnings: HashSet<TenantShardTimelineId>,
      25              :     with_orphans: HashSet<TenantShardTimelineId>,
      26              :     indices_by_version: HashMap<usize, usize>,
      27              : 
      28              :     layer_count: MinMaxHisto,
      29              :     timeline_size_bytes: MinMaxHisto,
      30              :     layer_size_bytes: MinMaxHisto,
      31              : }
      32              : 
      33              : /// A histogram plus minimum and maximum tracking
      34              : #[derive(Serialize)]
      35              : struct MinMaxHisto {
      36              :     #[serde(skip)]
      37              :     histo: Histogram,
      38              :     min: u64,
      39              :     max: u64,
      40              : }
      41              : 
      42              : impl MinMaxHisto {
      43            0 :     fn new() -> Self {
      44            0 :         Self {
      45            0 :             histo: histogram::Histogram::builder()
      46            0 :                 .build()
      47            0 :                 .expect("Bad histogram params"),
      48            0 :             min: u64::MAX,
      49            0 :             max: 0,
      50            0 :         }
      51            0 :     }
      52              : 
      53            0 :     fn sample(&mut self, v: u64) -> Result<(), histogram::Error> {
      54            0 :         self.min = std::cmp::min(self.min, v);
      55            0 :         self.max = std::cmp::max(self.max, v);
      56            0 :         let r = self.histo.increment(v, 1);
      57            0 : 
      58            0 :         if r.is_err() {
      59            0 :             tracing::warn!("Bad histogram sample: {v}");
      60            0 :         }
      61              : 
      62            0 :         r
      63            0 :     }
      64              : 
      65            0 :     fn oneline(&self) -> String {
      66            0 :         let percentiles = match self.histo.percentiles(&[1.0, 10.0, 50.0, 90.0, 99.0]) {
      67            0 :             Ok(p) => p,
      68            0 :             Err(e) => return format!("No data: {}", e),
      69              :         };
      70              : 
      71            0 :         let percentiles: Vec<u64> = percentiles
      72            0 :             .iter()
      73            0 :             .map(|p| p.bucket().low() + p.bucket().high() / 2)
      74            0 :             .collect();
      75            0 : 
      76            0 :         format!(
      77            0 :             "min {}, 1% {}, 10% {}, 50% {}, 90% {}, 99% {}, max {}",
      78            0 :             self.min,
      79            0 :             percentiles[0],
      80            0 :             percentiles[1],
      81            0 :             percentiles[2],
      82            0 :             percentiles[3],
      83            0 :             percentiles[4],
      84            0 :             self.max,
      85            0 :         )
      86            0 :     }
      87              : }
      88              : 
      89              : impl MetadataSummary {
      90            0 :     fn new() -> Self {
      91            0 :         Self {
      92            0 :             tenant_count: 0,
      93            0 :             timeline_count: 0,
      94            0 :             timeline_shard_count: 0,
      95            0 :             with_errors: HashSet::new(),
      96            0 :             with_warnings: HashSet::new(),
      97            0 :             with_orphans: HashSet::new(),
      98            0 :             indices_by_version: HashMap::new(),
      99            0 :             layer_count: MinMaxHisto::new(),
     100            0 :             timeline_size_bytes: MinMaxHisto::new(),
     101            0 :             layer_size_bytes: MinMaxHisto::new(),
     102            0 :         }
     103            0 :     }
     104              : 
     105            0 :     fn update_histograms(&mut self, index_part: &IndexPart) -> Result<(), histogram::Error> {
     106            0 :         self.layer_count
     107            0 :             .sample(index_part.layer_metadata.len() as u64)?;
     108            0 :         let mut total_size: u64 = 0;
     109            0 :         for meta in index_part.layer_metadata.values() {
     110            0 :             total_size += meta.file_size;
     111            0 :             self.layer_size_bytes.sample(meta.file_size)?;
     112              :         }
     113            0 :         self.timeline_size_bytes.sample(total_size)?;
     114              : 
     115            0 :         Ok(())
     116            0 :     }
     117              : 
     118            0 :     fn update_data(&mut self, data: &S3TimelineBlobData) {
     119            0 :         self.timeline_shard_count += 1;
     120              :         if let BlobDataParseResult::Parsed {
     121            0 :             index_part,
     122              :             index_part_generation: _,
     123              :             s3_layers: _,
     124            0 :         } = &data.blob_data
     125              :         {
     126            0 :             *self
     127            0 :                 .indices_by_version
     128            0 :                 .entry(index_part.version())
     129            0 :                 .or_insert(0) += 1;
     130              : 
     131            0 :             if let Err(e) = self.update_histograms(index_part) {
     132              :                 // Value out of range?  Warn that the results are untrustworthy
     133            0 :                 tracing::warn!(
     134            0 :                     "Error updating histograms, summary stats may be wrong: {}",
     135              :                     e
     136              :                 );
     137            0 :             }
     138            0 :         }
     139            0 :     }
     140              : 
     141            0 :     fn update_analysis(&mut self, id: &TenantShardTimelineId, analysis: &TimelineAnalysis) {
     142            0 :         if !analysis.errors.is_empty() {
     143            0 :             self.with_errors.insert(*id);
     144            0 :         }
     145              : 
     146            0 :         if !analysis.warnings.is_empty() {
     147            0 :             self.with_warnings.insert(*id);
     148            0 :         }
     149            0 :     }
     150              : 
     151            0 :     fn notify_timeline_orphan(&mut self, ttid: &TenantShardTimelineId) {
     152            0 :         self.with_orphans.insert(*ttid);
     153            0 :     }
     154              : 
     155              :     /// Long-form output for printing at end of a scan
     156            0 :     pub fn summary_string(&self) -> String {
     157            0 :         let version_summary: String = itertools::join(
     158            0 :             self.indices_by_version
     159            0 :                 .iter()
     160            0 :                 .map(|(k, v)| format!("{k}: {v}")),
     161            0 :             ", ",
     162            0 :         );
     163            0 : 
     164            0 :         format!(
     165            0 :             "Tenants: {}
     166            0 : Timelines: {}
     167            0 : Timeline-shards: {}
     168            0 : With errors: {}
     169            0 : With warnings: {}
     170            0 : With orphan layers: {}
     171            0 : Index versions: {version_summary}
     172            0 : Timeline size bytes: {}
     173            0 : Layer size bytes: {}
     174            0 : Timeline layer count: {}
     175            0 : ",
     176            0 :             self.tenant_count,
     177            0 :             self.timeline_count,
     178            0 :             self.timeline_shard_count,
     179            0 :             self.with_errors.len(),
     180            0 :             self.with_warnings.len(),
     181            0 :             self.with_orphans.len(),
     182            0 :             self.timeline_size_bytes.oneline(),
     183            0 :             self.layer_size_bytes.oneline(),
     184            0 :             self.layer_count.oneline(),
     185            0 :         )
     186            0 :     }
     187              : 
     188            0 :     pub fn is_fatal(&self) -> bool {
     189            0 :         !self.with_errors.is_empty()
     190            0 :     }
     191              : 
     192            0 :     pub fn is_empty(&self) -> bool {
     193            0 :         self.timeline_shard_count == 0
     194            0 :     }
     195              : }
     196              : 
     197              : /// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.
     198            0 : pub async fn scan_metadata(
     199            0 :     bucket_config: BucketConfig,
     200            0 :     tenant_ids: Vec<TenantShardId>,
     201            0 : ) -> anyhow::Result<MetadataSummary> {
     202            0 :     let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver)?;
     203              : 
     204            0 :     let tenants = if tenant_ids.is_empty() {
     205            0 :         futures::future::Either::Left(stream_tenants(&s3_client, &target))
     206              :     } else {
     207            0 :         futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
     208              :     };
     209              : 
     210              :     // How many tenants to process in parallel.  We need to be mindful of pageservers
     211              :     // accessing the same per tenant prefixes, so use a lower setting than pageservers.
     212              :     const CONCURRENCY: usize = 32;
     213              : 
     214              :     // Generate a stream of TenantTimelineId
     215            0 :     let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
     216            0 :     let timelines = timelines.try_buffered(CONCURRENCY);
     217            0 :     let timelines = timelines.try_flatten();
     218            0 : 
     219            0 :     // Generate a stream of S3TimelineBlobData
     220            0 :     async fn report_on_timeline(
     221            0 :         s3_client: &Client,
     222            0 :         target: &RootTarget,
     223            0 :         ttid: TenantShardTimelineId,
     224            0 :     ) -> anyhow::Result<(TenantShardTimelineId, S3TimelineBlobData)> {
     225            0 :         let data = list_timeline_blobs(s3_client, ttid, target).await?;
     226            0 :         Ok((ttid, data))
     227            0 :     }
     228            0 :     let timelines = timelines.map_ok(|ttid| report_on_timeline(&s3_client, &target, ttid));
     229            0 :     let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
     230            0 : 
     231            0 :     // We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different
     232            0 :     // shards in the same tenant might refer to one anothers' keys if a shard split has happened.
     233            0 : 
     234            0 :     let mut tenant_id = None;
     235            0 :     let mut tenant_objects = TenantObjectListing::default();
     236            0 :     let mut tenant_timeline_results = Vec::new();
     237            0 : 
     238            0 :     fn analyze_tenant(
     239            0 :         tenant_id: TenantId,
     240            0 :         summary: &mut MetadataSummary,
     241            0 :         mut tenant_objects: TenantObjectListing,
     242            0 :         timelines: Vec<(TenantShardTimelineId, S3TimelineBlobData)>,
     243            0 :     ) {
     244            0 :         summary.tenant_count += 1;
     245            0 : 
     246            0 :         let mut timeline_ids = HashSet::new();
     247            0 :         let mut timeline_generations = HashMap::new();
     248            0 :         for (ttid, data) in timelines {
     249            0 :             timeline_ids.insert(ttid.timeline_id);
     250            0 :             // Stash the generation of each timeline, for later use identifying orphan layers
     251            0 :             if let BlobDataParseResult::Parsed {
     252            0 :                 index_part: _index_part,
     253            0 :                 index_part_generation,
     254            0 :                 s3_layers: _s3_layers,
     255            0 :             } = &data.blob_data
     256            0 :             {
     257            0 :                 timeline_generations.insert(ttid, *index_part_generation);
     258            0 :             }
     259            0 : 
     260            0 :             // Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
     261            0 :             // reference counts for layers across the tenant.
     262            0 :             let analysis =
     263            0 :                 branch_cleanup_and_check_errors(&ttid, &mut tenant_objects, None, None, Some(data));
     264            0 :             summary.update_analysis(&ttid, &analysis);
     265            0 :         }
     266            0 : 
     267            0 :         summary.timeline_count += timeline_ids.len();
     268            0 : 
     269            0 :         // Identifying orphan layers must be done on a tenant-wide basis, because individual
     270            0 :         // shards' layers may be referenced by other shards.
     271            0 :         //
     272            0 :         // Orphan layers are not a corruption, and not an indication of a problem.  They are just
     273            0 :         // consuming some space in remote storage, and may be cleaned up at leisure.
     274            0 :         for (shard_index, timeline_id, layer_file, generation) in tenant_objects.get_orphans() {
     275            0 :             let ttid = TenantShardTimelineId {
     276            0 :                 tenant_shard_id: TenantShardId {
     277            0 :                     tenant_id,
     278            0 :                     shard_count: shard_index.shard_count,
     279            0 :                     shard_number: shard_index.shard_number,
     280            0 :                 },
     281            0 :                 timeline_id,
     282            0 :             };
     283            0 : 
     284            0 :             if let Some(timeline_generation) = timeline_generations.get(&ttid) {
     285            0 :                 if &generation >= timeline_generation {
     286            0 :                     // Candidate orphan layer is in the current or future generation relative
     287            0 :                     // to the index we read for this timeline shard, so its absence from the index
     288            0 :                     // doesn't make it an orphan: more likely, it is a case where the layer was
     289            0 :                     // uploaded, but the index referencing the layer wasn't written yet.
     290            0 :                     continue;
     291            0 :                 }
     292            0 :             }
     293            0 : 
     294            0 :             let orphan_path = remote_layer_path(
     295            0 :                 &tenant_id,
     296            0 :                 &timeline_id,
     297            0 :                 shard_index,
     298            0 :                 &layer_file,
     299            0 :                 generation,
     300            0 :             );
     301            0 : 
     302            0 :             tracing::info!("Orphan layer detected: {orphan_path}");
     303            0 : 
     304            0 :             summary.notify_timeline_orphan(&ttid);
     305            0 :         }
     306            0 :     }
     307            0 : 
     308            0 :     // Iterate through  all the timeline results.  These are in key-order, so
     309            0 :     // all results for the same tenant will be adjacent.  We accumulate these,
     310            0 :     // and then call `analyze_tenant` to flush, when we see the next tenant ID.
     311            0 :     let mut summary = MetadataSummary::new();
     312            0 :     while let Some(i) = timelines.next().await {
     313            0 :         let (ttid, data) = i?;
     314            0 :         summary.update_data(&data);
     315            0 : 
     316            0 :         match tenant_id {
     317            0 :             None => tenant_id = Some(ttid.tenant_shard_id.tenant_id),
     318            0 :             Some(prev_tenant_id) => {
     319            0 :                 if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
     320            0 :                     let tenant_objects = std::mem::take(&mut tenant_objects);
     321            0 :                     let timelines = std::mem::take(&mut tenant_timeline_results);
     322            0 :                     analyze_tenant(prev_tenant_id, &mut summary, tenant_objects, timelines);
     323            0 :                     tenant_id = Some(ttid.tenant_shard_id.tenant_id);
     324            0 :                 }
     325              :             }
     326              :         }
     327              : 
     328              :         if let BlobDataParseResult::Parsed {
     329            0 :             index_part: _index_part,
     330            0 :             index_part_generation: _index_part_generation,
     331            0 :             s3_layers,
     332            0 :         } = &data.blob_data
     333            0 :         {
     334            0 :             tenant_objects.push(ttid, s3_layers.clone());
     335            0 :         }
     336            0 :         tenant_timeline_results.push((ttid, data));
     337              :     }
     338              : 
     339            0 :     if !tenant_timeline_results.is_empty() {
     340            0 :         analyze_tenant(
     341            0 :             tenant_id.expect("Must be set if results are present"),
     342            0 :             &mut summary,
     343            0 :             tenant_objects,
     344            0 :             tenant_timeline_results,
     345            0 :         );
     346            0 :     }
     347              : 
     348            0 :     Ok(summary)
     349            0 : }
        

Generated by: LCOV version 2.1-beta