use std::collections::{BTreeSet, HashMap, HashSet};

use anyhow::Context;
use itertools::Itertools;
use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver_api::shard::ShardIndex;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use utils::generation::Generation;
use utils::id::TimelineId;

use crate::cloud_admin_api::BranchData;
use crate::metadata_stream::stream_listing;
use crate::{download_object_with_retries, RootTarget, TenantShardTimelineId};
use futures_util::StreamExt;
use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path};
use pageserver::tenant::storage_layer::LayerName;
use pageserver::tenant::IndexPart;
use remote_storage::{GenericRemoteStorage, ListingObject, RemotePath};

pub(crate) struct TimelineAnalysis {
    /// Anomalies detected
    pub(crate) errors: Vec<String>,

    /// Healthy but noteworthy conditions, e.g. old-version structures that are still readable,
    /// reported so that we remember not to remove support for decoding that old version yet.
    pub(crate) warnings: Vec<String>,

    /// Keys not referenced in metadata: candidates for removal, but NOT NECESSARILY: beware
    /// of races between reading the metadata and reading the objects.
    pub(crate) garbage_keys: Vec<String>,
}

impl TimelineAnalysis {
    fn new() -> Self {
        Self {
            errors: Vec::new(),
            warnings: Vec::new(),
            garbage_keys: Vec::new(),
        }
    }

    /// Whether a timeline is healthy.
    pub(crate) fn is_healthy(&self) -> bool {
        self.errors.is_empty() && self.warnings.is_empty()
    }
}
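
// Illustrative sketch (added for clarity, not from the original module): warnings alone are
// enough to mark a timeline unhealthy, while `garbage_keys` are reported separately and do
// not affect `is_healthy`. Uses only the types defined above.
#[cfg(test)]
mod timeline_analysis_health_illustration {
    use super::TimelineAnalysis;

    #[test]
    fn warnings_make_timeline_unhealthy() {
        let mut analysis = TimelineAnalysis::new();
        assert!(analysis.is_healthy());

        // Garbage keys are candidates for removal, but they do not flip the health status.
        analysis.garbage_keys.push("unknown-object-key".to_string());
        assert!(analysis.is_healthy());

        analysis.warnings.push("example warning".to_string());
        assert!(!analysis.is_healthy());
    }
}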

/// Checks whether a layer map is valid (i.e., is a valid result of the current compaction algorithm if nothing goes wrong).
/// The function checks if we can split the LSN range of a delta layer only at the LSNs of the delta layers. For example,
///
/// ```plain
/// |       |                 |       |
/// |   1   |    |   2   |    |   3   |
/// |       |    |       |    |       |
/// ```
///
/// This is not a valid layer map because the LSN range of layer 1 intersects with the LSN range of layer 2. 1 and 2 should have
/// the same LSN range.
///
/// The exception is that when layer 2 only contains a single key, it could be split over the LSN range. For example,
///
/// ```plain
/// |       |    |   2   |    |       |
/// |   1   |    |-------|    |   3   |
/// |       |    |   4   |    |       |
/// ```
///
/// If layer 2 and 4 contain the same single key, this is also a valid layer map.
fn check_valid_layermap(metadata: &HashMap<LayerName, LayerFileMetadata>) -> Option<String> {
    let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)
    let mut all_delta_layers = Vec::new();
    for (name, _) in metadata.iter() {
        if let LayerName::Delta(layer) = name {
            if layer.key_range.start.next() != layer.key_range.end {
                all_delta_layers.push(layer.clone());
            }
        }
    }
    for layer in &all_delta_layers {
        let lsn_range = &layer.lsn_range;
        lsn_split_point.insert(lsn_range.start);
        lsn_split_point.insert(lsn_range.end);
    }
    for layer in &all_delta_layers {
        let lsn_range = layer.lsn_range.clone();
        let intersects = lsn_split_point.range(lsn_range).collect_vec();
        if intersects.len() > 1 {
            let err = format!(
                "layer violates the layer map LSN split assumption: layer {} intersects with LSN [{}]",
                layer,
                intersects.into_iter().map(|lsn| lsn.to_string()).join(", ")
            );
            return Some(err);
        }
    }
    None
}
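
// Illustrative sketch of the split-point rule above: no other layer's LSN boundary may fall
// strictly inside a delta layer's LSN range. This mirrors the `BTreeSet::range` technique in
// `check_valid_layermap`, but uses plain u64 LSN values so it makes no assumptions about
// pageserver types; it is a demonstration, not part of the scrubber's real checks.
#[cfg(test)]
mod lsn_split_point_illustration {
    use std::collections::BTreeSet;
    use std::ops::Range;

    /// Collects every split point that falls strictly inside a layer's LSN range,
    /// mirroring the `intersects.len() > 1` check above.
    fn split_violations(lsn_ranges: &[Range<u64>]) -> Vec<u64> {
        let mut split_points = BTreeSet::new();
        for range in lsn_ranges {
            split_points.insert(range.start);
            split_points.insert(range.end);
        }
        let mut violations = Vec::new();
        for range in lsn_ranges {
            // A well-formed range only sees its own start inside the half-open interval.
            let inside: Vec<u64> = split_points.range(range.clone()).copied().collect();
            if inside.len() > 1 {
                violations.extend(inside.into_iter().skip(1));
            }
        }
        violations
    }

    #[test]
    fn detects_misaligned_delta_layers() {
        // A layer spanning 10..30 next to one spanning 20..30: 20 splits the first in the middle.
        assert_eq!(split_violations(&[10..30, 20..30]), vec![20]);
        // Ranges that only meet at their boundaries form a valid layer map.
        assert!(split_violations(&[10..20, 20..30]).is_empty());
    }
}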

pub(crate) async fn branch_cleanup_and_check_errors(
    remote_client: &GenericRemoteStorage,
    id: &TenantShardTimelineId,
    tenant_objects: &mut TenantObjectListing,
    s3_active_branch: Option<&BranchData>,
    console_branch: Option<BranchData>,
    s3_data: Option<RemoteTimelineBlobData>,
) -> TimelineAnalysis {
    let mut result = TimelineAnalysis::new();

    info!("Checking timeline {id}");

    if let Some(s3_active_branch) = s3_active_branch {
        info!(
            "Checking console status for timeline for branch {:?}/{:?}",
            s3_active_branch.project_id, s3_active_branch.id
        );
        match console_branch {
            Some(_) => {result.errors.push(format!("Timeline has deleted branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
                s3_active_branch.id, s3_active_branch.project_id))
            },
            None => {
                result.errors.push(format!("Timeline has no branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
                    s3_active_branch.id, s3_active_branch.project_id))
            }
        };
    }

    match s3_data {
        Some(s3_data) => {
            result
                .garbage_keys
                .extend(s3_data.unknown_keys.into_iter().map(|k| k.key.to_string()));

            match s3_data.blob_data {
                BlobDataParseResult::Parsed {
                    index_part,
                    index_part_generation: _index_part_generation,
                    s3_layers: _s3_layers,
                } => {
                    if !IndexPart::KNOWN_VERSIONS.contains(&index_part.version()) {
                        result
                            .errors
                            .push(format!("index_part.json version: {}", index_part.version()))
                    }

                    let mut newest_versions = IndexPart::KNOWN_VERSIONS.iter().rev().take(3);
                    if !newest_versions.any(|ip| ip == &index_part.version()) {
                        info!(
                            "index_part.json version is not latest: {}",
                            index_part.version()
                        );
                    }

                    if index_part.metadata.disk_consistent_lsn()
                        != index_part.duplicated_disk_consistent_lsn()
                    {
                        // Tech debt: let's get rid of one of these, they are redundant
                        // https://github.com/neondatabase/neon/issues/8343
                        result.errors.push(format!(
                            "Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})",
                            index_part.metadata.disk_consistent_lsn(),
                            index_part.duplicated_disk_consistent_lsn(),
                        ))
                    }

                    if index_part.layer_metadata.is_empty() {
                        if index_part.metadata.ancestor_timeline().is_none() {
                            // The initial timeline with no ancestor should ALWAYS have layers.
                            result.errors.push(
                                "index_part.json has no layers (ancestor_timeline=None)"
                                    .to_string(),
                            );
                        } else {
                            // Not an error, can happen for branches with zero writes, but notice that
                            info!("index_part.json has no layers (ancestor_timeline exists)");
                        }
                    }

                    if let Some(err) = check_valid_layermap(&index_part.layer_metadata) {
                        result.errors.push(format!(
                            "index_part.json contains invalid layer map structure: {err}"
                        ));
                    }

                    for (layer, metadata) in index_part.layer_metadata {
                        if metadata.file_size == 0 {
                            result.errors.push(format!(
                                "index_part.json contains a layer {} that has 0 size in its layer metadata", layer,
                            ))
                        }

                        if !tenant_objects.check_ref(id.timeline_id, &layer, &metadata) {
                            let path = remote_layer_path(
                                &id.tenant_shard_id.tenant_id,
                                &id.timeline_id,
                                metadata.shard,
                                &layer,
                                metadata.generation,
                            );

                            // HEAD request used here to address a race condition when an index was uploaded concurrently
                            // with our scan. We check if the object is uploaded to S3 after taking the listing snapshot.
                            let response = remote_client
                                .head_object(&path, &CancellationToken::new())
                                .await;

                            if response.is_err() {
                                // Object is not present.
                                let is_l0 = LayerMap::is_l0(layer.key_range(), layer.is_delta());

                                let msg = format!(
                                    "index_part.json contains a layer {}{} (shard {}) that is not present in remote storage (layer_is_l0: {})",
                                    layer,
                                    metadata.generation.get_suffix(),
                                    metadata.shard,
                                    is_l0,
                                );

                                if is_l0 {
                                    result.warnings.push(msg);
                                } else {
                                    result.errors.push(msg);
                                }
                            }
                        }
                    }
                }
                BlobDataParseResult::Relic => {}
                BlobDataParseResult::Incorrect {
                    errors,
                    s3_layers: _,
                } => result.errors.extend(
                    errors
                        .into_iter()
                        .map(|error| format!("parse error: {error}")),
                ),
            }
        }
        None => result
            .errors
            .push("Timeline has no data on S3 at all".to_string()),
    }

    if result.errors.is_empty() {
        info!("No check errors found");
    } else {
        warn!("Timeline metadata errors: {0:?}", result.errors);
    }

    if !result.warnings.is_empty() {
        warn!("Timeline metadata warnings: {0:?}", result.warnings);
    }

    if !result.garbage_keys.is_empty() {
        error!(
            "The following keys should be removed from S3: {0:?}",
            result.garbage_keys
        )
    }

    result
}

#[derive(Default)]
pub(crate) struct LayerRef {
    ref_count: usize,
}

/// Top-level index of objects in a tenant. This may be used by any shard-timeline within
/// the tenant to query whether an object exists.
#[derive(Default)]
pub(crate) struct TenantObjectListing {
    shard_timelines: HashMap<(ShardIndex, TimelineId), HashMap<(LayerName, Generation), LayerRef>>,
}

impl TenantObjectListing {
    /// Having done an S3 listing of the keys within a timeline prefix, merge them into the overall
    /// list of layer keys for the Tenant.
    pub(crate) fn push(
        &mut self,
        ttid: TenantShardTimelineId,
        layers: HashSet<(LayerName, Generation)>,
    ) {
        let shard_index = ShardIndex::new(
            ttid.tenant_shard_id.shard_number,
            ttid.tenant_shard_id.shard_count,
        );
        let replaced = self.shard_timelines.insert(
            (shard_index, ttid.timeline_id),
            layers
                .into_iter()
                .map(|l| (l, LayerRef::default()))
                .collect(),
        );

        assert!(
            replaced.is_none(),
            "Built from an S3 object listing, which should never repeat a key"
        );
    }

    /// Having loaded a timeline index, check if a layer referenced by the index exists. If it does,
    /// the layer's refcount will be incremented. Later, after calling this for all references in all indices
    /// in a tenant, orphan layers may be detected by their zero refcounts.
    ///
    /// Returns true if the layer exists
    pub(crate) fn check_ref(
        &mut self,
        timeline_id: TimelineId,
        layer_file: &LayerName,
        metadata: &LayerFileMetadata,
    ) -> bool {
        let Some(shard_tl) = self.shard_timelines.get_mut(&(metadata.shard, timeline_id)) else {
            return false;
        };

        let Some(layer_ref) = shard_tl.get_mut(&(layer_file.clone(), metadata.generation)) else {
            return false;
        };

        layer_ref.ref_count += 1;

        true
    }

    pub(crate) fn get_orphans(&self) -> Vec<(ShardIndex, TimelineId, LayerName, Generation)> {
        let mut result = Vec::new();
        for ((shard_index, timeline_id), layers) in &self.shard_timelines {
            for ((layer_file, generation), layer_ref) in layers {
                if layer_ref.ref_count == 0 {
                    result.push((*shard_index, *timeline_id, layer_file.clone(), *generation))
                }
            }
        }

        result
    }
}
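
// A self-contained miniature of the ref-counting protocol described above: seed refcounts from
// an object listing (`push`), bump them for every index reference (`check_ref`), then anything
// still at zero is an orphan (`get_orphans`). It uses plain string keys instead of
// LayerName/Generation so it makes no assumptions about pageserver constructors; it illustrates
// the technique rather than testing `TenantObjectListing` itself.
#[cfg(test)]
mod object_refcount_illustration {
    use std::collections::HashMap;

    #[test]
    fn unreferenced_objects_become_orphans() {
        // Step 1: seed refcounts from a (pretend) object listing.
        let mut listing: HashMap<&str, usize> =
            ["layer-a", "layer-b"].into_iter().map(|k| (k, 0)).collect();

        // Step 2: bump the refcount for every layer referenced by a (pretend) index.
        for referenced in ["layer-a"] {
            if let Some(ref_count) = listing.get_mut(referenced) {
                *ref_count += 1;
            }
        }

        // Step 3: anything still at zero was listed in S3 but never referenced: an orphan.
        let orphans: Vec<&str> = listing
            .iter()
            .filter(|(_, ref_count)| **ref_count == 0)
            .map(|(k, _)| *k)
            .collect();
        assert_eq!(orphans, vec!["layer-b"]);
    }
}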

#[derive(Debug)]
pub(crate) struct RemoteTimelineBlobData {
    pub(crate) blob_data: BlobDataParseResult,

    // Index objects that were not used when loading `blob_data`, e.g. those from old generations
    pub(crate) unused_index_keys: Vec<ListingObject>,

    // Objects whose keys were not recognized at all, i.e. not layer files, not indices
    pub(crate) unknown_keys: Vec<ListingObject>,
}

#[derive(Debug)]
pub(crate) enum BlobDataParseResult {
    Parsed {
        index_part: Box<IndexPart>,
        index_part_generation: Generation,
        s3_layers: HashSet<(LayerName, Generation)>,
    },
    /// The remains of a deleted Timeline (i.e. an initdb archive only)
    Relic,
    Incorrect {
        errors: Vec<String>,
        s3_layers: HashSet<(LayerName, Generation)>,
    },
}

pub(crate) fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generation), String> {
    match name.rsplit_once('-') {
        // FIXME: this is gross, just use a regex?
        Some((layer_filename, gen)) if gen.len() == 8 => {
            let layer = layer_filename.parse::<LayerName>()?;
            let gen =
                Generation::parse_suffix(gen).ok_or("Malformed generation suffix".to_string())?;
            Ok((layer, gen))
        }
        _ => Ok((name.parse::<LayerName>()?, Generation::none())),
    }
}
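
// Illustrative sketch of the name-splitting behaviour above: an object key is a layer file name
// plus an optional 8-character generation suffix. The delta layer file name literal below is an
// assumption, following the usual `{key_start}-{key_end}__{lsn_start}-{lsn_end}` format used by
// pageserver layer files.
#[cfg(test)]
mod layer_object_name_illustration {
    use super::*;

    const DELTA_NAME: &str = "000000000000000000000000000000000000-000000000000000000000000000000000002__00000000016B59D8-00000000016B5A51";

    #[test]
    fn splits_optional_generation_suffix() {
        // Without a trailing 8-character suffix the whole key is the layer name.
        let (_, gen) = parse_layer_object_name(DELTA_NAME).expect("plain layer name should parse");
        assert_eq!(gen, Generation::none());

        // With a `-xxxxxxxx` suffix, the suffix is parsed as the layer's generation.
        let keyed = format!("{DELTA_NAME}-0000002f");
        let (_, gen) = parse_layer_object_name(&keyed).expect("suffixed layer name should parse");
        assert_eq!(gen, Generation::parse_suffix("0000002f").unwrap());
    }
}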

pub(crate) async fn list_timeline_blobs(
    remote_client: &GenericRemoteStorage,
    id: TenantShardTimelineId,
    root_target: &RootTarget,
) -> anyhow::Result<RemoteTimelineBlobData> {
    let mut s3_layers = HashSet::new();

    let mut errors = Vec::new();
    let mut unknown_keys = Vec::new();

    let mut timeline_dir_target = root_target.timeline_root(&id);
    timeline_dir_target.delimiter = String::new();

    let mut index_part_keys: Vec<ListingObject> = Vec::new();
    let mut initdb_archive: bool = false;

    let prefix_str = &timeline_dir_target
        .prefix_in_bucket
        .strip_prefix("/")
        .unwrap_or(&timeline_dir_target.prefix_in_bucket);

    let mut stream = std::pin::pin!(stream_listing(remote_client, &timeline_dir_target));
    while let Some(obj) = stream.next().await {
        let (key, Some(obj)) = obj? else {
            panic!("ListingObject not specified");
        };

        let blob_name = key.get_path().as_str().strip_prefix(prefix_str);
        match blob_name {
            Some(name) if name.starts_with("index_part.json") => {
                tracing::debug!("Index key {key}");
                index_part_keys.push(obj)
            }
            Some("initdb.tar.zst") => {
                tracing::debug!("initdb archive {key}");
                initdb_archive = true;
            }
            Some("initdb-preserved.tar.zst") => {
                tracing::info!("initdb archive preserved {key}");
            }
            Some(maybe_layer_name) => match parse_layer_object_name(maybe_layer_name) {
                Ok((new_layer, gen)) => {
                    tracing::debug!("Parsed layer key: {new_layer} {gen:?}");
                    s3_layers.insert((new_layer, gen));
                }
                Err(e) => {
                    tracing::info!("Error parsing key {maybe_layer_name}");
                    errors.push(
                        format!("S3 list response got an object with key {key} that is not a layer name: {e}"),
                    );
                    unknown_keys.push(obj);
                }
            },
            None => {
                tracing::warn!("Unknown key {key}");
                errors.push(format!("S3 list response got an object with odd key {key}"));
                unknown_keys.push(obj);
            }
        }
    }

    if index_part_keys.is_empty() && s3_layers.is_empty() && initdb_archive {
        tracing::debug!(
            "Timeline is empty apart from initdb archive: expected post-deletion state."
        );
        return Ok(RemoteTimelineBlobData {
            blob_data: BlobDataParseResult::Relic,
            unused_index_keys: index_part_keys,
            unknown_keys: Vec::new(),
        });
    }

    // Choose the index_part with the highest generation
    let (index_part_object, index_part_generation) = match index_part_keys
        .iter()
        .filter_map(|key| {
            // Stripping the index key to the last part, because RemotePath doesn't
            // like absolute paths, and depending on prefix_in_bucket it's possible
            // for the keys we read back to start with a slash.
            let basename = key.key.get_path().as_str().rsplit_once('/').unwrap().1;
            parse_remote_index_path(RemotePath::from_string(basename).unwrap()).map(|g| (key, g))
        })
        .max_by_key(|i| i.1)
        .map(|(k, g)| (k.clone(), g))
    {
        Some((key, gen)) => (Some::<ListingObject>(key.to_owned()), gen),
        None => {
            // Legacy/missing case: one or zero index parts, which did not have a generation
            (index_part_keys.pop(), Generation::none())
        }
    };

    match index_part_object.as_ref() {
        Some(selected) => index_part_keys.retain(|k| k != selected),
        None => {
            errors.push("S3 list response got no index_part.json file".to_string());
        }
    }

    if let Some(index_part_object_key) = index_part_object.as_ref() {
        let index_part_bytes =
            download_object_with_retries(remote_client, &index_part_object_key.key)
                .await
                .context("index_part.json download")?;

        match serde_json::from_slice(&index_part_bytes) {
            Ok(index_part) => {
                return Ok(RemoteTimelineBlobData {
                    blob_data: BlobDataParseResult::Parsed {
                        index_part: Box::new(index_part),
                        index_part_generation,
                        s3_layers,
                    },
                    unused_index_keys: index_part_keys,
                    unknown_keys,
                })
            }
            Err(index_parse_error) => errors.push(format!(
                "index_part.json body parsing error: {index_parse_error}"
            )),
        }
    }

    if errors.is_empty() {
        errors.push(
            "Unexpected: no errors did not lead to a successfully parsed blob return".to_string(),
        );
    }

    Ok(RemoteTimelineBlobData {
        blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
        unused_index_keys: index_part_keys,
        unknown_keys,
    })
}
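
// Sketch of the "highest generation wins" index selection performed in `list_timeline_blobs`,
// exercised on bare basenames only. It assumes index objects carry the standard
// `index_part.json-xxxxxxxx` generation suffix; legacy `index_part.json` keys have no
// generation and are handled by the `Generation::none()` fallback above.
#[cfg(test)]
mod index_generation_selection_illustration {
    use super::*;

    #[test]
    fn picks_index_with_highest_generation() {
        let basenames = [
            "index_part.json-00000002",
            "index_part.json-0000000a",
            "index_part.json",
        ];

        // Same shape as the `filter_map` + `max_by_key` in `list_timeline_blobs`.
        let best = basenames
            .into_iter()
            .filter_map(|name| {
                parse_remote_index_path(RemotePath::from_string(name).unwrap())
                    .map(|gen| (name, gen))
            })
            .max_by_key(|i| i.1);

        assert_eq!(best.map(|(name, _)| name), Some("index_part.json-0000000a"));

        // A legacy index without a suffix yields no generation at all.
        assert!(
            parse_remote_index_path(RemotePath::from_string("index_part.json").unwrap()).is_none()
        );
    }
}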