Line data Source code
1 : use std::collections::{HashMap, HashSet};
2 :
3 : use futures_util::{StreamExt, TryStreamExt};
4 : use pageserver::tenant::remote_timeline_client::remote_layer_path;
5 : use pageserver_api::controller_api::MetadataHealthUpdateRequest;
6 : use pageserver_api::shard::TenantShardId;
7 : use remote_storage::GenericRemoteStorage;
8 : use serde::Serialize;
9 : use tracing::{Instrument, info_span};
10 : use utils::id::TenantId;
11 : use utils::shard::ShardCount;
12 :
13 : use crate::checks::{
14 : BlobDataParseResult, RemoteTimelineBlobData, TenantObjectListing, TimelineAnalysis,
15 : branch_cleanup_and_check_errors, list_timeline_blobs,
16 : };
17 : use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
18 : use crate::{BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, init_remote};
19 :
/// Aggregated results of a pageserver metadata scan.
///
/// The counters and error/warning/orphan maps are serialized (e.g. to JSON) for reporting.
/// The two tenant-shard health sets are skipped by serde; they are only used to build a
/// [`MetadataHealthUpdateRequest`].
///
/// NOTE: field declaration order determines the order of keys in the serialized output.
#[derive(Serialize, Default)]
pub struct MetadataSummary {
    /// Number of tenants that were analyzed.
    tenant_count: usize,
    /// Number of distinct timeline ids analyzed, summed over tenants. Only timeline shards at a
    /// tenant's highest shard count that are not marked deleted contribute.
    timeline_count: usize,
    /// Number of timeline shards listed, including shards that were later skipped.
    timeline_shard_count: usize,
    /// Tenant-shard timeline (key) mapping to errors. The key has to be a string because it will be serialized to a JSON.
    /// The key is generated using `TenantShardTimelineId::to_string()`.
    /// The message list is only filled in verbose mode; otherwise it is left empty.
    with_errors: HashMap<String, Vec<String>>,
    /// Tenant-shard timeline (key) mapping to warnings. The key has to be a string because it will be serialized to a JSON.
    /// The key is generated using `TenantShardTimelineId::to_string()`.
    /// The message list is only filled in verbose mode; otherwise it is left empty.
    with_warnings: HashMap<String, Vec<String>>,
    /// Timeline shards for which at least one orphan layer was detected.
    with_orphans: HashSet<TenantShardTimelineId>,
    /// `index_part` format version -> number of successfully parsed indexes with that version.
    indices_by_version: HashMap<usize, usize>,

    /// Tenant shards whose analyzed timelines were all healthy.
    #[serde(skip)]
    pub(crate) healthy_tenant_shards: HashSet<TenantShardId>,
    /// Tenant shards with at least one unhealthy analyzed timeline.
    #[serde(skip)]
    pub(crate) unhealthy_tenant_shards: HashSet<TenantShardId>,
}
39 :
40 : impl MetadataSummary {
41 0 : fn new() -> Self {
42 0 : Self::default()
43 0 : }
44 :
45 0 : fn update_data(&mut self, data: &RemoteTimelineBlobData) {
46 0 : self.timeline_shard_count += 1;
47 : if let BlobDataParseResult::Parsed {
48 0 : index_part,
49 : index_part_generation: _,
50 : s3_layers: _,
51 : index_part_last_modified_time: _,
52 : index_part_snapshot_time: _,
53 0 : } = &data.blob_data
54 0 : {
55 0 : *self
56 0 : .indices_by_version
57 0 : .entry(index_part.version())
58 0 : .or_insert(0) += 1;
59 0 : }
60 0 : }
61 :
62 0 : fn update_analysis(
63 0 : &mut self,
64 0 : id: &TenantShardTimelineId,
65 0 : analysis: &TimelineAnalysis,
66 0 : verbose: bool,
67 0 : ) {
68 0 : if analysis.is_healthy() {
69 0 : self.healthy_tenant_shards.insert(id.tenant_shard_id);
70 0 : } else {
71 0 : self.healthy_tenant_shards.remove(&id.tenant_shard_id);
72 0 : self.unhealthy_tenant_shards.insert(id.tenant_shard_id);
73 0 : }
74 :
75 0 : if !analysis.errors.is_empty() {
76 0 : let entry = self.with_errors.entry(id.to_string()).or_default();
77 0 : if verbose {
78 0 : entry.extend(analysis.errors.iter().cloned());
79 0 : }
80 0 : }
81 :
82 0 : if !analysis.warnings.is_empty() {
83 0 : let entry = self.with_warnings.entry(id.to_string()).or_default();
84 0 : if verbose {
85 0 : entry.extend(analysis.warnings.iter().cloned());
86 0 : }
87 0 : }
88 0 : }
89 :
90 0 : fn notify_timeline_orphan(&mut self, ttid: &TenantShardTimelineId) {
91 0 : self.with_orphans.insert(*ttid);
92 0 : }
93 :
94 : /// Long-form output for printing at end of a scan
95 0 : pub fn summary_string(&self) -> String {
96 0 : let version_summary: String = itertools::join(
97 0 : self.indices_by_version
98 0 : .iter()
99 0 : .map(|(k, v)| format!("{k}: {v}")),
100 0 : ", ",
101 : );
102 :
103 0 : format!(
104 0 : "Tenants: {}
105 0 : Timelines: {}
106 0 : Timeline-shards: {}
107 0 : With errors: {}
108 0 : With warnings: {}
109 0 : With orphan layers: {}
110 0 : Index versions: {version_summary}
111 0 : ",
112 : self.tenant_count,
113 : self.timeline_count,
114 : self.timeline_shard_count,
115 0 : self.with_errors.len(),
116 0 : self.with_warnings.len(),
117 0 : self.with_orphans.len(),
118 : )
119 0 : }
120 :
121 0 : pub fn is_fatal(&self) -> bool {
122 0 : !self.with_errors.is_empty()
123 0 : }
124 :
125 0 : pub fn is_empty(&self) -> bool {
126 0 : self.timeline_shard_count == 0
127 0 : }
128 :
129 0 : pub fn build_health_update_request(&self) -> MetadataHealthUpdateRequest {
130 0 : MetadataHealthUpdateRequest {
131 0 : healthy_tenant_shards: self.healthy_tenant_shards.clone(),
132 0 : unhealthy_tenant_shards: self.unhealthy_tenant_shards.clone(),
133 0 : }
134 0 : }
135 : }
136 :
137 : /// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.
138 0 : pub async fn scan_pageserver_metadata(
139 0 : bucket_config: BucketConfig,
140 0 : tenant_ids: Vec<TenantShardId>,
141 0 : verbose: bool,
142 0 : ) -> anyhow::Result<MetadataSummary> {
143 0 : let (remote_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;
144 :
145 0 : let tenants = if tenant_ids.is_empty() {
146 0 : futures::future::Either::Left(stream_tenants(&remote_client, &target))
147 : } else {
148 0 : futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
149 : };
150 :
151 : // How many tenants to process in parallel. We need to be mindful of pageservers
152 : // accessing the same per tenant prefixes, so use a lower setting than pageservers.
153 : const CONCURRENCY: usize = 32;
154 :
155 : // Generate a stream of TenantTimelineId
156 0 : let timelines = tenants.map_ok(|t| {
157 0 : tracing::info!("Found tenant: {}", t);
158 0 : stream_tenant_timelines(&remote_client, &target, t)
159 0 : });
160 0 : let timelines = timelines.try_buffered(CONCURRENCY);
161 0 : let timelines = timelines.try_flatten();
162 :
163 : // Generate a stream of S3TimelineBlobData
164 0 : async fn report_on_timeline(
165 0 : remote_client: &GenericRemoteStorage,
166 0 : target: &RootTarget,
167 0 : ttid: TenantShardTimelineId,
168 0 : ) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> {
169 0 : let data = list_timeline_blobs(remote_client, ttid, target).await?;
170 0 : Ok((ttid, data))
171 0 : }
172 0 : let timelines = timelines.map_ok(|ttid| report_on_timeline(&remote_client, &target, ttid));
173 0 : let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
174 :
175 : // We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different
176 : // shards in the same tenant might refer to one anothers' keys if a shard split has happened.
177 :
178 0 : let mut tenant_id = None;
179 0 : let mut tenant_objects = TenantObjectListing::default();
180 0 : let mut tenant_timeline_results = Vec::new();
181 :
182 0 : async fn analyze_tenant(
183 0 : remote_client: &GenericRemoteStorage,
184 0 : tenant_id: TenantId,
185 0 : summary: &mut MetadataSummary,
186 0 : mut tenant_objects: TenantObjectListing,
187 0 : timelines: Vec<(TenantShardTimelineId, RemoteTimelineBlobData)>,
188 0 : highest_shard_count: ShardCount,
189 0 : verbose: bool,
190 0 : ) {
191 0 : summary.tenant_count += 1;
192 :
193 0 : let mut timeline_ids = HashSet::new();
194 0 : let mut timeline_generations = HashMap::new();
195 0 : for (ttid, data) in timelines {
196 0 : async {
197 0 : if ttid.tenant_shard_id.shard_count == highest_shard_count {
198 : // Only analyze `TenantShardId`s with highest shard count.
199 :
200 : // Stash the generation of each timeline, for later use identifying orphan layers
201 : if let BlobDataParseResult::Parsed {
202 0 : index_part,
203 0 : index_part_generation,
204 : s3_layers: _,
205 : index_part_last_modified_time: _,
206 : index_part_snapshot_time: _,
207 0 : } = &data.blob_data
208 : {
209 0 : if index_part.deleted_at.is_some() {
210 : // skip deleted timeline.
211 0 : tracing::info!(
212 0 : "Skip analysis of {} b/c timeline is already deleted",
213 : ttid
214 : );
215 0 : return;
216 0 : }
217 0 : timeline_generations.insert(ttid, *index_part_generation);
218 0 : }
219 :
220 : // Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
221 : // reference counts for layers across the tenant.
222 0 : let analysis = branch_cleanup_and_check_errors(
223 0 : remote_client,
224 0 : &ttid,
225 0 : &mut tenant_objects,
226 0 : None,
227 0 : None,
228 0 : Some(data),
229 0 : )
230 0 : .await;
231 0 : summary.update_analysis(&ttid, &analysis, verbose);
232 :
233 0 : timeline_ids.insert(ttid.timeline_id);
234 : } else {
235 0 : tracing::info!(
236 0 : "Skip analysis of {} b/c a lower shard count than {}",
237 : ttid,
238 : highest_shard_count.0,
239 : );
240 : }
241 0 : }
242 0 : .instrument(
243 0 : info_span!("analyze-timeline", shard = %ttid.tenant_shard_id.shard_slug(), timeline = %ttid.timeline_id),
244 : )
245 0 : .await
246 : }
247 :
248 0 : summary.timeline_count += timeline_ids.len();
249 :
250 : // Identifying orphan layers must be done on a tenant-wide basis, because individual
251 : // shards' layers may be referenced by other shards.
252 : //
253 : // Orphan layers are not a corruption, and not an indication of a problem. They are just
254 : // consuming some space in remote storage, and may be cleaned up at leisure.
255 0 : for (shard_index, timeline_id, layer_file, generation) in tenant_objects.get_orphans() {
256 0 : let ttid = TenantShardTimelineId {
257 0 : tenant_shard_id: TenantShardId {
258 0 : tenant_id,
259 0 : shard_count: shard_index.shard_count,
260 0 : shard_number: shard_index.shard_number,
261 0 : },
262 0 : timeline_id,
263 0 : };
264 :
265 0 : if let Some(timeline_generation) = timeline_generations.get(&ttid) {
266 0 : if &generation >= timeline_generation {
267 : // Candidate orphan layer is in the current or future generation relative
268 : // to the index we read for this timeline shard, so its absence from the index
269 : // doesn't make it an orphan: more likely, it is a case where the layer was
270 : // uploaded, but the index referencing the layer wasn't written yet.
271 0 : continue;
272 0 : }
273 0 : }
274 :
275 0 : let orphan_path = remote_layer_path(
276 0 : &tenant_id,
277 0 : &timeline_id,
278 0 : shard_index,
279 0 : &layer_file,
280 0 : generation,
281 : );
282 :
283 0 : tracing::info!("Orphan layer detected: {orphan_path}");
284 :
285 0 : summary.notify_timeline_orphan(&ttid);
286 : }
287 0 : }
288 :
289 : // Iterate through all the timeline results. These are in key-order, so
290 : // all results for the same tenant will be adjacent. We accumulate these,
291 : // and then call `analyze_tenant` to flush, when we see the next tenant ID.
292 0 : let mut summary = MetadataSummary::new();
293 0 : let mut highest_shard_count = ShardCount::MIN;
294 0 : while let Some(i) = timelines.next().await {
295 0 : let (ttid, data) = i?;
296 0 : summary.update_data(&data);
297 :
298 0 : match tenant_id {
299 0 : Some(prev_tenant_id) => {
300 0 : if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
301 : // New tenant: analyze this tenant's timelines, clear accumulated tenant_timeline_results
302 0 : let tenant_objects = std::mem::take(&mut tenant_objects);
303 0 : let timelines = std::mem::take(&mut tenant_timeline_results);
304 0 : analyze_tenant(
305 0 : &remote_client,
306 0 : prev_tenant_id,
307 0 : &mut summary,
308 0 : tenant_objects,
309 0 : timelines,
310 0 : highest_shard_count,
311 0 : verbose,
312 : )
313 0 : .instrument(info_span!("analyze-tenant", tenant = %prev_tenant_id))
314 0 : .await;
315 0 : tenant_id = Some(ttid.tenant_shard_id.tenant_id);
316 0 : highest_shard_count = ttid.tenant_shard_id.shard_count;
317 0 : } else {
318 0 : highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
319 0 : }
320 : }
321 0 : None => {
322 0 : tenant_id = Some(ttid.tenant_shard_id.tenant_id);
323 0 : highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
324 0 : }
325 : }
326 :
327 0 : match &data.blob_data {
328 : BlobDataParseResult::Parsed {
329 : index_part: _,
330 0 : index_part_generation: _index_part_generation,
331 0 : s3_layers,
332 : index_part_last_modified_time: _,
333 : index_part_snapshot_time: _,
334 0 : } => {
335 0 : tenant_objects.push(ttid, s3_layers.clone());
336 0 : }
337 0 : BlobDataParseResult::Relic => (),
338 : BlobDataParseResult::Incorrect {
339 : errors: _,
340 0 : s3_layers,
341 0 : } => {
342 0 : tenant_objects.push(ttid, s3_layers.clone());
343 0 : }
344 : }
345 0 : tenant_timeline_results.push((ttid, data));
346 : }
347 :
348 0 : if !tenant_timeline_results.is_empty() {
349 0 : let tenant_id = tenant_id.expect("Must be set if results are present");
350 0 : analyze_tenant(
351 0 : &remote_client,
352 0 : tenant_id,
353 0 : &mut summary,
354 0 : tenant_objects,
355 0 : tenant_timeline_results,
356 0 : highest_shard_count,
357 0 : verbose,
358 : )
359 0 : .instrument(info_span!("analyze-tenant", tenant = %tenant_id))
360 0 : .await;
361 0 : }
362 :
363 0 : Ok(summary)
364 0 : }
|