use std::collections::{HashMap, HashSet};

use anyhow::Context;
use aws_sdk_s3::Client;
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver_api::shard::ShardIndex;
use tracing::{error, info, warn};
use utils::generation::Generation;
use utils::id::TimelineId;

use crate::cloud_admin_api::BranchData;
use crate::metadata_stream::stream_listing;
use crate::{download_object_with_retries, RootTarget, TenantShardTimelineId};
use futures_util::StreamExt;
use pageserver::tenant::remote_timeline_client::parse_remote_index_path;
use pageserver::tenant::storage_layer::LayerName;
use pageserver::tenant::IndexPart;
use remote_storage::RemotePath;

pub(crate) struct TimelineAnalysis {
    /// Anomalies detected
    pub(crate) errors: Vec<String>,

    /// Healthy-but-noteworthy conditions, e.g. old-version structures that are still
    /// readable but worth reporting, so that we remember not to remove support for
    /// decoding the old version yet.
    pub(crate) warnings: Vec<String>,

    /// Keys not referenced in metadata: candidates for removal, but NOT NECESSARILY:
    /// beware of races between reading the metadata and reading the objects.
    pub(crate) garbage_keys: Vec<String>,
}

impl TimelineAnalysis {
    fn new() -> Self {
        Self {
            errors: Vec::new(),
            warnings: Vec::new(),
            garbage_keys: Vec::new(),
        }
    }
}

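/// Checks a single timeline's remote state: validates its `index_part.json` against the
/// tenant-wide object listing, cross-checks console branch data when available, and
/// returns the errors, warnings, and garbage keys found.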
pub(crate) fn branch_cleanup_and_check_errors(
    id: &TenantShardTimelineId,
    tenant_objects: &mut TenantObjectListing,
    s3_active_branch: Option<&BranchData>,
    console_branch: Option<BranchData>,
    s3_data: Option<S3TimelineBlobData>,
) -> TimelineAnalysis {
    let mut result = TimelineAnalysis::new();

    info!("Checking timeline {id}");

    if let Some(s3_active_branch) = s3_active_branch {
        info!(
            "Checking console status for timeline for branch {:?}/{:?}",
            s3_active_branch.project_id, s3_active_branch.id
        );
        match console_branch {
            Some(_) => result.errors.push(format!(
                "Timeline has deleted branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
                s3_active_branch.id, s3_active_branch.project_id
            )),
            None => result.errors.push(format!(
                "Timeline has no branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
                s3_active_branch.id, s3_active_branch.project_id
            )),
        };
    }

    match s3_data {
        Some(s3_data) => {
            result.garbage_keys.extend(s3_data.unknown_keys);

            match s3_data.blob_data {
                BlobDataParseResult::Parsed {
                    index_part,
                    index_part_generation: _index_part_generation,
                    s3_layers: _s3_layers,
                } => {
                    if !IndexPart::KNOWN_VERSIONS.contains(&index_part.version()) {
                        result
                            .errors
                            .push(format!("index_part.json version: {}", index_part.version()))
                    }

                    if &index_part.version() != IndexPart::KNOWN_VERSIONS.last().unwrap() {
                        result.warnings.push(format!(
                            "index_part.json version is not latest: {}",
                            index_part.version()
                        ))
                    }

                    if index_part.metadata.disk_consistent_lsn()
                        != index_part.duplicated_disk_consistent_lsn()
                    {
                        result.errors.push(format!(
                            "Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})",
                            index_part.metadata.disk_consistent_lsn(),
                            index_part.duplicated_disk_consistent_lsn(),
                        ))
                    }

                    if index_part.layer_metadata.is_empty() {
                        // Not an error: this can happen for branches with zero writes, but it is worth noting.
                        info!("index_part.json has no layers");
                    }

                    for (layer, metadata) in index_part.layer_metadata {
                        if metadata.file_size == 0 {
                            result.errors.push(format!(
                                "index_part.json contains a layer {} that has 0 size in its layer metadata", layer,
                            ))
                        }

                        if !tenant_objects.check_ref(id.timeline_id, &layer, &metadata) {
                            // FIXME: this will emit false positives if an index was
                            // uploaded concurrently with our scan. To make this check
                            // correct, we need to try sending a HEAD request for the
                            // layer we think is missing.
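                            //
                            // That follow-up might look roughly like the following (a
                            // sketch only: no S3 client is plumbed into this function
                            // today; `head_object` is the stock aws-sdk-s3 builder call):
                            //   s3_client.head_object().bucket(bucket).key(key).send().await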
                            result.errors.push(format!(
                                "index_part.json contains a layer {}{} (shard {}) that is not present in remote storage",
                                layer,
                                metadata.generation.get_suffix(),
                                metadata.shard
                            ))
                        }
                    }
                }
                BlobDataParseResult::Relic => {}
                BlobDataParseResult::Incorrect(parse_errors) => result.errors.extend(
                    parse_errors
                        .into_iter()
                        .map(|error| format!("parse error: {error}")),
                ),
            }
        }
        None => result
            .errors
            .push("Timeline has no data on S3 at all".to_string()),
    }

    if result.errors.is_empty() {
        info!("No check errors found");
    } else {
        warn!("Timeline metadata errors: {:?}", result.errors);
    }

    if !result.warnings.is_empty() {
        warn!("Timeline metadata warnings: {:?}", result.warnings);
    }

    if !result.garbage_keys.is_empty() {
        error!(
            "The following keys should be removed from S3: {:?}",
            result.garbage_keys
        )
    }

    result
}

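/// Refcount for a single layer object found in the S3 listing; incremented by
/// [`TenantObjectListing::check_ref`], so a count that is still zero after all indices
/// have been checked marks the layer as an orphan candidate.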
#[derive(Default)]
pub(crate) struct LayerRef {
    ref_count: usize,
}

/// Top-level index of objects in a tenant. This may be used by any shard-timeline within
/// the tenant to query whether an object exists.
#[derive(Default)]
pub(crate) struct TenantObjectListing {
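    /// Per shard-timeline: every listed layer key (name plus generation), mapped to the
    /// number of index references seen so far via [`Self::check_ref`].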
    shard_timelines: HashMap<(ShardIndex, TimelineId), HashMap<(LayerName, Generation), LayerRef>>,
}

impl TenantObjectListing {
    /// Having done an S3 listing of the keys within a timeline prefix, merge them into the overall
    /// list of layer keys for the tenant.
    pub(crate) fn push(
        &mut self,
        ttid: TenantShardTimelineId,
        layers: HashSet<(LayerName, Generation)>,
    ) {
        let shard_index = ShardIndex::new(
            ttid.tenant_shard_id.shard_number,
            ttid.tenant_shard_id.shard_count,
        );
        let replaced = self.shard_timelines.insert(
            (shard_index, ttid.timeline_id),
            layers
                .into_iter()
                .map(|l| (l, LayerRef::default()))
                .collect(),
        );

        assert!(
            replaced.is_none(),
            "Built from an S3 object listing, which should never repeat a key"
        );
    }

    /// Having loaded a timeline index, check whether a layer referenced by the index exists.
    /// If it does, the layer's refcount is incremented. Later, after calling this for all
    /// references in all indices in a tenant, orphan layers may be detected by their zero
    /// refcounts.
    ///
    /// Returns true if the layer exists.
    pub(crate) fn check_ref(
        &mut self,
        timeline_id: TimelineId,
        layer_file: &LayerName,
        metadata: &LayerFileMetadata,
    ) -> bool {
        let Some(shard_tl) = self.shard_timelines.get_mut(&(metadata.shard, timeline_id)) else {
            return false;
        };

        let Some(layer_ref) = shard_tl.get_mut(&(layer_file.clone(), metadata.generation)) else {
            return false;
        };

        layer_ref.ref_count += 1;

        true
    }

    pub(crate) fn get_orphans(&self) -> Vec<(ShardIndex, TimelineId, LayerName, Generation)> {
        let mut result = Vec::new();
        for ((shard_index, timeline_id), layers) in &self.shard_timelines {
            for ((layer_file, generation), layer_ref) in layers {
                if layer_ref.ref_count == 0 {
                    result.push((*shard_index, *timeline_id, layer_file.clone(), *generation))
                }
            }
        }

        result
    }
}
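
// Typical call sequence for `TenantObjectListing`, reconstructed from the doc comments
// above (a sketch; the variable names are illustrative, not from a real call site):
//
//     let mut tenant_objects = TenantObjectListing::default();
//     // 1. Feed in the S3 listing of every shard-timeline prefix:
//     tenant_objects.push(ttid, layers_seen_in_listing);
//     // 2. Mark every layer referenced by every downloaded index_part:
//     tenant_objects.check_ref(ttid.timeline_id, &layer_name, &layer_metadata);
//     // 3. Whatever was never referenced is an orphan candidate:
//     let orphans = tenant_objects.get_orphans();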

#[derive(Debug)]
pub(crate) struct S3TimelineBlobData {
    pub(crate) blob_data: BlobDataParseResult,

    /// Index objects that were not used when loading `blob_data`, e.g. those from old generations
    pub(crate) unused_index_keys: Vec<String>,

    /// Objects whose keys were not recognized at all, i.e. not layer files, not indices
    pub(crate) unknown_keys: Vec<String>,
}

#[derive(Debug)]
pub(crate) enum BlobDataParseResult {
    Parsed {
        index_part: Box<IndexPart>,
        index_part_generation: Generation,
        s3_layers: HashSet<(LayerName, Generation)>,
    },
    /// The remains of a deleted Timeline (i.e. an initdb archive only)
    Relic,
    Incorrect(Vec<String>),
}

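/// Splits a layer object key's basename into its [`LayerName`] and [`Generation`]. Keys
/// written by generation-aware pageservers carry an 8-hex-digit generation suffix (e.g.
/// `-00000001`); keys without one are legacy objects and map to [`Generation::none`].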
fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generation), String> {
    match name.rsplit_once('-') {
        // FIXME: this is gross, just use a regex?
        Some((layer_filename, gen)) if gen.len() == 8 => {
            let layer = layer_filename.parse::<LayerName>()?;
            let gen =
                Generation::parse_suffix(gen).ok_or("Malformed generation suffix".to_string())?;
            Ok((layer, gen))
        }
        _ => Ok((name.parse::<LayerName>()?, Generation::none())),
    }
}

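/// Lists everything under a timeline's remote prefix and classifies it: `index_part.json`
/// files, the `initdb.tar.zst` archive, layer files, and unrecognized keys. The
/// highest-generation index is then downloaded and parsed to produce the blob data.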
pub(crate) async fn list_timeline_blobs(
    s3_client: &Client,
    id: TenantShardTimelineId,
    s3_root: &RootTarget,
) -> anyhow::Result<S3TimelineBlobData> {
    let mut s3_layers = HashSet::new();

    let mut errors = Vec::new();
    let mut unknown_keys = Vec::new();

    let mut timeline_dir_target = s3_root.timeline_root(&id);
    timeline_dir_target.delimiter = String::new();

    let mut index_part_keys: Vec<String> = Vec::new();
    let mut initdb_archive: bool = false;

    let mut stream = std::pin::pin!(stream_listing(s3_client, &timeline_dir_target));
    while let Some(obj) = stream.next().await {
        let obj = obj?;
        let key = obj.key();

        let blob_name = key.strip_prefix(&timeline_dir_target.prefix_in_bucket);
        match blob_name {
            Some(name) if name.starts_with("index_part.json") => {
                tracing::debug!("Index key {key}");
                index_part_keys.push(key.to_owned())
            }
            Some("initdb.tar.zst") => {
                tracing::debug!("initdb archive {key}");
                initdb_archive = true;
            }
            Some(maybe_layer_name) => match parse_layer_object_name(maybe_layer_name) {
                Ok((new_layer, gen)) => {
                    tracing::debug!("Parsed layer key: {} {:?}", new_layer, gen);
                    s3_layers.insert((new_layer, gen));
                }
                Err(e) => {
                    tracing::info!("Error parsing key {maybe_layer_name}");
                    errors.push(
                        format!("S3 list response got an object with key {key} that is not a layer name: {e}"),
                    );
                    unknown_keys.push(key.to_string());
                }
            },
            None => {
                tracing::warn!("Unknown key {}", key);
                errors.push(format!("S3 list response got an object with odd key {key}"));
                unknown_keys.push(key.to_string());
            }
        }
    }

    if index_part_keys.is_empty() && s3_layers.is_empty() && initdb_archive {
        tracing::debug!(
            "Timeline is empty apart from initdb archive: expected post-deletion state."
        );
        return Ok(S3TimelineBlobData {
            blob_data: BlobDataParseResult::Relic,
            unused_index_keys: index_part_keys,
            unknown_keys: Vec::new(),
        });
    }

    // Choose the index_part with the highest generation
    let (index_part_object, index_part_generation) = match index_part_keys
        .iter()
        .filter_map(|key| {
            // Stripping the index key to the last part, because RemotePath doesn't
            // like absolute paths, and depending on prefix_in_bucket it's possible
            // for the keys we read back to start with a slash.
            let basename = key.rsplit_once('/').unwrap().1;
            parse_remote_index_path(RemotePath::from_string(basename).unwrap()).map(|g| (key, g))
        })
        .max_by_key(|i| i.1)
        .map(|(k, g)| (k.clone(), g))
    {
        Some((key, gen)) => (Some(key), gen),
        None => {
            // Legacy/missing case: one or zero index parts, which did not have a generation
            (index_part_keys.pop(), Generation::none())
        }
    };

    match index_part_object.as_ref() {
        Some(selected) => index_part_keys.retain(|k| k != selected),
        None => {
            errors.push("S3 list response got no index_part.json file".to_string());
        }
    }

    if let Some(index_part_object_key) = index_part_object.as_ref() {
        let index_part_bytes = download_object_with_retries(
            s3_client,
            &timeline_dir_target.bucket_name,
            index_part_object_key,
        )
        .await
        .context("index_part.json download")?;

        match serde_json::from_slice(&index_part_bytes) {
            Ok(index_part) => {
                return Ok(S3TimelineBlobData {
                    blob_data: BlobDataParseResult::Parsed {
                        index_part: Box::new(index_part),
                        index_part_generation,
                        s3_layers,
                    },
                    unused_index_keys: index_part_keys,
                    unknown_keys,
                })
            }
            Err(index_parse_error) => errors.push(format!(
                "index_part.json body parsing error: {index_parse_error}"
            )),
        }
    }
    if errors.is_empty() {
        errors.push(
            "Unexpected: blob data was not successfully parsed, but no errors were recorded"
                .to_string(),
        );
    }

    Ok(S3TimelineBlobData {
        blob_data: BlobDataParseResult::Incorrect(errors),
        unused_index_keys: index_part_keys,
        unknown_keys,
    })
}
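
// A minimal unit-test sketch for `parse_layer_object_name`. The fixture names below follow
// the delta layer naming convention as we understand it (36 hex digits per key boundary,
// 16 hex digits per LSN); they are illustrative, not copied from a real bucket.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_layer_name_without_generation_suffix() {
        let name = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E8";
        let (_layer, gen) = parse_layer_object_name(name).expect("fixture should parse");
        // No 8-character suffix after the last '-', so we expect the legacy (none) generation.
        assert_eq!(gen, Generation::none());
    }

    #[test]
    fn parses_layer_name_with_generation_suffix() {
        let name = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E8-00000001";
        let (_layer, gen) = parse_layer_object_name(name).expect("fixture should parse");
        assert_eq!(gen, Generation::parse_suffix("00000001").expect("valid suffix"));
    }
}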