use std::collections::{HashMap, HashSet};

use crate::checks::{
    branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData,
    TenantObjectListing, TimelineAnalysis,
};
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
use aws_sdk_s3::Client;
use futures_util::{pin_mut, StreamExt, TryStreamExt};
use histogram::Histogram;
use pageserver::tenant::remote_timeline_client::remote_layer_path;
use pageserver::tenant::IndexPart;
use pageserver_api::shard::TenantShardId;
use serde::Serialize;
use utils::id::TenantId;

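/// Accumulated results of a metadata scan: tenant/timeline counts, the timeline
/// shards that had errors, warnings, or orphan layers, and size/layer histograms.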
#[derive(Serialize)]
pub struct MetadataSummary {
    tenant_count: usize,
    timeline_count: usize,
    timeline_shard_count: usize,
    with_errors: HashSet<TenantShardTimelineId>,
    with_warnings: HashSet<TenantShardTimelineId>,
    with_orphans: HashSet<TenantShardTimelineId>,
    indices_by_version: HashMap<usize, usize>,

    layer_count: MinMaxHisto,
    timeline_size_bytes: MinMaxHisto,
    layer_size_bytes: MinMaxHisto,
}

/// A histogram plus minimum and maximum tracking
#[derive(Serialize)]
struct MinMaxHisto {
    #[serde(skip)]
    histo: Histogram,
    min: u64,
    max: u64,
}

impl MinMaxHisto {
    fn new() -> Self {
        Self {
            histo: histogram::Histogram::builder()
                .build()
                .expect("Bad histogram params"),
            min: u64::MAX,
            max: 0,
        }
    }

    fn sample(&mut self, v: u64) -> Result<(), histogram::Error> {
        self.min = std::cmp::min(self.min, v);
        self.max = std::cmp::max(self.max, v);
        let r = self.histo.increment(v, 1);

        if r.is_err() {
            tracing::warn!("Bad histogram sample: {v}");
        }

        r
    }

    fn oneline(&self) -> String {
        let percentiles = match self.histo.percentiles(&[1.0, 10.0, 50.0, 90.0, 99.0]) {
            Ok(p) => p,
            Err(e) => return format!("No data: {}", e),
        };

        let percentiles: Vec<u64> = percentiles
            .iter()
            // Approximate each percentile by the midpoint of its histogram bucket
            .map(|p| (p.bucket().low() + p.bucket().high()) / 2)
            .collect();

        format!(
            "min {}, 1% {}, 10% {}, 50% {}, 90% {}, 99% {}, max {}",
            self.min,
            percentiles[0],
            percentiles[1],
            percentiles[2],
            percentiles[3],
            percentiles[4],
            self.max,
        )
    }
}
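
// Illustrative sketch (not part of the original source): a small test module showing
// how `MinMaxHisto` is meant to behave. `min` and `max` are tracked exactly, while
// `oneline()` reports approximate percentiles from the histogram buckets. The module
// and test names here are assumptions made for illustration.
#[cfg(test)]
mod min_max_histo_tests {
    use super::*;

    #[test]
    fn tracks_min_and_max() {
        let mut h = MinMaxHisto::new();
        for v in [1u64, 10, 100, 1_000] {
            // The default histogram configuration covers these values comfortably.
            h.sample(v).expect("sample within histogram range");
        }
        assert_eq!(h.min, 1);
        assert_eq!(h.max, 1_000);
        // The one-line summary should render without panicking.
        let _ = h.oneline();
    }
}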

impl MetadataSummary {
    fn new() -> Self {
        Self {
            tenant_count: 0,
            timeline_count: 0,
            timeline_shard_count: 0,
            with_errors: HashSet::new(),
            with_warnings: HashSet::new(),
            with_orphans: HashSet::new(),
            indices_by_version: HashMap::new(),
            layer_count: MinMaxHisto::new(),
            timeline_size_bytes: MinMaxHisto::new(),
            layer_size_bytes: MinMaxHisto::new(),
        }
    }

    fn update_histograms(&mut self, index_part: &IndexPart) -> Result<(), histogram::Error> {
        self.layer_count
            .sample(index_part.layer_metadata.len() as u64)?;
        let mut total_size: u64 = 0;
        for meta in index_part.layer_metadata.values() {
            total_size += meta.file_size;
            self.layer_size_bytes.sample(meta.file_size)?;
        }
        self.timeline_size_bytes.sample(total_size)?;

        Ok(())
    }

    fn update_data(&mut self, data: &S3TimelineBlobData) {
        self.timeline_shard_count += 1;
        if let BlobDataParseResult::Parsed {
            index_part,
            index_part_generation: _,
            s3_layers: _,
        } = &data.blob_data
        {
            *self
                .indices_by_version
                .entry(index_part.get_version())
                .or_insert(0) += 1;

            if let Err(e) = self.update_histograms(index_part) {
                // Value out of range? Warn that the results are untrustworthy
                tracing::warn!(
                    "Error updating histograms, summary stats may be wrong: {}",
                    e
                );
            }
        }
    }

    fn update_analysis(&mut self, id: &TenantShardTimelineId, analysis: &TimelineAnalysis) {
        if !analysis.errors.is_empty() {
            self.with_errors.insert(*id);
        }

        if !analysis.warnings.is_empty() {
            self.with_warnings.insert(*id);
        }
    }

    fn notify_timeline_orphan(&mut self, ttid: &TenantShardTimelineId) {
        self.with_orphans.insert(*ttid);
    }

    /// Long-form output for printing at end of a scan
    pub fn summary_string(&self) -> String {
        let version_summary: String = itertools::join(
            self.indices_by_version
                .iter()
                .map(|(k, v)| format!("{k}: {v}")),
            ", ",
        );

        format!(
            "Tenants: {}
Timelines: {}
Timeline-shards: {}
With errors: {}
With warnings: {}
With orphan layers: {}
Index versions: {version_summary}
Timeline size bytes: {}
Layer size bytes: {}
Timeline layer count: {}
",
            self.tenant_count,
            self.timeline_count,
            self.timeline_shard_count,
            self.with_errors.len(),
            self.with_warnings.len(),
            self.with_orphans.len(),
            self.timeline_size_bytes.oneline(),
            self.layer_size_bytes.oneline(),
            self.layer_count.oneline(),
        )
    }

    pub fn is_fatal(&self) -> bool {
        !self.with_errors.is_empty()
    }

    pub fn is_empty(&self) -> bool {
        self.timeline_shard_count == 0
    }
}
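
// Illustrative sketch (not part of the original source): the invariants callers rely
// on. A freshly created summary is empty and non-fatal, and `summary_string()` renders
// even before any data has been recorded. Module and test names are assumptions.
#[cfg(test)]
mod metadata_summary_tests {
    use super::*;

    #[test]
    fn new_summary_is_empty_and_not_fatal() {
        let summary = MetadataSummary::new();
        assert!(summary.is_empty());
        assert!(!summary.is_fatal());
        assert!(summary.summary_string().contains("Tenants: 0"));
    }
}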

/// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.
pub async fn scan_metadata(
    bucket_config: BucketConfig,
    tenant_ids: Vec<TenantShardId>,
) -> anyhow::Result<MetadataSummary> {
    let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver)?;

    let tenants = if tenant_ids.is_empty() {
        futures::future::Either::Left(stream_tenants(&s3_client, &target))
    } else {
        futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
    };

    // How many tenants to process in parallel. We need to be mindful of pageservers
    // accessing the same per tenant prefixes, so use a lower setting than pageservers.
    const CONCURRENCY: usize = 32;

    // Generate a stream of TenantTimelineId
    let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
    let timelines = timelines.try_buffered(CONCURRENCY);
    let timelines = timelines.try_flatten();

    // Generate a stream of S3TimelineBlobData
    async fn report_on_timeline(
        s3_client: &Client,
        target: &RootTarget,
        ttid: TenantShardTimelineId,
    ) -> anyhow::Result<(TenantShardTimelineId, S3TimelineBlobData)> {
        let data = list_timeline_blobs(s3_client, ttid, target).await?;
        Ok((ttid, data))
    }
    let timelines = timelines.map_ok(|ttid| report_on_timeline(&s3_client, &target, ttid));
    let timelines = timelines.try_buffered(CONCURRENCY);

    // We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different
    // shards in the same tenant might refer to one another's keys if a shard split has happened.

    let mut tenant_id = None;
    let mut tenant_objects = TenantObjectListing::default();
    let mut tenant_timeline_results = Vec::new();

    fn analyze_tenant(
        tenant_id: TenantId,
        summary: &mut MetadataSummary,
        mut tenant_objects: TenantObjectListing,
        timelines: Vec<(TenantShardTimelineId, S3TimelineBlobData)>,
    ) {
        summary.tenant_count += 1;

        let mut timeline_ids = HashSet::new();
        let mut timeline_generations = HashMap::new();
        for (ttid, data) in timelines {
            timeline_ids.insert(ttid.timeline_id);
            // Stash the generation of each timeline, for later use identifying orphan layers
            if let BlobDataParseResult::Parsed {
                index_part: _index_part,
                index_part_generation,
                s3_layers: _s3_layers,
            } = &data.blob_data
            {
                timeline_generations.insert(ttid, *index_part_generation);
            }

            // Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
            // reference counts for layers across the tenant.
            let analysis =
                branch_cleanup_and_check_errors(&ttid, &mut tenant_objects, None, None, Some(data));
            summary.update_analysis(&ttid, &analysis);
        }

        summary.timeline_count += timeline_ids.len();

        // Identifying orphan layers must be done on a tenant-wide basis, because individual
        // shards' layers may be referenced by other shards.
        //
        // Orphan layers are not a corruption, and not an indication of a problem. They are just
        // consuming some space in remote storage, and may be cleaned up at leisure.
        for (shard_index, timeline_id, layer_file, generation) in tenant_objects.get_orphans() {
            let ttid = TenantShardTimelineId {
                tenant_shard_id: TenantShardId {
                    tenant_id,
                    shard_count: shard_index.shard_count,
                    shard_number: shard_index.shard_number,
                },
                timeline_id,
            };

            if let Some(timeline_generation) = timeline_generations.get(&ttid) {
                if &generation >= timeline_generation {
                    // Candidate orphan layer is in the current or future generation relative
                    // to the index we read for this timeline shard, so its absence from the index
                    // doesn't make it an orphan: more likely, it is a case where the layer was
                    // uploaded, but the index referencing the layer wasn't written yet.
                    continue;
                }
            }

            let orphan_path = remote_layer_path(
                &tenant_id,
                &timeline_id,
                shard_index,
                &layer_file,
                generation,
            );

            tracing::info!("Orphan layer detected: {orphan_path}");

            summary.notify_timeline_orphan(&ttid);
        }
    }

    // Iterate through all the timeline results. These are in key-order, so
    // all results for the same tenant will be adjacent. We accumulate these,
    // and then call `analyze_tenant` to flush, when we see the next tenant ID.
    let mut summary = MetadataSummary::new();
    pin_mut!(timelines);
    while let Some(i) = timelines.next().await {
        let (ttid, data) = i?;
        summary.update_data(&data);

        match tenant_id {
            None => tenant_id = Some(ttid.tenant_shard_id.tenant_id),
            Some(prev_tenant_id) => {
                if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
                    let tenant_objects = std::mem::take(&mut tenant_objects);
                    let timelines = std::mem::take(&mut tenant_timeline_results);
                    analyze_tenant(prev_tenant_id, &mut summary, tenant_objects, timelines);
                    tenant_id = Some(ttid.tenant_shard_id.tenant_id);
                }
            }
        }

        if let BlobDataParseResult::Parsed {
            index_part: _index_part,
            index_part_generation: _index_part_generation,
            s3_layers,
        } = &data.blob_data
        {
            tenant_objects.push(ttid, s3_layers.clone());
        }
        tenant_timeline_results.push((ttid, data));
    }

    if !tenant_timeline_results.is_empty() {
        analyze_tenant(
            tenant_id.expect("Must be set if results are present"),
            &mut summary,
            tenant_objects,
            tenant_timeline_results,
        );
    }

    Ok(summary)
}
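
// Illustrative sketch (not part of the original source) of how a caller might drive
// `scan_metadata`: print the long-form summary and treat recorded errors as fatal.
// The helper name `run_scan` and its wiring are assumptions; real callers live in the
// scrubber's CLI layer.
#[allow(dead_code)]
async fn run_scan(bucket_config: BucketConfig, tenant_ids: Vec<TenantShardId>) -> anyhow::Result<()> {
    let summary = scan_metadata(bucket_config, tenant_ids).await?;
    println!("{}", summary.summary_string());
    if summary.is_empty() {
        tracing::warn!("No timeline shards found under the scanned prefixes");
    }
    if summary.is_fatal() {
        anyhow::bail!("Fatal errors detected in pageserver metadata");
    }
    Ok(())
}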