use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use crate::checks::{
    branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData,
    TimelineAnalysis,
};
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::{init_logging, init_s3_client, BucketConfig, RootTarget, S3Target, CLI_NAME};
use aws_sdk_s3::Client;
use aws_types::region::Region;
use futures_util::{pin_mut, StreamExt, TryStreamExt};
use histogram::Histogram;
use pageserver::tenant::{IndexPart, TENANTS_SEGMENT_NAME};
use utils::id::TenantTimelineId;

/// Aggregate statistics and per-timeline problem sets collected while scanning
/// pageserver metadata in S3.
pub struct MetadataSummary {
    count: usize,
    with_errors: HashSet<TenantTimelineId>,
    with_warnings: HashSet<TenantTimelineId>,
    with_garbage: HashSet<TenantTimelineId>,
    indices_by_version: HashMap<usize, usize>,

    layer_count: MinMaxHisto,
    timeline_size_bytes: MinMaxHisto,
    layer_size_bytes: MinMaxHisto,
}

/// A histogram plus minimum and maximum tracking
struct MinMaxHisto {
    histo: Histogram,
    min: u64,
    max: u64,
}

impl MinMaxHisto {
    fn new() -> Self {
        Self {
            histo: histogram::Histogram::builder()
                .build()
                .expect("Bad histogram params"),
            min: u64::MAX,
            max: 0,
        }
    }

    /// Record one value, updating the tracked min/max and the underlying histogram.
    /// If the histogram rejects the value, a warning is logged and the error returned.
    fn sample(&mut self, v: u64) -> Result<(), histogram::Error> {
        self.min = std::cmp::min(self.min, v);
        self.max = std::cmp::max(self.max, v);
        let r = self.histo.increment(v, 1);

        if r.is_err() {
            tracing::warn!("Bad histogram sample: {v}");
        }

        r
    }

    /// Render min, the approximate 1/10/50/90/99th percentiles, and max on one line.
    fn oneline(&self) -> String {
        let percentiles = match self.histo.percentiles(&[1.0, 10.0, 50.0, 90.0, 99.0]) {
            Ok(p) => p,
            Err(e) => return format!("No data: {}", e),
        };

        // Approximate each percentile by the midpoint of its bucket.
        let percentiles: Vec<u64> = percentiles
            .iter()
            .map(|p| (p.bucket().low() + p.bucket().high()) / 2)
            .collect();

        format!(
            "min {}, 1% {}, 10% {}, 50% {}, 90% {}, 99% {}, max {}",
            self.min,
            percentiles[0],
            percentiles[1],
            percentiles[2],
            percentiles[3],
            percentiles[4],
            self.max,
        )
    }
}
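
// A minimal sketch, not part of the original module, illustrating the intended
// behaviour of `MinMaxHisto`: min and max always track the extremes of the
// samples fed in, independent of how the histogram buckets them. The test name
// and sample values are illustrative only.
#[cfg(test)]
mod min_max_histo_tests {
    use super::MinMaxHisto;

    #[test]
    fn tracks_min_and_max() {
        let mut histo = MinMaxHisto::new();
        for v in [5u64, 1, 100] {
            histo
                .sample(v)
                .expect("value should fit the default histogram range");
        }
        assert_eq!(histo.min, 1);
        assert_eq!(histo.max, 100);
    }
}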

impl MetadataSummary {
    fn new() -> Self {
        Self {
            count: 0,
            with_errors: HashSet::new(),
            with_warnings: HashSet::new(),
            with_garbage: HashSet::new(),
            indices_by_version: HashMap::new(),
            layer_count: MinMaxHisto::new(),
            timeline_size_bytes: MinMaxHisto::new(),
            layer_size_bytes: MinMaxHisto::new(),
        }
    }

    /// Feed one timeline's layer count and layer/timeline sizes from its IndexPart
    /// into the summary histograms.
    fn update_histograms(&mut self, index_part: &IndexPart) -> Result<(), histogram::Error> {
        self.layer_count
            .sample(index_part.layer_metadata.len() as u64)?;
        let mut total_size: u64 = 0;
        for meta in index_part.layer_metadata.values() {
            total_size += meta.file_size;
            self.layer_size_bytes.sample(meta.file_size)?;
        }
        self.timeline_size_bytes.sample(total_size)?;

        Ok(())
    }

    /// Account for one timeline's S3 listing. If its index_part parsed successfully,
    /// record the index version and update the histograms.
    fn update_data(&mut self, data: &S3TimelineBlobData) {
        self.count += 1;
        if let BlobDataParseResult::Parsed {
            index_part,
            s3_layers: _,
        } = &data.blob_data
        {
            *self
                .indices_by_version
                .entry(index_part.get_version())
                .or_insert(0) += 1;

            if let Err(e) = self.update_histograms(index_part) {
                // Value out of range? Warn that the results are untrustworthy
                tracing::warn!(
                    "Error updating histograms, summary stats may be wrong: {}",
                    e
                );
            }
        }
    }

    /// Record whether a timeline's analysis reported errors or warnings.
    fn update_analysis(&mut self, id: &TenantTimelineId, analysis: &TimelineAnalysis) {
        if !analysis.errors.is_empty() {
            self.with_errors.insert(*id);
        }

        if !analysis.warnings.is_empty() {
            self.with_warnings.insert(*id);
        }
    }

    /// Long-form output for printing at the end of a scan
    pub fn summary_string(&self) -> String {
        let version_summary: String = itertools::join(
            self.indices_by_version
                .iter()
                .map(|(k, v)| format!("{k}: {v}")),
            ", ",
        );

        format!(
            "Timelines: {0}
With errors: {1}
With warnings: {2}
With garbage: {3}
Index versions: {version_summary}
Timeline size bytes: {4}
Layer size bytes: {5}
Timeline layer count: {6}
",
            self.count,
            self.with_errors.len(),
            self.with_warnings.len(),
            self.with_garbage.len(),
            self.timeline_size_bytes.oneline(),
            self.layer_size_bytes.oneline(),
            self.layer_count.oneline(),
        )
    }

    /// Returns true if any scanned timeline reported errors.
    pub fn is_fatal(&self) -> bool {
        !self.with_errors.is_empty()
    }
}

/// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.
pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result<MetadataSummary> {
    let file_name = format!(
        "{}_scan_metadata_{}_{}.log",
        CLI_NAME,
        bucket_config.bucket,
        chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
    );

    let _guard = init_logging(&file_name);

    let s3_client = Arc::new(init_s3_client(
        bucket_config.sso_account_id,
        Region::new(bucket_config.region),
    ));
    let delimiter = "/";
    let target = RootTarget::Pageserver(S3Target {
        bucket_name: bucket_config.bucket.to_string(),
        prefix_in_bucket: ["pageserver", "v1", TENANTS_SEGMENT_NAME, ""].join(delimiter),
        delimiter: delimiter.to_string(),
    });

    let tenants = stream_tenants(&s3_client, &target);

    // How many tenants to process in parallel. We need to be mindful of pageservers
    // accessing the same per-tenant prefixes, so use a lower concurrency than the
    // pageservers themselves do.
    const CONCURRENCY: usize = 32;

    // Generate a stream of TenantTimelineId
    let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
    let timelines = timelines.try_buffer_unordered(CONCURRENCY);
    let timelines = timelines.try_flatten();

    // Generate a stream of S3TimelineBlobData
    async fn report_on_timeline(
        s3_client: &Client,
        target: &RootTarget,
        ttid: TenantTimelineId,
    ) -> anyhow::Result<(TenantTimelineId, S3TimelineBlobData)> {
        let data = list_timeline_blobs(s3_client, ttid, target).await?;
        Ok((ttid, data))
    }
    let timelines = timelines.map_ok(|ttid| report_on_timeline(&s3_client, &target, ttid));
    let timelines = timelines.try_buffer_unordered(CONCURRENCY);

    let mut summary = MetadataSummary::new();
    pin_mut!(timelines);
    while let Some(i) = timelines.next().await {
        let (ttid, data) = i?;
        summary.update_data(&data);

        let analysis =
            branch_cleanup_and_check_errors(&ttid, &target, None, None, Some(data)).await;

        summary.update_analysis(&ttid, &analysis);
    }

    Ok(summary)
}
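
// A hedged usage sketch, not part of the original file: roughly how a caller
// (for example the scrubber CLI) might drive `scan_metadata`. Construction of
// `BucketConfig` is elided because its full definition lives elsewhere in this
// crate; the `run_scan` helper shown here is hypothetical.
//
// async fn run_scan(bucket_config: BucketConfig) -> anyhow::Result<()> {
//     let summary = scan_metadata(bucket_config).await?;
//     println!("{}", summary.summary_string());
//     if summary.is_fatal() {
//         anyhow::bail!("scan found errors in one or more timelines");
//     }
//     Ok(())
// }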