LCOV - differential code coverage report
Current view: top level - s3_scrubber/src - scan_metadata.rs (source / functions) Coverage Total Hit UBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 0.0 % 168 0 168
Current Date: 2023-10-19 02:04:12 Functions: 0.0 % 19 0 19
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : use std::collections::{HashMap, HashSet};
       2                 : use std::sync::Arc;
       3                 : 
       4                 : use crate::checks::{
       5                 :     branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData,
       6                 :     TimelineAnalysis,
       7                 : };
       8                 : use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
       9                 : use crate::{init_logging, init_s3_client, BucketConfig, RootTarget, S3Target, CLI_NAME};
      10                 : use aws_sdk_s3::Client;
      11                 : use aws_types::region::Region;
      12                 : use futures_util::{pin_mut, StreamExt, TryStreamExt};
      13                 : use histogram::Histogram;
      14                 : use pageserver::tenant::{IndexPart, TENANTS_SEGMENT_NAME};
      15                 : use utils::id::TenantTimelineId;
      16                 : 
      17                 : pub struct MetadataSummary {
      18                 :     count: usize,
      19                 :     with_errors: HashSet<TenantTimelineId>,
      20                 :     with_warnings: HashSet<TenantTimelineId>,
      21                 :     with_garbage: HashSet<TenantTimelineId>,
      22                 :     indices_by_version: HashMap<usize, usize>,
      23                 : 
      24                 :     layer_count: MinMaxHisto,
      25                 :     timeline_size_bytes: MinMaxHisto,
      26                 :     layer_size_bytes: MinMaxHisto,
      27                 : }
      28                 : 
      29                 : /// A histogram plus minimum and maximum tracking
      30                 : struct MinMaxHisto {
      31                 :     histo: Histogram,
      32                 :     min: u64,
      33                 :     max: u64,
      34                 : }
      35                 : 
      36                 : impl MinMaxHisto {
      37 UBC           0 :     fn new() -> Self {
      38               0 :         Self {
      39               0 :             histo: histogram::Histogram::builder()
      40               0 :                 .build()
      41               0 :                 .expect("Bad histogram params"),
      42               0 :             min: u64::MAX,
      43               0 :             max: 0,
      44               0 :         }
      45               0 :     }
      46                 : 
      47               0 :     fn sample(&mut self, v: u64) -> Result<(), histogram::Error> {
      48               0 :         self.min = std::cmp::min(self.min, v);
      49               0 :         self.max = std::cmp::max(self.max, v);
      50               0 :         let r = self.histo.increment(v, 1);
      51               0 : 
      52               0 :         if r.is_err() {
      53               0 :             tracing::warn!("Bad histogram sample: {v}");
      54               0 :         }
      55                 : 
      56               0 :         r
      57               0 :     }
      58                 : 
      59               0 :     fn oneline(&self) -> String {
      60               0 :         let percentiles = match self.histo.percentiles(&[1.0, 10.0, 50.0, 90.0, 99.0]) {
      61               0 :             Ok(p) => p,
      62               0 :             Err(e) => return format!("No data: {}", e),
      63                 :         };
      64                 : 
      65               0 :         let percentiles: Vec<u64> = percentiles
      66               0 :             .iter()
      67               0 :             .map(|p| p.bucket().low() + p.bucket().high() / 2)
      68               0 :             .collect();
      69               0 : 
      70               0 :         format!(
      71               0 :             "min {}, 1% {}, 10% {}, 50% {}, 90% {}, 99% {}, max {}",
      72               0 :             self.min,
      73               0 :             percentiles[0],
      74               0 :             percentiles[1],
      75               0 :             percentiles[2],
      76               0 :             percentiles[3],
      77               0 :             percentiles[4],
      78               0 :             self.max,
      79               0 :         )
      80               0 :     }
      81                 : }
      82                 : 
      83                 : impl MetadataSummary {
      84               0 :     fn new() -> Self {
      85               0 :         Self {
      86               0 :             count: 0,
      87               0 :             with_errors: HashSet::new(),
      88               0 :             with_warnings: HashSet::new(),
      89               0 :             with_garbage: HashSet::new(),
      90               0 :             indices_by_version: HashMap::new(),
      91               0 :             layer_count: MinMaxHisto::new(),
      92               0 :             timeline_size_bytes: MinMaxHisto::new(),
      93               0 :             layer_size_bytes: MinMaxHisto::new(),
      94               0 :         }
      95               0 :     }
      96                 : 
      97                 :     fn update_histograms(&mut self, index_part: &IndexPart) -> Result<(), histogram::Error> {
      98               0 :         self.layer_count
      99               0 :             .sample(index_part.layer_metadata.len() as u64)?;
     100               0 :         let mut total_size: u64 = 0;
     101               0 :         for meta in index_part.layer_metadata.values() {
     102               0 :             total_size += meta.file_size;
     103               0 :             self.layer_size_bytes.sample(meta.file_size)?;
     104                 :         }
     105               0 :         self.timeline_size_bytes.sample(total_size)?;
     106                 : 
     107               0 :         Ok(())
     108               0 :     }
     109                 : 
     110               0 :     fn update_data(&mut self, data: &S3TimelineBlobData) {
     111               0 :         self.count += 1;
     112                 :         if let BlobDataParseResult::Parsed {
     113               0 :             index_part,
     114                 :             s3_layers: _,
     115               0 :         } = &data.blob_data
     116                 :         {
     117               0 :             *self
     118               0 :                 .indices_by_version
     119               0 :                 .entry(index_part.get_version())
     120               0 :                 .or_insert(0) += 1;
     121                 : 
     122               0 :             if let Err(e) = self.update_histograms(index_part) {
     123                 :                 // Value out of range?  Warn that the results are untrustworthy
     124               0 :                 tracing::warn!(
     125               0 :                     "Error updating histograms, summary stats may be wrong: {}",
     126               0 :                     e
     127               0 :                 );
     128               0 :             }
     129               0 :         }
     130               0 :     }
     131                 : 
     132               0 :     fn update_analysis(&mut self, id: &TenantTimelineId, analysis: &TimelineAnalysis) {
     133               0 :         if !analysis.errors.is_empty() {
     134               0 :             self.with_errors.insert(*id);
     135               0 :         }
     136                 : 
     137               0 :         if !analysis.warnings.is_empty() {
     138               0 :             self.with_warnings.insert(*id);
     139               0 :         }
     140               0 :     }
     141                 : 
     142                 :     /// Long-form output for printing at end of a scan
     143               0 :     pub fn summary_string(&self) -> String {
     144               0 :         let version_summary: String = itertools::join(
     145               0 :             self.indices_by_version
     146               0 :                 .iter()
     147               0 :                 .map(|(k, v)| format!("{k}: {v}")),
     148               0 :             ", ",
     149               0 :         );
     150               0 : 
     151               0 :         format!(
     152               0 :             "Timelines: {0}
     153               0 : With errors: {1}
     154               0 : With warnings: {2}
     155               0 : With garbage: {3}
     156               0 : Index versions: {version_summary}
     157               0 : Timeline size bytes: {4}
     158               0 : Layer size bytes: {5}
     159               0 : Timeline layer count: {6}
     160               0 : ",
     161               0 :             self.count,
     162               0 :             self.with_errors.len(),
     163               0 :             self.with_warnings.len(),
     164               0 :             self.with_garbage.len(),
     165               0 :             self.timeline_size_bytes.oneline(),
     166               0 :             self.layer_size_bytes.oneline(),
     167               0 :             self.layer_count.oneline(),
     168               0 :         )
     169               0 :     }
     170                 : 
     171               0 :     pub fn is_fatal(&self) -> bool {
     172               0 :         !self.with_errors.is_empty()
     173               0 :     }
     174                 : }
     175                 : 
     176                 : /// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.
     177               0 : pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result<MetadataSummary> {
     178               0 :     let file_name = format!(
     179               0 :         "{}_scan_metadata_{}_{}.log",
     180               0 :         CLI_NAME,
     181               0 :         bucket_config.bucket,
     182               0 :         chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
     183               0 :     );
     184               0 : 
     185               0 :     let _guard = init_logging(&file_name);
     186               0 : 
     187               0 :     let s3_client = Arc::new(init_s3_client(
     188               0 :         bucket_config.sso_account_id,
     189               0 :         Region::new(bucket_config.region),
     190               0 :     ));
     191               0 :     let delimiter = "/";
     192               0 :     let target = RootTarget::Pageserver(S3Target {
     193               0 :         bucket_name: bucket_config.bucket.to_string(),
     194               0 :         prefix_in_bucket: ["pageserver", "v1", TENANTS_SEGMENT_NAME, ""].join(delimiter),
     195               0 :         delimiter: delimiter.to_string(),
     196               0 :     });
     197               0 : 
     198               0 :     let tenants = stream_tenants(&s3_client, &target);
     199               0 : 
     200               0 :     // How many tenants to process in parallel.  We need to be mindful of pageservers
     201               0 :     // accessing the same per tenant prefixes, so use a lower setting than pageservers.
     202               0 :     const CONCURRENCY: usize = 32;
     203               0 : 
     204               0 :     // Generate a stream of TenantTimelineId
     205               0 :     let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
     206               0 :     let timelines = timelines.try_buffer_unordered(CONCURRENCY);
     207               0 :     let timelines = timelines.try_flatten();
     208               0 : 
     209               0 :     // Generate a stream of S3TimelineBlobData
     210               0 :     async fn report_on_timeline(
     211               0 :         s3_client: &Client,
     212               0 :         target: &RootTarget,
     213               0 :         ttid: TenantTimelineId,
     214               0 :     ) -> anyhow::Result<(TenantTimelineId, S3TimelineBlobData)> {
     215               0 :         let data = list_timeline_blobs(s3_client, ttid, target).await?;
     216               0 :         Ok((ttid, data))
     217               0 :     }
     218               0 :     let timelines = timelines.map_ok(|ttid| report_on_timeline(&s3_client, &target, ttid));
     219               0 :     let timelines = timelines.try_buffer_unordered(CONCURRENCY);
     220               0 : 
     221               0 :     let mut summary = MetadataSummary::new();
     222               0 :     pin_mut!(timelines);
     223               0 :     while let Some(i) = timelines.next().await {
     224               0 :         let (ttid, data) = i?;
     225               0 :         summary.update_data(&data);
     226                 : 
     227               0 :         let analysis =
     228               0 :             branch_cleanup_and_check_errors(&ttid, &target, None, None, Some(data)).await;
     229                 : 
     230               0 :         summary.update_analysis(&ttid, &analysis);
     231                 :     }
     232                 : 
     233               0 :     Ok(summary)
     234               0 : }
        

Generated by: LCOV version 2.1-beta