use std::collections::{HashMap, HashSet};

use itertools::Itertools;
use pageserver::tenant::checks::check_valid_layermap;
use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver::tenant::remote_timeline_client::manifest::TenantManifest;
use pageserver_api::shard::ShardIndex;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use utils::generation::Generation;
use utils::id::TimelineId;
use utils::shard::TenantShardId;

use crate::cloud_admin_api::BranchData;
use crate::metadata_stream::stream_listing;
use crate::{download_object_with_retries, RootTarget, TenantShardTimelineId};
use futures_util::StreamExt;
use pageserver::tenant::remote_timeline_client::{
    parse_remote_index_path, parse_remote_tenant_manifest_path, remote_layer_path,
};
use pageserver::tenant::storage_layer::LayerName;
use pageserver::tenant::IndexPart;
use remote_storage::{GenericRemoteStorage, ListingObject, RemotePath};

pub(crate) struct TimelineAnalysis {
    /// Anomalies detected
    pub(crate) errors: Vec<String>,

    /// Healthy but noteworthy, e.g. old-versioned structures that are still readable, but which
    /// we should keep reporting so that support for decoding that old version is not removed
    /// prematurely.
    pub(crate) warnings: Vec<String>,

    /// Objects whose keys were not recognized at all, i.e. not layer files, not indices, and not initdb archive.
    pub(crate) unknown_keys: Vec<String>,
}

impl TimelineAnalysis {
    fn new() -> Self {
        Self {
            errors: Vec::new(),
            warnings: Vec::new(),
            unknown_keys: Vec::new(),
        }
    }

    /// Whether a timeline is healthy.
    pub(crate) fn is_healthy(&self) -> bool {
        self.errors.is_empty() && self.warnings.is_empty()
    }
}
53 :
54 0 : pub(crate) async fn branch_cleanup_and_check_errors(
55 0 : remote_client: &GenericRemoteStorage,
56 0 : id: &TenantShardTimelineId,
57 0 : tenant_objects: &mut TenantObjectListing,
58 0 : s3_active_branch: Option<&BranchData>,
59 0 : console_branch: Option<BranchData>,
60 0 : s3_data: Option<RemoteTimelineBlobData>,
61 0 : ) -> TimelineAnalysis {
62 0 : let mut result = TimelineAnalysis::new();
63 0 :
64 0 : info!("Checking timeline");
65 :
66 0 : if let Some(s3_active_branch) = s3_active_branch {
67 0 : info!(
68 0 : "Checking console status for timeline for branch {:?}/{:?}",
69 : s3_active_branch.project_id, s3_active_branch.id
70 : );
71 0 : match console_branch {
72 0 : Some(_) => {result.errors.push(format!("Timeline has deleted branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
73 0 : s3_active_branch.id, s3_active_branch.project_id))
74 : },
75 : None => {
76 0 : result.errors.push(format!("Timeline has no branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
77 0 : s3_active_branch.id, s3_active_branch.project_id))
78 : }
79 : };
80 0 : }
81 :
82 0 : match s3_data {
83 0 : Some(s3_data) => {
84 0 : result
85 0 : .unknown_keys
86 0 : .extend(s3_data.unknown_keys.into_iter().map(|k| k.key.to_string()));
87 0 :
88 0 : match s3_data.blob_data {
89 : BlobDataParseResult::Parsed {
90 0 : index_part,
91 0 : index_part_generation: _index_part_generation,
92 0 : s3_layers: _s3_layers,
93 0 : } => {
94 0 : if !IndexPart::KNOWN_VERSIONS.contains(&index_part.version()) {
95 0 : result
96 0 : .errors
97 0 : .push(format!("index_part.json version: {}", index_part.version()))
98 0 : }
99 :
100 0 : let mut newest_versions = IndexPart::KNOWN_VERSIONS.iter().rev().take(3);
101 0 : if !newest_versions.any(|ip| ip == &index_part.version()) {
102 0 : info!(
103 0 : "index_part.json version is not latest: {}",
104 0 : index_part.version()
105 : );
106 0 : }
107 :
108 0 : if index_part.metadata.disk_consistent_lsn()
109 0 : != index_part.duplicated_disk_consistent_lsn()
110 : {
111 : // Tech debt: let's get rid of one of these, they are redundant
112 : // https://github.com/neondatabase/neon/issues/8343
113 0 : result.errors.push(format!(
114 0 : "Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})",
115 0 : index_part.metadata.disk_consistent_lsn(),
116 0 : index_part.duplicated_disk_consistent_lsn(),
117 0 : ))
118 0 : }
119 :
120 0 : if index_part.layer_metadata.is_empty() {
121 0 : if index_part.metadata.ancestor_timeline().is_none() {
122 0 : // The initial timeline with no ancestor should ALWAYS have layers.
123 0 : result.errors.push(
124 0 : "index_part.json has no layers (ancestor_timeline=None)"
125 0 : .to_string(),
126 0 : );
127 0 : } else {
                            // Not an error: this can happen for branches with zero writes, but it is worth noting.
                            info!("index_part.json has no layers (ancestor_timeline exists)");
                        }
                    }

                    let layer_names = index_part.layer_metadata.keys().cloned().collect_vec();
                    if let Some(err) = check_valid_layermap(&layer_names) {
                        result.warnings.push(format!(
                            "index_part.json contains invalid layer map structure: {err}"
                        ));
                    }

                    for (layer, metadata) in index_part.layer_metadata {
                        if metadata.file_size == 0 {
                            result.errors.push(format!(
                                "index_part.json contains a layer {} that has 0 size in its layer metadata", layer,
                            ))
                        }

                        if !tenant_objects.check_ref(id.timeline_id, &layer, &metadata) {
                            let path = remote_layer_path(
                                &id.tenant_shard_id.tenant_id,
                                &id.timeline_id,
                                metadata.shard,
                                &layer,
                                metadata.generation,
                            );

                            // A HEAD request is used here to address a race where an index was uploaded concurrently
                            // with our scan: check whether the object appeared in S3 after the listing snapshot was taken.
                            let response = remote_client
                                .head_object(&path, &CancellationToken::new())
                                .await;

                            if response.is_err() {
                                // Object is not present.
                                let is_l0 = LayerMap::is_l0(layer.key_range(), layer.is_delta());

                                let msg = format!(
                                    "index_part.json contains a layer {}{} (shard {}) that is not present in remote storage (layer_is_l0: {})",
                                    layer,
                                    metadata.generation.get_suffix(),
                                    metadata.shard,
                                    is_l0,
                                );

                                if is_l0 {
                                    result.warnings.push(msg);
                                } else {
                                    result.errors.push(msg);
                                }
                            }
                        }
                    }
                }
                BlobDataParseResult::Relic => {}
                BlobDataParseResult::Incorrect {
                    errors,
                    s3_layers: _,
                } => result.errors.extend(
                    errors
                        .into_iter()
                        .map(|error| format!("parse error: {error}")),
                ),
            }
        }
        None => result
            .errors
            .push("Timeline has no data on S3 at all".to_string()),
    }

    if result.errors.is_empty() {
        info!("No check errors found");
    } else {
        warn!("Timeline metadata errors: {0:?}", result.errors);
    }

    if !result.warnings.is_empty() {
        warn!("Timeline metadata warnings: {0:?}", result.warnings);
    }

    if !result.unknown_keys.is_empty() {
        warn!(
            "The following keys are not recognized: {0:?}",
            result.unknown_keys
        )
    }

    result
}

#[derive(Default)]
pub(crate) struct LayerRef {
    ref_count: usize,
}

/// Top-level index of objects in a tenant. This may be used by any shard-timeline within
/// the tenant to query whether an object exists.
#[derive(Default)]
pub(crate) struct TenantObjectListing {
    shard_timelines: HashMap<(ShardIndex, TimelineId), HashMap<(LayerName, Generation), LayerRef>>,
}

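// Intended usage flow (a sketch inferred from how the methods below fit together, not a
// prescribed API):
// 1. `push` the S3 listing of every shard/timeline prefix in the tenant;
// 2. `check_ref` every layer referenced by every index_part.json that was loaded;
// 3. `get_orphans` then yields the listed layers that no index referenced at all.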
impl TenantObjectListing {
    /// Having done an S3 listing of the keys within a timeline prefix, merge them into the overall
    /// list of layer keys for the Tenant.
    pub(crate) fn push(
        &mut self,
        ttid: TenantShardTimelineId,
        layers: HashSet<(LayerName, Generation)>,
    ) {
        let shard_index = ShardIndex::new(
            ttid.tenant_shard_id.shard_number,
            ttid.tenant_shard_id.shard_count,
        );
        let replaced = self.shard_timelines.insert(
            (shard_index, ttid.timeline_id),
            layers
                .into_iter()
                .map(|l| (l, LayerRef::default()))
                .collect(),
        );

        assert!(
            replaced.is_none(),
            "Built from an S3 object listing, which should never repeat a key"
        );
    }

    /// Having loaded a timeline index, check if a layer referenced by the index exists. If it does,
    /// the layer's refcount will be incremented. Later, after calling this for all references in all indices
    /// in a tenant, orphan layers may be detected by their zero refcounts.
    ///
    /// Returns true if the layer exists
    pub(crate) fn check_ref(
        &mut self,
        timeline_id: TimelineId,
        layer_file: &LayerName,
        metadata: &LayerFileMetadata,
    ) -> bool {
        let Some(shard_tl) = self.shard_timelines.get_mut(&(metadata.shard, timeline_id)) else {
            return false;
        };

        let Some(layer_ref) = shard_tl.get_mut(&(layer_file.clone(), metadata.generation)) else {
            return false;
        };

        layer_ref.ref_count += 1;

        true
    }

    pub(crate) fn get_orphans(&self) -> Vec<(ShardIndex, TimelineId, LayerName, Generation)> {
        let mut result = Vec::new();
        for ((shard_index, timeline_id), layers) in &self.shard_timelines {
            for ((layer_file, generation), layer_ref) in layers {
                if layer_ref.ref_count == 0 {
                    result.push((*shard_index, *timeline_id, layer_file.clone(), *generation))
                }
            }
        }

        result
    }
}

#[derive(Debug)]
pub(crate) struct RemoteTimelineBlobData {
    pub(crate) blob_data: BlobDataParseResult,

    /// Index objects that were not used when loading `blob_data`, e.g. those from old generations
    pub(crate) unused_index_keys: Vec<ListingObject>,

    /// Objects whose keys were not recognized at all, i.e. not layer files, not indices
    pub(crate) unknown_keys: Vec<ListingObject>,
}

#[derive(Debug)]
pub(crate) enum BlobDataParseResult {
    Parsed {
        index_part: Box<IndexPart>,
        index_part_generation: Generation,
        s3_layers: HashSet<(LayerName, Generation)>,
    },
    /// The remains of an uncleanly deleted timeline or an aborted timeline creation (e.g. an initdb archive only, or some layer without an index)
    Relic,
    Incorrect {
        errors: Vec<String>,
        s3_layers: HashSet<(LayerName, Generation)>,
    },
}

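/// Splits a remote object name into its [`LayerName`] and generation suffix.
///
/// Illustrative shapes (inferred from the parsing below, not an exhaustive spec): an object named
/// `<layer file name>-<8 hex digits>` yields the parsed layer name plus that generation, while a
/// name without such a suffix is treated as a pre-generation object and maps to `Generation::none()`.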
pub(crate) fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generation), String> {
    match name.rsplit_once('-') {
        // FIXME: this is gross, just use a regex?
        Some((layer_filename, gen)) if gen.len() == 8 => {
            let layer = layer_filename.parse::<LayerName>()?;
            let gen =
                Generation::parse_suffix(gen).ok_or("Malformed generation suffix".to_string())?;
            Ok((layer, gen))
        }
        _ => Ok((name.parse::<LayerName>()?, Generation::none())),
    }
}

/// Note (<https://github.com/neondatabase/neon/issues/8872>):
/// Since we do not guarantee the order of the listing, we could list layer keys right before
/// pageserver `RemoteTimelineClient` deletes the layer files and then the index.
/// In that rare case, this would give back a transient error where the index key is missing.
///
/// To avoid generating false positives, we stream the listing a second time.
pub(crate) async fn list_timeline_blobs(
    remote_client: &GenericRemoteStorage,
    id: TenantShardTimelineId,
    root_target: &RootTarget,
) -> anyhow::Result<RemoteTimelineBlobData> {
    let res = list_timeline_blobs_impl(remote_client, id, root_target).await?;
    match res {
        ListTimelineBlobsResult::Ready(data) => Ok(data),
        ListTimelineBlobsResult::MissingIndexPart(_) => {
            // Retry if listing raced with removal of an index
            let data = list_timeline_blobs_impl(remote_client, id, root_target)
                .await?
                .into_data();
            Ok(data)
        }
    }
}

enum ListTimelineBlobsResult {
    /// Blob data is ready to be interpreted.
    Ready(RemoteTimelineBlobData),
    /// The listing contained an index, but we failed to fetch it when we tried.
    MissingIndexPart(RemoteTimelineBlobData),
}

impl ListTimelineBlobsResult {
    /// Get the inner blob data regardless of the status.
    pub fn into_data(self) -> RemoteTimelineBlobData {
        match self {
            ListTimelineBlobsResult::Ready(data) => data,
            ListTimelineBlobsResult::MissingIndexPart(data) => data,
        }
    }
}

/// Returns [`ListTimelineBlobsResult::MissingIndexPart`] if blob data has layer files
/// but is missing [`IndexPart`], otherwise returns [`ListTimelineBlobsResult::Ready`].
async fn list_timeline_blobs_impl(
    remote_client: &GenericRemoteStorage,
    id: TenantShardTimelineId,
    root_target: &RootTarget,
) -> anyhow::Result<ListTimelineBlobsResult> {
    let mut s3_layers = HashSet::new();

    let mut errors = Vec::new();
    let mut unknown_keys = Vec::new();

    let mut timeline_dir_target = root_target.timeline_root(&id);
    timeline_dir_target.delimiter = String::new();

    let mut index_part_keys: Vec<ListingObject> = Vec::new();
    let mut initdb_archive: bool = false;

    let prefix_str = &timeline_dir_target
        .prefix_in_bucket
        .strip_prefix("/")
        .unwrap_or(&timeline_dir_target.prefix_in_bucket);

    let mut stream = std::pin::pin!(stream_listing(remote_client, &timeline_dir_target));
    while let Some(obj) = stream.next().await {
        let (key, Some(obj)) = obj? else {
            panic!("ListingObject not specified");
        };

        let blob_name = key.get_path().as_str().strip_prefix(prefix_str);
        match blob_name {
            Some(name) if name.starts_with("index_part.json") => {
                tracing::debug!("Index key {key}");
                index_part_keys.push(obj)
            }
            Some("initdb.tar.zst") => {
                tracing::debug!("initdb archive {key}");
                initdb_archive = true;
            }
            Some("initdb-preserved.tar.zst") => {
                tracing::info!("initdb archive preserved {key}");
            }
            Some(maybe_layer_name) => match parse_layer_object_name(maybe_layer_name) {
                Ok((new_layer, gen)) => {
                    tracing::debug!("Parsed layer key: {new_layer} {gen:?}");
                    s3_layers.insert((new_layer, gen));
                }
                Err(e) => {
                    tracing::info!("Error parsing {maybe_layer_name} as layer name: {e}");
                    unknown_keys.push(obj);
                }
            },
            None => {
                tracing::info!("S3 listed an unknown key: {key}");
                unknown_keys.push(obj);
            }
        }
    }

    if index_part_keys.is_empty() && s3_layers.is_empty() {
        tracing::debug!("Timeline is empty: expected post-deletion state.");
        if initdb_archive {
            tracing::info!("Timeline is post deletion but initdb archive is still present.");
        }

        return Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
            blob_data: BlobDataParseResult::Relic,
            unused_index_keys: index_part_keys,
            unknown_keys,
        }));
    }

    // Choose the index_part with the highest generation
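    // For example (illustrative generation numbers, assuming the usual 8-hex-digit generation
    // suffix on index keys): a timeline that went through three generations may hold
    // `index_part.json-00000001`, `index_part.json-00000002` and `index_part.json-00000003`;
    // only the newest one is authoritative, and the older keys end up in `unused_index_keys`.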
    let (index_part_object, index_part_generation) = match index_part_keys
        .iter()
        .filter_map(|key| {
            // Stripping the index key to the last part, because RemotePath doesn't
            // like absolute paths, and depending on prefix_in_bucket it's possible
            // for the keys we read back to start with a slash.
            let basename = key.key.get_path().as_str().rsplit_once('/').unwrap().1;
            parse_remote_index_path(RemotePath::from_string(basename).unwrap()).map(|g| (key, g))
        })
        .max_by_key(|i| i.1)
        .map(|(k, g)| (k.clone(), g))
    {
        Some((key, gen)) => (Some::<ListingObject>(key.to_owned()), gen),
        None => {
            // Legacy/missing case: one or zero index parts, which did not have a generation
            (index_part_keys.pop(), Generation::none())
        }
    };

    match index_part_object.as_ref() {
        Some(selected) => index_part_keys.retain(|k| k != selected),
        None => {
            // This case does not indicate corruption, but it should be very unusual. It can
            // happen if:
            // - timeline creation is in progress (the first layer is written before the index is written)
            // - timeline deletion happened while a stale pageserver was still attached; it might
            //   upload a layer after the deletion is done.
            tracing::info!(
                "S3 list response got no index_part.json file but still has layer files"
            );
            return Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
                blob_data: BlobDataParseResult::Relic,
                unused_index_keys: index_part_keys,
                unknown_keys,
            }));
        }
    }

    if let Some(index_part_object_key) = index_part_object.as_ref() {
        let index_part_bytes =
            match download_object_with_retries(remote_client, &index_part_object_key.key).await {
                Ok(index_part_bytes) => index_part_bytes,
                Err(e) => {
                    // It is possible that the branch was deleted between listing the objects
                    // and downloading the index part file.
                    errors.push(format!("failed to download index_part.json: {e}"));
                    return Ok(ListTimelineBlobsResult::MissingIndexPart(
                        RemoteTimelineBlobData {
                            blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
                            unused_index_keys: index_part_keys,
                            unknown_keys,
                        },
                    ));
                }
            };

        match serde_json::from_slice(&index_part_bytes) {
            Ok(index_part) => {
                return Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
                    blob_data: BlobDataParseResult::Parsed {
                        index_part: Box::new(index_part),
                        index_part_generation,
                        s3_layers,
                    },
                    unused_index_keys: index_part_keys,
                    unknown_keys,
                }))
            }
            Err(index_parse_error) => errors.push(format!(
                "index_part.json body parsing error: {index_parse_error}"
            )),
        }
    }

    if errors.is_empty() {
        errors.push(
            "Unexpected: no errors did not lead to a successfully parsed blob return".to_string(),
        );
    }

    Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
        blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
        unused_index_keys: index_part_keys,
        unknown_keys,
    }))
}

pub(crate) struct RemoteTenantManifestInfo {
    pub(crate) generation: Generation,
    pub(crate) manifest: TenantManifest,
    pub(crate) listing_object: ListingObject,
}

pub(crate) enum ListTenantManifestResult {
    WithErrors {
        errors: Vec<(String, String)>,
        #[allow(dead_code)]
        unknown_keys: Vec<ListingObject>,
    },
    NoErrors {
        latest_generation: Option<RemoteTenantManifestInfo>,
        manifests: Vec<(Generation, ListingObject)>,
    },
}

/// Lists the tenant manifests in remote storage and parses the latest one, returning a [`ListTenantManifestResult`] object.
pub(crate) async fn list_tenant_manifests(
    remote_client: &GenericRemoteStorage,
    tenant_id: TenantShardId,
    root_target: &RootTarget,
) -> anyhow::Result<ListTenantManifestResult> {
    let mut errors = Vec::new();
    let mut unknown_keys = Vec::new();

    let mut tenant_root_target = root_target.tenant_root(&tenant_id);
    let original_prefix = tenant_root_target.prefix_in_bucket.clone();
    const TENANT_MANIFEST_STEM: &str = "tenant-manifest";
    tenant_root_target.prefix_in_bucket += TENANT_MANIFEST_STEM;
    tenant_root_target.delimiter = String::new();

    let mut manifests: Vec<(Generation, ListingObject)> = Vec::new();

    let prefix_str = &original_prefix
        .strip_prefix("/")
        .unwrap_or(&original_prefix);

    let mut stream = std::pin::pin!(stream_listing(remote_client, &tenant_root_target));
    'outer: while let Some(obj) = stream.next().await {
        let (key, Some(obj)) = obj? else {
            panic!("ListingObject not specified");
        };

        'err: {
            // TODO a let chain would be nicer here.
            let Some(name) = key.object_name() else {
                break 'err;
            };
            if !name.starts_with(TENANT_MANIFEST_STEM) {
                break 'err;
            }
            let Some(generation) = parse_remote_tenant_manifest_path(key.clone()) else {
                break 'err;
            };
            tracing::debug!("tenant manifest {key}");
            manifests.push((generation, obj));
            continue 'outer;
        }
        tracing::info!("Listed an unknown key: {key}");
        unknown_keys.push(obj);
    }

    if !unknown_keys.is_empty() {
        errors.push(((*prefix_str).to_owned(), "unknown keys listed".to_string()));

        return Ok(ListTenantManifestResult::WithErrors {
            errors,
            unknown_keys,
        });
    }

    if manifests.is_empty() {
        tracing::debug!("No manifest for tenant.");

        return Ok(ListTenantManifestResult::NoErrors {
            latest_generation: None,
            manifests,
        });
    }

    // Find the manifest with the highest generation
    let (latest_generation, latest_listing_object) = manifests
        .iter()
        .max_by_key(|i| i.0)
        .map(|(g, obj)| (*g, obj.clone()))
        .unwrap();

    manifests.retain(|(gen, _obj)| gen != &latest_generation);

    let manifest_bytes =
        match download_object_with_retries(remote_client, &latest_listing_object.key).await {
            Ok(bytes) => bytes,
            Err(e) => {
                // It is possible that the tenant was deleted between listing the objects
                // and downloading the manifest file.
                errors.push((
                    latest_listing_object.key.get_path().as_str().to_owned(),
                    format!("failed to download tenant-manifest.json: {e}"),
                ));
                return Ok(ListTenantManifestResult::WithErrors {
                    errors,
                    unknown_keys,
                });
            }
        };

    match TenantManifest::from_json_bytes(&manifest_bytes) {
        Ok(manifest) => {
            return Ok(ListTenantManifestResult::NoErrors {
                latest_generation: Some(RemoteTenantManifestInfo {
                    generation: latest_generation,
                    manifest,
                    listing_object: latest_listing_object,
                }),
                manifests,
            });
        }
        Err(parse_error) => errors.push((
            latest_listing_object.key.get_path().as_str().to_owned(),
            format!("tenant-manifest.json body parsing error: {parse_error}"),
        )),
    }

    if errors.is_empty() {
        errors.push((
            (*prefix_str).to_owned(),
            "Unexpected: no errors did not lead to a successfully parsed blob return".to_string(),
        ));
    }

    Ok(ListTenantManifestResult::WithErrors {
        errors,
        unknown_keys,
    })
}
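
// A minimal test sketch for `parse_layer_object_name`. The delta layer file name below is
// illustrative only: it is assumed to be well-formed for the current `LayerName` parser
// (36 hex characters per key, 16 per LSN), and the 8-hex-digit generation suffix mirrors
// what `Generation::get_suffix` produces.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_layer_object_name_with_generation_suffix() {
        // Shape: <key_start>-<key_end>__<lsn_start>-<lsn_end>-<generation>
        let name = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E8-00000001";
        let (layer, generation) =
            parse_layer_object_name(name).expect("assumed-valid layer object name should parse");
        assert!(layer.is_delta());
        assert_eq!(generation, Generation::parse_suffix("00000001").unwrap());
    }

    #[test]
    fn rejects_object_name_that_is_not_a_layer() {
        assert!(parse_layer_object_name("not_a_layer_file").is_err());
    }
}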