use std::collections::{HashMap, HashSet};

use itertools::Itertools;
use pageserver::tenant::checks::check_valid_layermap;
use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver_api::shard::ShardIndex;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use utils::generation::Generation;
use utils::id::TimelineId;

use crate::cloud_admin_api::BranchData;
use crate::metadata_stream::stream_listing;
use crate::{download_object_with_retries, RootTarget, TenantShardTimelineId};
use futures_util::StreamExt;
use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path};
use pageserver::tenant::storage_layer::LayerName;
use pageserver::tenant::IndexPart;
use remote_storage::{GenericRemoteStorage, ListingObject, RemotePath};

pub(crate) struct TimelineAnalysis {
    /// Anomalies detected
    pub(crate) errors: Vec<String>,

    /// Healthy but noteworthy conditions, e.g. old-version structures that are still readable,
    /// reported so that we remember not to remove support for decoding that old version yet.
    pub(crate) warnings: Vec<String>,

    /// Objects whose keys were not recognized at all, i.e. not layer files, not indices, and not initdb archive.
    pub(crate) unknown_keys: Vec<String>,
}

impl TimelineAnalysis {
    fn new() -> Self {
        Self {
            errors: Vec::new(),
            warnings: Vec::new(),
            unknown_keys: Vec::new(),
        }
    }

    /// Whether a timeline is healthy.
    pub(crate) fn is_healthy(&self) -> bool {
        self.errors.is_empty() && self.warnings.is_empty()
    }
}
49 :
50 0 : pub(crate) async fn branch_cleanup_and_check_errors(
51 0 : remote_client: &GenericRemoteStorage,
52 0 : id: &TenantShardTimelineId,
53 0 : tenant_objects: &mut TenantObjectListing,
54 0 : s3_active_branch: Option<&BranchData>,
55 0 : console_branch: Option<BranchData>,
56 0 : s3_data: Option<RemoteTimelineBlobData>,
57 0 : ) -> TimelineAnalysis {
58 0 : let mut result = TimelineAnalysis::new();
59 0 :
60 0 : info!("Checking timeline");
61 :
62 0 : if let Some(s3_active_branch) = s3_active_branch {
63 0 : info!(
64 0 : "Checking console status for timeline for branch {:?}/{:?}",
65 : s3_active_branch.project_id, s3_active_branch.id
66 : );
67 0 : match console_branch {
68 0 : Some(_) => {result.errors.push(format!("Timeline has deleted branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
69 0 : s3_active_branch.id, s3_active_branch.project_id))
70 : },
71 : None => {
72 0 : result.errors.push(format!("Timeline has no branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
73 0 : s3_active_branch.id, s3_active_branch.project_id))
74 : }
75 : };
76 0 : }
77 :
78 0 : match s3_data {
79 0 : Some(s3_data) => {
80 0 : result
81 0 : .unknown_keys
82 0 : .extend(s3_data.unknown_keys.into_iter().map(|k| k.key.to_string()));
83 0 :
84 0 : match s3_data.blob_data {
85 : BlobDataParseResult::Parsed {
86 0 : index_part,
87 0 : index_part_generation: _index_part_generation,
88 0 : s3_layers: _s3_layers,
89 0 : } => {
90 0 : if !IndexPart::KNOWN_VERSIONS.contains(&index_part.version()) {
91 0 : result
92 0 : .errors
93 0 : .push(format!("index_part.json version: {}", index_part.version()))
94 0 : }
95 :
96 0 : let mut newest_versions = IndexPart::KNOWN_VERSIONS.iter().rev().take(3);
97 0 : if !newest_versions.any(|ip| ip == &index_part.version()) {
98 0 : info!(
99 0 : "index_part.json version is not latest: {}",
100 0 : index_part.version()
101 : );
102 0 : }
103 :
104 0 : if index_part.metadata.disk_consistent_lsn()
105 0 : != index_part.duplicated_disk_consistent_lsn()
106 : {
107 : // Tech debt: let's get rid of one of these, they are redundant
108 : // https://github.com/neondatabase/neon/issues/8343
109 0 : result.errors.push(format!(
110 0 : "Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})",
111 0 : index_part.metadata.disk_consistent_lsn(),
112 0 : index_part.duplicated_disk_consistent_lsn(),
113 0 : ))
114 0 : }
115 :
116 0 : if index_part.layer_metadata.is_empty() {
117 0 : if index_part.metadata.ancestor_timeline().is_none() {
118 0 : // The initial timeline with no ancestor should ALWAYS have layers.
119 0 : result.errors.push(
120 0 : "index_part.json has no layers (ancestor_timeline=None)"
121 0 : .to_string(),
122 0 : );
123 0 : } else {
124 : // Not an error, can happen for branches with zero writes, but notice that
125 0 : info!("index_part.json has no layers (ancestor_timeline exists)");
126 : }
127 0 : }
128 :
129 0 : let layer_names = index_part.layer_metadata.keys().cloned().collect_vec();
130 0 : if let Some(err) = check_valid_layermap(&layer_names) {
131 0 : result.errors.push(format!(
132 0 : "index_part.json contains invalid layer map structure: {err}"
133 0 : ));
134 0 : }
135 :
136 0 : for (layer, metadata) in index_part.layer_metadata {
137 0 : if metadata.file_size == 0 {
138 0 : result.errors.push(format!(
139 0 : "index_part.json contains a layer {} that has 0 size in its layer metadata", layer,
140 0 : ))
141 0 : }
142 :
143 0 : if !tenant_objects.check_ref(id.timeline_id, &layer, &metadata) {
144 0 : let path = remote_layer_path(
145 0 : &id.tenant_shard_id.tenant_id,
146 0 : &id.timeline_id,
147 0 : metadata.shard,
148 0 : &layer,
149 0 : metadata.generation,
150 0 : );
151 :
152 : // HEAD request used here to address a race condition when an index was uploaded concurrently
153 : // with our scan. We check if the object is uploaded to S3 after taking the listing snapshot.
154 0 : let response = remote_client
155 0 : .head_object(&path, &CancellationToken::new())
156 0 : .await;
157 :
158 0 : if response.is_err() {
159 : // Object is not present.
160 0 : let is_l0 = LayerMap::is_l0(layer.key_range(), layer.is_delta());
161 0 :
162 0 : let msg = format!(
163 0 : "index_part.json contains a layer {}{} (shard {}) that is not present in remote storage (layer_is_l0: {})",
164 0 : layer,
165 0 : metadata.generation.get_suffix(),
166 0 : metadata.shard,
167 0 : is_l0,
168 0 : );
169 0 :
170 0 : if is_l0 {
171 0 : result.warnings.push(msg);
172 0 : } else {
173 0 : result.errors.push(msg);
174 0 : }
175 0 : }
176 0 : }
177 : }
178 : }
179 0 : BlobDataParseResult::Relic => {}
180 : BlobDataParseResult::Incorrect {
181 0 : errors,
182 0 : s3_layers: _,
183 0 : } => result.errors.extend(
184 0 : errors
185 0 : .into_iter()
186 0 : .map(|error| format!("parse error: {error}")),
187 0 : ),
188 : }
189 : }
190 0 : None => result
191 0 : .errors
192 0 : .push("Timeline has no data on S3 at all".to_string()),
193 : }
194 :
195 0 : if result.errors.is_empty() {
196 0 : info!("No check errors found");
197 : } else {
198 0 : warn!("Timeline metadata errors: {0:?}", result.errors);
199 : }
200 :
201 0 : if !result.warnings.is_empty() {
202 0 : warn!("Timeline metadata warnings: {0:?}", result.warnings);
203 0 : }
204 :
205 0 : if !result.unknown_keys.is_empty() {
206 0 : warn!(
207 0 : "The following keys are not recognized: {0:?}",
208 : result.unknown_keys
209 : )
210 0 : }
211 :
212 0 : result
213 0 : }

/// Reference counter for a single layer object discovered in the S3 listing;
/// incremented once per index that references the layer.
#[derive(Default)]
pub(crate) struct LayerRef {
    ref_count: usize,
}

/// Top-level index of objects in a tenant. This may be used by any shard-timeline within
/// the tenant to query whether an object exists.
#[derive(Default)]
pub(crate) struct TenantObjectListing {
    shard_timelines: HashMap<(ShardIndex, TimelineId), HashMap<(LayerName, Generation), LayerRef>>,
}

impl TenantObjectListing {
    /// Having done an S3 listing of the keys within a timeline prefix, merge them into the overall
    /// list of layer keys for the Tenant.
    pub(crate) fn push(
        &mut self,
        ttid: TenantShardTimelineId,
        layers: HashSet<(LayerName, Generation)>,
    ) {
        let shard_index = ShardIndex::new(
            ttid.tenant_shard_id.shard_number,
            ttid.tenant_shard_id.shard_count,
        );
        let replaced = self.shard_timelines.insert(
            (shard_index, ttid.timeline_id),
            layers
                .into_iter()
                .map(|l| (l, LayerRef::default()))
                .collect(),
        );

        assert!(
            replaced.is_none(),
            "Built from an S3 object listing, which should never repeat a key"
        );
    }

    /// Having loaded a timeline index, check if a layer referenced by the index exists. If it does,
    /// the layer's refcount will be incremented. Later, after calling this for all references in all indices
    /// in a tenant, orphan layers may be detected by their zero refcounts.
    ///
    /// Returns true if the layer exists.
    pub(crate) fn check_ref(
        &mut self,
        timeline_id: TimelineId,
        layer_file: &LayerName,
        metadata: &LayerFileMetadata,
    ) -> bool {
        let Some(shard_tl) = self.shard_timelines.get_mut(&(metadata.shard, timeline_id)) else {
            return false;
        };

        let Some(layer_ref) = shard_tl.get_mut(&(layer_file.clone(), metadata.generation)) else {
            return false;
        };

        layer_ref.ref_count += 1;

        true
    }

    /// Return all layers that were listed in S3 but never referenced by any timeline index,
    /// i.e. those whose refcount is still zero after all [`Self::check_ref`] calls.
    pub(crate) fn get_orphans(&self) -> Vec<(ShardIndex, TimelineId, LayerName, Generation)> {
        let mut result = Vec::new();
        for ((shard_index, timeline_id), layers) in &self.shard_timelines {
            for ((layer_file, generation), layer_ref) in layers {
                if layer_ref.ref_count == 0 {
                    result.push((*shard_index, *timeline_id, layer_file.clone(), *generation))
                }
            }
        }

        result
    }
}
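
// A minimal sketch of the scrub flow for `TenantObjectListing`, using only `push`
// and `get_orphans` (no index is loaded, so every listed layer is an orphan).
// The IDs and the layer name are hypothetical, and the constructors assumed here
// (`TenantShardTimelineId::new`, `TenantShardId::unsharded`) come from the
// surrounding crates.
#[cfg(test)]
mod tenant_object_listing_tests {
    use pageserver_api::shard::TenantShardId;
    use utils::id::TenantId;

    use super::*;

    #[test]
    fn unreferenced_layers_are_orphans() {
        let ttid = TenantShardTimelineId::new(
            TenantShardId::unsharded(TenantId::generate()),
            TimelineId::generate(),
        );
        // Hypothetical delta layer in `LayerName`'s
        // `{key_start}-{key_end}__{lsn_start}-{lsn_end}` format.
        let layer: LayerName = "000000000000000000000000000000000000-000000000000000000000000000000000002__00000000016B59D8-00000000016B5A51"
            .parse()
            .expect("layer name should parse");

        let mut listing = TenantObjectListing::default();
        listing.push(ttid, HashSet::from([(layer, Generation::none())]));

        // No `check_ref` calls were made, so the single layer has refcount 0.
        assert_eq!(listing.get_orphans().len(), 1);
    }
}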

#[derive(Debug)]
pub(crate) struct RemoteTimelineBlobData {
    pub(crate) blob_data: BlobDataParseResult,

    /// Index objects that were not used when loading `blob_data`, e.g. those from old generations
    pub(crate) unused_index_keys: Vec<ListingObject>,

    /// Objects whose keys were not recognized at all, i.e. not layer files, not indices
    pub(crate) unknown_keys: Vec<ListingObject>,
}

#[derive(Debug)]
pub(crate) enum BlobDataParseResult {
    Parsed {
        index_part: Box<IndexPart>,
        index_part_generation: Generation,
        s3_layers: HashSet<(LayerName, Generation)>,
    },
    /// The remains of a deleted Timeline (i.e. an initdb archive only)
    Relic,
    Incorrect {
        errors: Vec<String>,
        s3_layers: HashSet<(LayerName, Generation)>,
    },
}

/// Parse an object name into a [`LayerName`] and [`Generation`]. Object names are either a bare
/// layer file name (from before generation numbers were introduced) or a layer file name followed
/// by `-` and an 8-hex-digit generation suffix.
pub(crate) fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generation), String> {
    match name.rsplit_once('-') {
        // FIXME: this is gross, just use a regex?
        Some((layer_filename, gen)) if gen.len() == 8 => {
            let layer = layer_filename.parse::<LayerName>()?;
            let gen =
                Generation::parse_suffix(gen).ok_or("Malformed generation suffix".to_string())?;
            Ok((layer, gen))
        }
        _ => Ok((name.parse::<LayerName>()?, Generation::none())),
    }
}
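
// A minimal sketch of tests for `parse_layer_object_name`, assuming the
// `{key_start}-{key_end}__{lsn_start}-{lsn_end}` delta-layer form of `LayerName`;
// the layer name used below is hypothetical.
#[cfg(test)]
mod parse_layer_object_name_tests {
    use super::*;

    const NAME: &str = "000000000000000000000000000000000000-000000000000000000000000000000000002__00000000016B59D8-00000000016B5A51";

    #[test]
    fn bare_name_has_no_generation() {
        // Pre-generation objects have no suffix and parse to `Generation::none()`.
        let (_, gen) = parse_layer_object_name(NAME).expect("bare layer name should parse");
        assert_eq!(gen, Generation::none());
    }

    #[test]
    fn eight_hex_digit_suffix_is_a_generation() {
        // An 8-hex-digit suffix after the last `-` is recovered as the generation.
        let (_, gen) = parse_layer_object_name(&format!("{NAME}-00000001"))
            .expect("suffixed layer name should parse");
        assert_eq!(gen.get_suffix(), "-00000001");
    }
}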

/// Note (<https://github.com/neondatabase/neon/issues/8872>):
/// Since we do not guarantee the order of the listing, we could list layer keys right before
/// pageserver `RemoteTimelineClient` deletes the layer files and then the index.
/// In that rare case, this would give back a transient error where the index key is missing.
///
/// To avoid generating false positives, we try streaming the listing a second time.
pub(crate) async fn list_timeline_blobs(
    remote_client: &GenericRemoteStorage,
    id: TenantShardTimelineId,
    root_target: &RootTarget,
) -> anyhow::Result<RemoteTimelineBlobData> {
    let res = list_timeline_blobs_impl(remote_client, id, root_target).await?;
    match res {
        ListTimelineBlobsResult::Ready(data) => Ok(data),
        ListTimelineBlobsResult::MissingIndexPart(_) => {
            // Retry if the index is missing.
            let data = list_timeline_blobs_impl(remote_client, id, root_target)
                .await?
                .into_data();
            Ok(data)
        }
    }
}

enum ListTimelineBlobsResult {
    /// Blob data is ready to be interpreted.
    Ready(RemoteTimelineBlobData),
    /// The listing found layer files but is missing [`IndexPart`].
    MissingIndexPart(RemoteTimelineBlobData),
}

impl ListTimelineBlobsResult {
    /// Get the inner blob data regardless of the status.
    pub fn into_data(self) -> RemoteTimelineBlobData {
        match self {
            ListTimelineBlobsResult::Ready(data) => data,
            ListTimelineBlobsResult::MissingIndexPart(data) => data,
        }
    }
}

/// Returns [`ListTimelineBlobsResult::MissingIndexPart`] if the blob data has layer files
/// but is missing [`IndexPart`], otherwise returns [`ListTimelineBlobsResult::Ready`].
async fn list_timeline_blobs_impl(
    remote_client: &GenericRemoteStorage,
    id: TenantShardTimelineId,
    root_target: &RootTarget,
) -> anyhow::Result<ListTimelineBlobsResult> {
    let mut s3_layers = HashSet::new();

    let mut errors = Vec::new();
    let mut unknown_keys = Vec::new();

    let mut timeline_dir_target = root_target.timeline_root(&id);
    timeline_dir_target.delimiter = String::new();

    let mut index_part_keys: Vec<ListingObject> = Vec::new();
    let mut initdb_archive: bool = false;

    // The prefix may carry a leading slash while listed keys do not; normalize it so it
    // can be stripped from the keys below.
    let prefix_str = &timeline_dir_target
        .prefix_in_bucket
        .strip_prefix("/")
        .unwrap_or(&timeline_dir_target.prefix_in_bucket);

    let mut stream = std::pin::pin!(stream_listing(remote_client, &timeline_dir_target));
    while let Some(obj) = stream.next().await {
        let (key, Some(obj)) = obj? else {
            panic!("ListingObject not specified");
        };

        let blob_name = key.get_path().as_str().strip_prefix(prefix_str);
        match blob_name {
            Some(name) if name.starts_with("index_part.json") => {
                tracing::debug!("Index key {key}");
                index_part_keys.push(obj)
            }
            Some("initdb.tar.zst") => {
                tracing::debug!("initdb archive {key}");
                initdb_archive = true;
            }
            Some("initdb-preserved.tar.zst") => {
                tracing::info!("initdb archive preserved {key}");
            }
            Some(maybe_layer_name) => match parse_layer_object_name(maybe_layer_name) {
                Ok((new_layer, gen)) => {
                    tracing::debug!("Parsed layer key: {new_layer} {gen:?}");
                    s3_layers.insert((new_layer, gen));
                }
                Err(e) => {
                    tracing::info!("Error parsing {maybe_layer_name} as layer name: {e}");
                    unknown_keys.push(obj);
                }
            },
            None => {
                tracing::info!("S3 listed an unknown key: {key}");
                unknown_keys.push(obj);
            }
        }
    }

    if index_part_keys.is_empty() && s3_layers.is_empty() {
        tracing::debug!("Timeline is empty: expected post-deletion state.");
        if initdb_archive {
            tracing::info!("Timeline is post deletion but initdb archive is still present.");
        }

        return Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
            blob_data: BlobDataParseResult::Relic,
            unused_index_keys: index_part_keys,
            unknown_keys,
        }));
    }

    // Choose the index_part with the highest generation.
    let (index_part_object, index_part_generation) = match index_part_keys
        .iter()
        .filter_map(|key| {
            // Stripping the index key to the last part, because RemotePath doesn't
            // like absolute paths, and depending on prefix_in_bucket it's possible
            // for the keys we read back to start with a slash.
            let basename = key.key.get_path().as_str().rsplit_once('/').unwrap().1;
            parse_remote_index_path(RemotePath::from_string(basename).unwrap()).map(|g| (key, g))
        })
        .max_by_key(|i| i.1)
        .map(|(k, g)| (k.clone(), g))
    {
        Some((key, gen)) => (Some::<ListingObject>(key.to_owned()), gen),
        None => {
            // Legacy/missing case: one or zero index parts, which did not have a generation.
            (index_part_keys.pop(), Generation::none())
        }
    };

    match index_part_object.as_ref() {
        Some(selected) => index_part_keys.retain(|k| k != selected),
        None => {
            // It is possible that the branch was deleted after we listed some layer files,
            // so the index file is no longer present in the listing.
            errors.push(
                "S3 list response got no index_part.json file but still has layer files"
                    .to_string(),
            );
            return Ok(ListTimelineBlobsResult::MissingIndexPart(
                RemoteTimelineBlobData {
                    blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
                    unused_index_keys: index_part_keys,
                    unknown_keys,
                },
            ));
        }
    }

    if let Some(index_part_object_key) = index_part_object.as_ref() {
        let index_part_bytes =
            match download_object_with_retries(remote_client, &index_part_object_key.key).await {
                Ok(index_part_bytes) => index_part_bytes,
                Err(e) => {
                    // It is possible that the branch was deleted between listing the objects
                    // and downloading the index part file.
                    errors.push(format!("failed to download index_part.json: {e}"));
                    return Ok(ListTimelineBlobsResult::MissingIndexPart(
                        RemoteTimelineBlobData {
                            blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
                            unused_index_keys: index_part_keys,
                            unknown_keys,
                        },
                    ));
                }
            };

        match serde_json::from_slice(&index_part_bytes) {
            Ok(index_part) => {
                return Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
                    blob_data: BlobDataParseResult::Parsed {
                        index_part: Box::new(index_part),
                        index_part_generation,
                        s3_layers,
                    },
                    unused_index_keys: index_part_keys,
                    unknown_keys,
                }))
            }
            Err(index_parse_error) => errors.push(format!(
                "index_part.json body parsing error: {index_parse_error}"
            )),
        }
    }

    if errors.is_empty() {
        errors.push(
            "Unexpected: index_part.json was not parsed successfully, but no errors were recorded"
                .to_string(),
        );
    }

    Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
        blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
        unused_index_keys: index_part_keys,
        unknown_keys,
    }))
}
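
// Minimal sketches of unit tests for the pure helpers in this module; they only
// use types defined above and involve no remote storage.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn fresh_analysis_is_healthy() {
        let mut analysis = TimelineAnalysis::new();
        assert!(analysis.is_healthy());

        // Warnings, like errors, make a timeline unhealthy.
        analysis.warnings.push("example warning".to_string());
        assert!(!analysis.is_healthy());
    }

    #[test]
    fn into_data_unwraps_both_variants() {
        // Build a trivial Relic payload for each variant and check that
        // `into_data` returns the inner blob data regardless of the status.
        let relic = || RemoteTimelineBlobData {
            blob_data: BlobDataParseResult::Relic,
            unused_index_keys: Vec::new(),
            unknown_keys: Vec::new(),
        };
        for res in [
            ListTimelineBlobsResult::Ready(relic()),
            ListTimelineBlobsResult::MissingIndexPart(relic()),
        ] {
            assert!(matches!(
                res.into_data().blob_data,
                BlobDataParseResult::Relic
            ));
        }
    }
}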