LCOV - code coverage report
Current view: top level - s3_scrubber/src - scan_metadata.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 0.0 % 168 0
Test Date: 2023-09-06 10:18:01 Functions: 0.0 % 19 0

            Line data    Source code
       1              : use std::collections::{HashMap, HashSet};
       2              : use std::sync::Arc;
       3              : 
       4              : use crate::checks::{
       5              :     branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData,
       6              :     TimelineAnalysis,
       7              : };
       8              : use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
       9              : use crate::{init_logging, init_s3_client, BucketConfig, RootTarget, S3Target, CLI_NAME};
      10              : use aws_sdk_s3::Client;
      11              : use aws_types::region::Region;
      12              : use futures_util::{pin_mut, StreamExt, TryStreamExt};
      13              : use histogram::Histogram;
      14              : use pageserver::tenant::{IndexPart, TENANTS_SEGMENT_NAME};
      15              : use utils::id::TenantTimelineId;
      16              : 
/// Aggregated outcome of a metadata scan: how many timelines were seen, which
/// ones were flagged during analysis, and distributions of layer counts and sizes.
pub struct MetadataSummary {
    /// Number of timelines passed to `update_data`, whether or not their index parsed.
    count: usize,
    /// Timelines whose analysis contained at least one error.
    with_errors: HashSet<TenantTimelineId>,
    /// Timelines whose analysis contained at least one warning.
    with_warnings: HashSet<TenantTimelineId>,
    /// NOTE(review): no visible code inserts into this set, so the summary
    /// always prints 0 for it — confirm whether garbage tracking is still to be wired up.
    with_garbage: HashSet<TenantTimelineId>,
    /// Count of successfully parsed indices, keyed by `IndexPart::get_version()`.
    indices_by_version: HashMap<usize, usize>,

    /// Distribution of the number of layers per parsed timeline index.
    layer_count: MinMaxHisto,
    /// Distribution of the summed layer file sizes per parsed timeline index.
    timeline_size_bytes: MinMaxHisto,
    /// Distribution of individual layer file sizes.
    layer_size_bytes: MinMaxHisto,
}
      28              : 
/// A histogram plus minimum and maximum tracking
struct MinMaxHisto {
    /// Bucketed distribution of the values accepted by the histogram in `sample`.
    histo: Histogram,
    /// Smallest value ever passed to `sample`; `u64::MAX` until the first sample.
    min: u64,
    /// Largest value ever passed to `sample`; `0` until the first sample.
    max: u64,
}
      35              : 
      36              : impl MinMaxHisto {
      37            0 :     fn new() -> Self {
      38            0 :         Self {
      39            0 :             histo: histogram::Histogram::builder()
      40            0 :                 .build()
      41            0 :                 .expect("Bad histogram params"),
      42            0 :             min: u64::MAX,
      43            0 :             max: 0,
      44            0 :         }
      45            0 :     }
      46              : 
      47            0 :     fn sample(&mut self, v: u64) -> Result<(), histogram::Error> {
      48            0 :         self.min = std::cmp::min(self.min, v);
      49            0 :         self.max = std::cmp::max(self.max, v);
      50            0 :         let r = self.histo.increment(v, 1);
      51            0 : 
      52            0 :         if r.is_err() {
      53            0 :             tracing::warn!("Bad histogram sample: {v}");
      54            0 :         }
      55              : 
      56            0 :         r
      57            0 :     }
      58              : 
      59            0 :     fn oneline(&self) -> String {
      60            0 :         let percentiles = match self.histo.percentiles(&[1.0, 10.0, 50.0, 90.0, 99.0]) {
      61            0 :             Ok(p) => p,
      62            0 :             Err(e) => return format!("No data: {}", e),
      63              :         };
      64              : 
      65            0 :         let percentiles: Vec<u64> = percentiles
      66            0 :             .iter()
      67            0 :             .map(|p| p.bucket().low() + p.bucket().high() / 2)
      68            0 :             .collect();
      69            0 : 
      70            0 :         format!(
      71            0 :             "min {}, 1% {}, 10% {}, 50% {}, 90% {}, 99% {}, max {}",
      72            0 :             self.min,
      73            0 :             percentiles[0],
      74            0 :             percentiles[1],
      75            0 :             percentiles[2],
      76            0 :             percentiles[3],
      77            0 :             percentiles[4],
      78            0 :             self.max,
      79            0 :         )
      80            0 :     }
      81              : }
      82              : 
      83              : impl MetadataSummary {
      84            0 :     fn new() -> Self {
      85            0 :         Self {
      86            0 :             count: 0,
      87            0 :             with_errors: HashSet::new(),
      88            0 :             with_warnings: HashSet::new(),
      89            0 :             with_garbage: HashSet::new(),
      90            0 :             indices_by_version: HashMap::new(),
      91            0 :             layer_count: MinMaxHisto::new(),
      92            0 :             timeline_size_bytes: MinMaxHisto::new(),
      93            0 :             layer_size_bytes: MinMaxHisto::new(),
      94            0 :         }
      95            0 :     }
      96              : 
      97              :     fn update_histograms(&mut self, index_part: &IndexPart) -> Result<(), histogram::Error> {
      98            0 :         self.layer_count
      99            0 :             .sample(index_part.layer_metadata.len() as u64)?;
     100            0 :         let mut total_size: u64 = 0;
     101            0 :         for meta in index_part.layer_metadata.values() {
     102            0 :             total_size += meta.file_size;
     103            0 :             self.layer_size_bytes.sample(meta.file_size)?;
     104              :         }
     105            0 :         self.timeline_size_bytes.sample(total_size)?;
     106              : 
     107            0 :         Ok(())
     108            0 :     }
     109              : 
     110            0 :     fn update_data(&mut self, data: &S3TimelineBlobData) {
     111            0 :         self.count += 1;
     112              :         if let BlobDataParseResult::Parsed {
     113            0 :             index_part,
     114              :             s3_layers: _,
     115            0 :         } = &data.blob_data
     116              :         {
     117            0 :             *self
     118            0 :                 .indices_by_version
     119            0 :                 .entry(index_part.get_version())
     120            0 :                 .or_insert(0) += 1;
     121              : 
     122            0 :             if let Err(e) = self.update_histograms(index_part) {
     123              :                 // Value out of range?  Warn that the results are untrustworthy
     124            0 :                 tracing::warn!(
     125            0 :                     "Error updating histograms, summary stats may be wrong: {}",
     126            0 :                     e
     127            0 :                 );
     128            0 :             }
     129            0 :         }
     130            0 :     }
     131              : 
     132            0 :     fn update_analysis(&mut self, id: &TenantTimelineId, analysis: &TimelineAnalysis) {
     133            0 :         if !analysis.errors.is_empty() {
     134            0 :             self.with_errors.insert(*id);
     135            0 :         }
     136              : 
     137            0 :         if !analysis.warnings.is_empty() {
     138            0 :             self.with_warnings.insert(*id);
     139            0 :         }
     140            0 :     }
     141              : 
     142              :     /// Long-form output for printing at end of a scan
     143            0 :     pub fn summary_string(&self) -> String {
     144            0 :         let version_summary: String = itertools::join(
     145            0 :             self.indices_by_version
     146            0 :                 .iter()
     147            0 :                 .map(|(k, v)| format!("{k}: {v}")),
     148            0 :             ", ",
     149            0 :         );
     150            0 : 
     151            0 :         format!(
     152            0 :             "Timelines: {0}
     153            0 : With errors: {1}
     154            0 : With warnings: {2}
     155            0 : With garbage: {3}
     156            0 : Index versions: {version_summary}
     157            0 : Timeline size bytes: {4}
     158            0 : Layer size bytes: {5}
     159            0 : Timeline layer count: {6}
     160            0 : ",
     161            0 :             self.count,
     162            0 :             self.with_errors.len(),
     163            0 :             self.with_warnings.len(),
     164            0 :             self.with_garbage.len(),
     165            0 :             self.timeline_size_bytes.oneline(),
     166            0 :             self.layer_size_bytes.oneline(),
     167            0 :             self.layer_count.oneline(),
     168            0 :         )
     169            0 :     }
     170              : 
     171            0 :     pub fn is_fatal(&self) -> bool {
     172            0 :         !self.with_errors.is_empty()
     173            0 :     }
     174              : }
     175              : 
     176              : /// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.
     177            0 : pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result<MetadataSummary> {
     178            0 :     let file_name = format!(
     179            0 :         "{}_scan_metadata_{}_{}.log",
     180            0 :         CLI_NAME,
     181            0 :         bucket_config.bucket,
     182            0 :         chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
     183            0 :     );
     184            0 : 
     185            0 :     let _guard = init_logging(&file_name);
     186            0 : 
     187            0 :     let s3_client = Arc::new(init_s3_client(
     188            0 :         bucket_config.sso_account_id,
     189            0 :         Region::new(bucket_config.region),
     190            0 :     ));
     191            0 :     let delimiter = "/";
     192            0 :     let target = RootTarget::Pageserver(S3Target {
     193            0 :         bucket_name: bucket_config.bucket.to_string(),
     194            0 :         prefix_in_bucket: ["pageserver", "v1", TENANTS_SEGMENT_NAME, ""].join(delimiter),
     195            0 :         delimiter: delimiter.to_string(),
     196            0 :     });
     197            0 : 
     198            0 :     let tenants = stream_tenants(&s3_client, &target);
     199            0 : 
     200            0 :     // How many tenants to process in parallel.  We need to be mindful of pageservers
     201            0 :     // accessing the same per tenant prefixes, so use a lower setting than pageservers.
     202            0 :     const CONCURRENCY: usize = 32;
     203            0 : 
     204            0 :     // Generate a stream of TenantTimelineId
     205            0 :     let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
     206            0 :     let timelines = timelines.try_buffer_unordered(CONCURRENCY);
     207            0 :     let timelines = timelines.try_flatten();
     208            0 : 
     209            0 :     // Generate a stream of S3TimelineBlobData
     210            0 :     async fn report_on_timeline(
     211            0 :         s3_client: &Client,
     212            0 :         target: &RootTarget,
     213            0 :         ttid: TenantTimelineId,
     214            0 :     ) -> anyhow::Result<(TenantTimelineId, S3TimelineBlobData)> {
     215            0 :         let data = list_timeline_blobs(s3_client, ttid, target).await?;
     216            0 :         Ok((ttid, data))
     217            0 :     }
     218            0 :     let timelines = timelines.map_ok(|ttid| report_on_timeline(&s3_client, &target, ttid));
     219            0 :     let timelines = timelines.try_buffer_unordered(CONCURRENCY);
     220            0 : 
     221            0 :     let mut summary = MetadataSummary::new();
     222            0 :     pin_mut!(timelines);
     223            0 :     while let Some(i) = timelines.next().await {
     224            0 :         let (ttid, data) = i?;
     225            0 :         summary.update_data(&data);
     226              : 
     227            0 :         let analysis =
     228            0 :             branch_cleanup_and_check_errors(&ttid, &target, None, None, Some(data)).await;
     229              : 
     230            0 :         summary.update_analysis(&ttid, &analysis);
     231              :     }
     232              : 
     233            0 :     Ok(summary)
     234            0 : }
        

Generated by: LCOV version 2.1-beta