LCOV - differential code coverage report
Current view: top level - s3_scrubber/src - scan_metadata.rs (source / functions) Coverage Total Hit UBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 0.0 % 266 0 266
Current Date: 2024-01-09 02:06:09 Functions: 0.0 % 28 0 28
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : use std::collections::{HashMap, HashSet};
       2                 : 
       3                 : use crate::checks::{
       4                 :     branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData,
       5                 :     TenantObjectListing, TimelineAnalysis,
       6                 : };
       7                 : use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
       8                 : use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
       9                 : use aws_sdk_s3::Client;
      10                 : use futures_util::{pin_mut, StreamExt, TryStreamExt};
      11                 : use histogram::Histogram;
      12                 : use pageserver::tenant::remote_timeline_client::remote_layer_path;
      13                 : use pageserver::tenant::IndexPart;
      14                 : use pageserver_api::shard::TenantShardId;
      15                 : use serde::Serialize;
      16                 : use utils::id::TenantId;
      17                 : 
      18 UBC           0 : #[derive(Serialize)]
      19                 : pub struct MetadataSummary {
      20                 :     tenant_count: usize,
      21                 :     timeline_count: usize,
      22                 :     timeline_shard_count: usize,
      23                 :     with_errors: HashSet<TenantShardTimelineId>,
      24                 :     with_warnings: HashSet<TenantShardTimelineId>,
      25                 :     with_orphans: HashSet<TenantShardTimelineId>,
      26                 :     indices_by_version: HashMap<usize, usize>,
      27                 : 
      28                 :     layer_count: MinMaxHisto,
      29                 :     timeline_size_bytes: MinMaxHisto,
      30                 :     layer_size_bytes: MinMaxHisto,
      31                 : }
      32                 : 
      33                 : /// A histogram plus minimum and maximum tracking
      34               0 : #[derive(Serialize)]
      35                 : struct MinMaxHisto {
      36                 :     #[serde(skip)]
      37                 :     histo: Histogram,
      38                 :     min: u64,
      39                 :     max: u64,
      40                 : }
      41                 : 
      42                 : impl MinMaxHisto {
      43               0 :     fn new() -> Self {
      44               0 :         Self {
      45               0 :             histo: histogram::Histogram::builder()
      46               0 :                 .build()
      47               0 :                 .expect("Bad histogram params"),
      48               0 :             min: u64::MAX,
      49               0 :             max: 0,
      50               0 :         }
      51               0 :     }
      52                 : 
      53               0 :     fn sample(&mut self, v: u64) -> Result<(), histogram::Error> {
      54               0 :         self.min = std::cmp::min(self.min, v);
      55               0 :         self.max = std::cmp::max(self.max, v);
      56               0 :         let r = self.histo.increment(v, 1);
      57               0 : 
      58               0 :         if r.is_err() {
      59               0 :             tracing::warn!("Bad histogram sample: {v}");
      60               0 :         }
      61                 : 
      62               0 :         r
      63               0 :     }
      64                 : 
      65               0 :     fn oneline(&self) -> String {
      66               0 :         let percentiles = match self.histo.percentiles(&[1.0, 10.0, 50.0, 90.0, 99.0]) {
      67               0 :             Ok(p) => p,
      68               0 :             Err(e) => return format!("No data: {}", e),
      69                 :         };
      70                 : 
      71               0 :         let percentiles: Vec<u64> = percentiles
      72               0 :             .iter()
      73               0 :             .map(|p| p.bucket().low() + (p.bucket().high() - p.bucket().low()) / 2)
      74               0 :             .collect();
      75               0 : 
      76               0 :         format!(
      77               0 :             "min {}, 1% {}, 10% {}, 50% {}, 90% {}, 99% {}, max {}",
      78               0 :             self.min,
      79               0 :             percentiles[0],
      80               0 :             percentiles[1],
      81               0 :             percentiles[2],
      82               0 :             percentiles[3],
      83               0 :             percentiles[4],
      84               0 :             self.max,
      85               0 :         )
      86               0 :     }
      87                 : }
      88                 : 
      89                 : impl MetadataSummary {
      90               0 :     fn new() -> Self {
      91               0 :         Self {
      92               0 :             tenant_count: 0,
      93               0 :             timeline_count: 0,
      94               0 :             timeline_shard_count: 0,
      95               0 :             with_errors: HashSet::new(),
      96               0 :             with_warnings: HashSet::new(),
      97               0 :             with_orphans: HashSet::new(),
      98               0 :             indices_by_version: HashMap::new(),
      99               0 :             layer_count: MinMaxHisto::new(),
     100               0 :             timeline_size_bytes: MinMaxHisto::new(),
     101               0 :             layer_size_bytes: MinMaxHisto::new(),
     102               0 :         }
     103               0 :     }
     104                 : 
     105               0 :     fn update_histograms(&mut self, index_part: &IndexPart) -> Result<(), histogram::Error> {
     106               0 :         self.layer_count
     107               0 :             .sample(index_part.layer_metadata.len() as u64)?;
     108               0 :         let mut total_size: u64 = 0;
     109               0 :         for meta in index_part.layer_metadata.values() {
     110               0 :             total_size += meta.file_size;
     111               0 :             self.layer_size_bytes.sample(meta.file_size)?;
     112                 :         }
     113               0 :         self.timeline_size_bytes.sample(total_size)?;
     114                 : 
     115               0 :         Ok(())
     116               0 :     }
     117                 : 
     118               0 :     fn update_data(&mut self, data: &S3TimelineBlobData) {
     119               0 :         self.timeline_shard_count += 1;
     120                 :         if let BlobDataParseResult::Parsed {
     121               0 :             index_part,
     122                 :             index_part_generation: _,
     123                 :             s3_layers: _,
     124               0 :         } = &data.blob_data
     125                 :         {
     126               0 :             *self
     127               0 :                 .indices_by_version
     128               0 :                 .entry(index_part.get_version())
     129               0 :                 .or_insert(0) += 1;
     130                 : 
     131               0 :             if let Err(e) = self.update_histograms(index_part) {
     132                 :                 // Value out of range?  Warn that the results are untrustworthy
     133               0 :                 tracing::warn!(
     134               0 :                     "Error updating histograms, summary stats may be wrong: {}",
     135               0 :                     e
     136               0 :                 );
     137               0 :             }
     138               0 :         }
     139               0 :     }
     140                 : 
     141               0 :     fn update_analysis(&mut self, id: &TenantShardTimelineId, analysis: &TimelineAnalysis) {
     142               0 :         if !analysis.errors.is_empty() {
     143               0 :             self.with_errors.insert(*id);
     144               0 :         }
     145                 : 
     146               0 :         if !analysis.warnings.is_empty() {
     147               0 :             self.with_warnings.insert(*id);
     148               0 :         }
     149               0 :     }
     150                 : 
     151               0 :     fn notify_timeline_orphan(&mut self, ttid: &TenantShardTimelineId) {
     152               0 :         self.with_orphans.insert(*ttid);
     153               0 :     }
     154                 : 
     155                 :     /// Long-form output for printing at end of a scan
     156               0 :     pub fn summary_string(&self) -> String {
     157               0 :         let version_summary: String = itertools::join(
     158               0 :             self.indices_by_version
     159               0 :                 .iter()
     160               0 :                 .map(|(k, v)| format!("{k}: {v}")),
     161               0 :             ", ",
     162               0 :         );
     163               0 : 
     164               0 :         format!(
     165               0 :             "Tenants: {}
     166               0 : Timelines: {}
     167               0 : Timeline-shards: {}
     168               0 : With errors: {}
     169               0 : With warnings: {}
     170               0 : With orphan layers: {}
     171               0 : Index versions: {version_summary}
     172               0 : Timeline size bytes: {}
     173               0 : Layer size bytes: {}
     174               0 : Timeline layer count: {}
     175               0 : ",
     176               0 :             self.tenant_count,
     177               0 :             self.timeline_count,
     178               0 :             self.timeline_shard_count,
     179               0 :             self.with_errors.len(),
     180               0 :             self.with_warnings.len(),
     181               0 :             self.with_orphans.len(),
     182               0 :             self.timeline_size_bytes.oneline(),
     183               0 :             self.layer_size_bytes.oneline(),
     184               0 :             self.layer_count.oneline(),
     185               0 :         )
     186               0 :     }
     187                 : 
     188               0 :     pub fn is_fatal(&self) -> bool {
     189               0 :         !self.with_errors.is_empty()
     190               0 :     }
     191                 : 
     192               0 :     pub fn is_empty(&self) -> bool {
     193               0 :         self.timeline_shard_count == 0
     194               0 :     }
     195                 : }
     196                 : 
     197                 : /// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.
     198               0 : pub async fn scan_metadata(
     199               0 :     bucket_config: BucketConfig,
     200               0 :     tenant_ids: Vec<TenantShardId>,
     201               0 : ) -> anyhow::Result<MetadataSummary> {
     202               0 :     let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver)?;
     203                 : 
     204               0 :     let tenants = if tenant_ids.is_empty() {
     205               0 :         futures::future::Either::Left(stream_tenants(&s3_client, &target))
     206                 :     } else {
     207               0 :         futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
     208                 :     };
     209                 : 
     210                 :     // How many tenants to process in parallel.  We need to be mindful of pageservers
     211                 :     // accessing the same per tenant prefixes, so use a lower setting than pageservers.
     212                 :     const CONCURRENCY: usize = 32;
     213                 : 
     214                 :     // Generate a stream of TenantShardTimelineId
     215               0 :     let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
     216               0 :     let timelines = timelines.try_buffered(CONCURRENCY);
     217               0 :     let timelines = timelines.try_flatten();
     218               0 : 
     219               0 :     // Generate a stream of S3TimelineBlobData
     220               0 :     async fn report_on_timeline(
     221               0 :         s3_client: &Client,
     222               0 :         target: &RootTarget,
     223               0 :         ttid: TenantShardTimelineId,
     224               0 :     ) -> anyhow::Result<(TenantShardTimelineId, S3TimelineBlobData)> {
     225               0 :         let data = list_timeline_blobs(s3_client, ttid, target).await?;
     226               0 :         Ok((ttid, data))
     227               0 :     }
     228               0 :     let timelines = timelines.map_ok(|ttid| report_on_timeline(&s3_client, &target, ttid));
     229               0 :     let timelines = timelines.try_buffered(CONCURRENCY);
     230               0 : 
     231               0 :     // We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different
     232               0 :     // shards in the same tenant might refer to one another's keys if a shard split has happened.
     233               0 : 
     234               0 :     let mut tenant_id = None;
     235               0 :     let mut tenant_objects = TenantObjectListing::default();
     236               0 :     let mut tenant_timeline_results = Vec::new();
     237               0 : 
     238               0 :     fn analyze_tenant(
     239               0 :         tenant_id: TenantId,
     240               0 :         summary: &mut MetadataSummary,
     241               0 :         mut tenant_objects: TenantObjectListing,
     242               0 :         timelines: Vec<(TenantShardTimelineId, S3TimelineBlobData)>,
     243               0 :     ) {
     244               0 :         summary.tenant_count += 1;
     245               0 : 
     246               0 :         let mut timeline_ids = HashSet::new();
     247               0 :         let mut timeline_generations = HashMap::new();
     248               0 :         for (ttid, data) in timelines {
     249               0 :             timeline_ids.insert(ttid.timeline_id);
     250               0 :             // Stash the generation of each timeline, for later use identifying orphan layers
     251               0 :             if let BlobDataParseResult::Parsed {
     252               0 :                 index_part: _index_part,
     253               0 :                 index_part_generation,
     254               0 :                 s3_layers: _s3_layers,
     255               0 :             } = &data.blob_data
     256               0 :             {
     257               0 :                 timeline_generations.insert(ttid, *index_part_generation);
     258               0 :             }
     259               0 : 
     260               0 :             // Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
     261               0 :             // reference counts for layers across the tenant.
     262               0 :             let analysis =
     263               0 :                 branch_cleanup_and_check_errors(&ttid, &mut tenant_objects, None, None, Some(data));
     264               0 :             summary.update_analysis(&ttid, &analysis);
     265               0 :         }
     266               0 : 
     267               0 :         summary.timeline_count += timeline_ids.len();
     268               0 : 
     269               0 :         // Identifying orphan layers must be done on a tenant-wide basis, because individual
     270               0 :         // shards' layers may be referenced by other shards.
     271               0 :         //
     272               0 :         // Orphan layers are not a corruption, and not an indication of a problem.  They are just
     273               0 :         // consuming some space in remote storage, and may be cleaned up at leisure.
     274               0 :         for (shard_index, timeline_id, layer_file, generation) in tenant_objects.get_orphans() {
     275               0 :             let ttid = TenantShardTimelineId {
     276               0 :                 tenant_shard_id: TenantShardId {
     277               0 :                     tenant_id,
     278               0 :                     shard_count: shard_index.shard_count,
     279               0 :                     shard_number: shard_index.shard_number,
     280               0 :                 },
     281               0 :                 timeline_id,
     282               0 :             };
     283               0 : 
     284               0 :             if let Some(timeline_generation) = timeline_generations.get(&ttid) {
     285               0 :                 if &generation >= timeline_generation {
     286               0 :                     // Candidate orphan layer is in the current or future generation relative
     287               0 :                     // to the index we read for this timeline shard, so its absence from the index
     288               0 :                     // doesn't make it an orphan: more likely, it is a case where the layer was
     289               0 :                     // uploaded, but the index referencing the layer wasn't written yet.
     290               0 :                     continue;
     291               0 :                 }
     292               0 :             }
     293               0 : 
     294               0 :             let orphan_path = remote_layer_path(
     295               0 :                 &tenant_id,
     296               0 :                 &timeline_id,
     297               0 :                 shard_index,
     298               0 :                 &layer_file,
     299               0 :                 generation,
     300               0 :             );
     301               0 : 
     302               0 :             tracing::info!("Orphan layer detected: {orphan_path}");
     303               0 : 
     304               0 :             summary.notify_timeline_orphan(&ttid);
     305               0 :         }
     306               0 :     }
     307               0 : 
     308               0 :     // Iterate through all the timeline results.  These are in key-order, so
     309               0 :     // all results for the same tenant will be adjacent.  We accumulate these,
     310               0 :     // and then call `analyze_tenant` to flush, when we see the next tenant ID.
     311               0 :     let mut summary = MetadataSummary::new();
     312               0 :     pin_mut!(timelines);
     313               0 :     while let Some(i) = timelines.next().await {
     314               0 :         let (ttid, data) = i?;
     315               0 :         summary.update_data(&data);
     316               0 : 
     317               0 :         match tenant_id {
     318               0 :             None => tenant_id = Some(ttid.tenant_shard_id.tenant_id),
     319               0 :             Some(prev_tenant_id) => {
     320               0 :                 if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
     321               0 :                     let tenant_objects = std::mem::take(&mut tenant_objects);
     322               0 :                     let timelines = std::mem::take(&mut tenant_timeline_results);
     323               0 :                     analyze_tenant(prev_tenant_id, &mut summary, tenant_objects, timelines);
     324               0 :                     tenant_id = Some(ttid.tenant_shard_id.tenant_id);
     325               0 :                 }
     326                 :             }
     327                 :         }
     328                 : 
     329                 :         if let BlobDataParseResult::Parsed {
     330               0 :             index_part: _index_part,
     331               0 :             index_part_generation: _index_part_generation,
     332               0 :             s3_layers,
     333               0 :         } = &data.blob_data
     334               0 :         {
     335               0 :             tenant_objects.push(ttid, s3_layers.clone());
     336               0 :         }
     337               0 :         tenant_timeline_results.push((ttid, data));
     338                 :     }
     339                 : 
     340               0 :     if !tenant_timeline_results.is_empty() {
     341               0 :         analyze_tenant(
     342               0 :             tenant_id.expect("Must be set if results are present"),
     343               0 :             &mut summary,
     344               0 :             tenant_objects,
     345               0 :             tenant_timeline_results,
     346               0 :         );
     347               0 :     }
     348                 : 
     349               0 :     Ok(summary)
     350               0 : }
        

Generated by: LCOV version 2.1-beta