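//! Consistency checks for tenant and timeline state in remote storage: validating
//! `index_part.json` against the listed layer objects, detecting orphaned layers,
//! and locating and parsing tenant manifests.
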
use std::collections::{HashMap, HashSet};
use std::time::SystemTime;

use futures_util::StreamExt;
use itertools::Itertools;
use pageserver::tenant::IndexPart;
use pageserver::tenant::checks::check_valid_layermap;
use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver::tenant::remote_timeline_client::manifest::TenantManifest;
use pageserver::tenant::remote_timeline_client::{
    parse_remote_index_path, parse_remote_tenant_manifest_path, remote_layer_path,
};
use pageserver::tenant::storage_layer::LayerName;
use pageserver_api::shard::ShardIndex;
use remote_storage::{DownloadError, GenericRemoteStorage, ListingObject, RemotePath};
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use utils::generation::Generation;
use utils::id::TimelineId;
use utils::shard::TenantShardId;

use crate::cloud_admin_api::BranchData;
use crate::metadata_stream::stream_listing;
use crate::{RootTarget, TenantShardTimelineId, download_object_with_retries};

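/// The outcome of checking a single timeline's remote storage state.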
pub(crate) struct TimelineAnalysis {
    /// Anomalies detected
    pub(crate) errors: Vec<String>,

    /// Healthy but noteworthy, e.g. old-versioned structures that are still readable but
    /// worth reporting so we remember not to remove support for decoding the old version
    /// yet.
    pub(crate) warnings: Vec<String>,

    /// Objects whose keys were not recognized at all, i.e. not layer files, not indices, and not initdb archive.
    pub(crate) unknown_keys: Vec<String>,
}

impl TimelineAnalysis {
    fn new() -> Self {
        Self {
            errors: Vec::new(),
            warnings: Vec::new(),
            unknown_keys: Vec::new(),
        }
    }

    /// Whether a timeline is healthy.
    pub(crate) fn is_healthy(&self) -> bool {
        self.errors.is_empty() && self.warnings.is_empty()
    }
}

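/// Checks a single timeline's remote data against the tenant's object listing and,
/// optionally, its console state, accumulating errors, warnings, and unrecognized
/// keys into a [`TimelineAnalysis`].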
pub(crate) async fn branch_cleanup_and_check_errors(
    remote_client: &GenericRemoteStorage,
    id: &TenantShardTimelineId,
    tenant_objects: &mut TenantObjectListing,
    s3_active_branch: Option<&BranchData>,
    console_branch: Option<BranchData>,
    s3_data: Option<RemoteTimelineBlobData>,
) -> TimelineAnalysis {
    let mut result = TimelineAnalysis::new();

    info!("Checking timeline");

    if let Some(s3_active_branch) = s3_active_branch {
        info!(
            "Checking console status for timeline for branch {:?}/{:?}",
            s3_active_branch.project_id, s3_active_branch.id
        );
        match console_branch {
            Some(_) => result.errors.push(format!(
                "Timeline has deleted branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
                s3_active_branch.id, s3_active_branch.project_id
            )),
            None => result.errors.push(format!(
                "Timeline has no branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
                s3_active_branch.id, s3_active_branch.project_id
            )),
        };
    }

    match s3_data {
        Some(s3_data) => {
            result
                .unknown_keys
                .extend(s3_data.unknown_keys.into_iter().map(|k| k.key.to_string()));

            match s3_data.blob_data {
                BlobDataParseResult::Parsed {
                    index_part,
                    index_part_generation: _,
                    s3_layers: _,
                    index_part_last_modified_time,
                    index_part_snapshot_time,
                } => {
                    // Ignore missing-file errors if the index_part we downloaded differs from
                    // the one that was current when the layer files were listed.
                    let ignore_error = index_part_snapshot_time < index_part_last_modified_time
                        && !cfg!(debug_assertions);
                    if !IndexPart::KNOWN_VERSIONS.contains(&index_part.version()) {
                        result
                            .errors
                            .push(format!("index_part.json version: {}", index_part.version()))
                    }

                    let mut newest_versions = IndexPart::KNOWN_VERSIONS.iter().rev().take(3);
                    if !newest_versions.any(|ip| ip == &index_part.version()) {
                        info!(
                            "index_part.json version is not latest: {}",
                            index_part.version()
                        );
                    }

                    if index_part.metadata.disk_consistent_lsn()
                        != index_part.duplicated_disk_consistent_lsn()
                    {
                        // Tech debt: let's get rid of one of these, they are redundant
                        // https://github.com/neondatabase/neon/issues/8343
                        result.errors.push(format!(
                            "Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})",
                            index_part.metadata.disk_consistent_lsn(),
                            index_part.duplicated_disk_consistent_lsn(),
                        ))
                    }

                    if index_part.layer_metadata.is_empty() {
                        if index_part.metadata.ancestor_timeline().is_none() {
                            // The initial timeline with no ancestor should ALWAYS have layers.
                            result.errors.push(
                                "index_part.json has no layers (ancestor_timeline=None)"
                                    .to_string(),
                            );
                        } else {
                            // Not an error: this can happen for branches with zero writes, but note it anyway.
                            info!("index_part.json has no layers (ancestor_timeline exists)");
                        }
                    }

                    let layer_names = index_part.layer_metadata.keys().cloned().collect_vec();
                    if let Some(err) = check_valid_layermap(&layer_names) {
                        result.warnings.push(format!(
                            "index_part.json contains invalid layer map structure: {err}"
                        ));
                    }

                    for (layer, metadata) in index_part.layer_metadata {
                        if metadata.file_size == 0 {
                            result.errors.push(format!(
                                "index_part.json contains a layer {layer} that has 0 size in its layer metadata",
                            ))
                        }

                        if !tenant_objects.check_ref(id.timeline_id, &layer, &metadata) {
                            let path = remote_layer_path(
                                &id.tenant_shard_id.tenant_id,
                                &id.timeline_id,
                                metadata.shard,
                                &layer,
                                metadata.generation,
                            );

                            // A HEAD request is used here to address a race where an index was
                            // uploaded concurrently with our scan: we check whether the object
                            // appeared in S3 after we took the listing snapshot.
                            let response = remote_client
                                .head_object(&path, &CancellationToken::new())
                                .await;

                            match response {
                                Ok(_) => {}
                                Err(DownloadError::NotFound) => {
                                    // Object is not present.
                                    let is_l0 =
                                        LayerMap::is_l0(layer.key_range(), layer.is_delta());

                                    let msg = format!(
                                        "index_part.json contains a layer {}{} (shard {}) that is not present in remote storage (layer_is_l0: {})",
                                        layer,
                                        metadata.generation.get_suffix(),
                                        metadata.shard,
                                        is_l0,
                                    );

                                    if is_l0 || ignore_error {
                                        result.warnings.push(msg);
                                    } else {
                                        result.errors.push(msg);
                                    }
                                }
                                Err(e) => {
                                    tracing::warn!(
                                        "cannot check if the layer {}{} is present in remote storage (error: {})",
                                        layer,
                                        metadata.generation.get_suffix(),
                                        e,
                                    );
                                }
                            }
                        }
                    }
                }
                BlobDataParseResult::Relic => {}
                BlobDataParseResult::Incorrect {
                    errors,
                    s3_layers: _,
                } => result.errors.extend(
                    errors
                        .into_iter()
                        .map(|error| format!("parse error: {error}")),
                ),
            }
        }
        None => result
            .errors
            .push("Timeline has no data on S3 at all".to_string()),
    }

    if result.errors.is_empty() {
        info!("No check errors found");
    } else {
        warn!("Timeline metadata errors: {0:?}", result.errors);
    }

    if !result.warnings.is_empty() {
        warn!("Timeline metadata warnings: {0:?}", result.warnings);
    }

    if !result.unknown_keys.is_empty() {
        warn!(
            "The following keys are not recognized: {0:?}",
            result.unknown_keys
        )
    }

    result
}

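/// Reference count for a single layer object: incremented once per index that references it.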
#[derive(Default)]
pub(crate) struct LayerRef {
    ref_count: usize,
}

/// Top-level index of objects in a tenant. This may be used by any shard-timeline within
/// the tenant to query whether an object exists.
245 : pub(crate) struct TenantObjectListing {
246 : shard_timelines: HashMap<(ShardIndex, TimelineId), HashMap<(LayerName, Generation), LayerRef>>,
247 : }
248 :
249 : impl TenantObjectListing {
250 : /// Having done an S3 listing of the keys within a timeline prefix, merge them into the overall
251 : /// list of layer keys for the Tenant.
252 0 : pub(crate) fn push(
253 0 : &mut self,
254 0 : ttid: TenantShardTimelineId,
255 0 : layers: HashSet<(LayerName, Generation)>,
256 0 : ) {
257 0 : let shard_index = ShardIndex::new(
258 0 : ttid.tenant_shard_id.shard_number,
259 0 : ttid.tenant_shard_id.shard_count,
260 : );
261 0 : let replaced = self.shard_timelines.insert(
262 0 : (shard_index, ttid.timeline_id),
263 0 : layers
264 0 : .into_iter()
265 0 : .map(|l| (l, LayerRef::default()))
266 0 : .collect(),
267 : );
268 :
269 0 : assert!(
270 0 : replaced.is_none(),
271 0 : "Built from an S3 object listing, which should never repeat a key"
272 : );
273 0 : }
274 :
275 : /// Having loaded a timeline index, check if a layer referenced by the index exists. If it does,
276 : /// the layer's refcount will be incremented. Later, after calling this for all references in all indices
277 : /// in a tenant, orphan layers may be detected by their zero refcounts.
278 : ///
279 : /// Returns true if the layer exists
280 0 : pub(crate) fn check_ref(
281 0 : &mut self,
282 0 : timeline_id: TimelineId,
283 0 : layer_file: &LayerName,
284 0 : metadata: &LayerFileMetadata,
285 0 : ) -> bool {
286 0 : let Some(shard_tl) = self.shard_timelines.get_mut(&(metadata.shard, timeline_id)) else {
287 0 : return false;
288 : };
289 :
290 0 : let Some(layer_ref) = shard_tl.get_mut(&(layer_file.clone(), metadata.generation)) else {
291 0 : return false;
292 : };
293 :
294 0 : layer_ref.ref_count += 1;
295 :
296 0 : true
297 0 : }
298 :
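    /// Return all (shard, timeline, layer, generation) tuples whose refcount is still zero:
    /// layers present in the S3 listing that no index references.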
    pub(crate) fn get_orphans(&self) -> Vec<(ShardIndex, TimelineId, LayerName, Generation)> {
        let mut result = Vec::new();
        for ((shard_index, timeline_id), layers) in &self.shard_timelines {
            for ((layer_file, generation), layer_ref) in layers {
                if layer_ref.ref_count == 0 {
                    result.push((*shard_index, *timeline_id, layer_file.clone(), *generation))
                }
            }
        }

        result
    }
}

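/// Listing and parse results for a single timeline's remote storage contents.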
#[derive(Debug)]
pub(crate) struct RemoteTimelineBlobData {
    pub(crate) blob_data: BlobDataParseResult,

    /// Index objects that were not used when loading `blob_data`, e.g. those from old generations
    pub(crate) unused_index_keys: Vec<ListingObject>,

    /// Objects whose keys were not recognized at all, i.e. not layer files, not indices
    pub(crate) unknown_keys: Vec<ListingObject>,
}

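/// How a timeline's remote blob data was interpreted.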
#[derive(Debug)]
pub(crate) enum BlobDataParseResult {
    Parsed {
        index_part: Box<IndexPart>,
        index_part_generation: Generation,
        index_part_last_modified_time: SystemTime,
        index_part_snapshot_time: SystemTime,
        s3_layers: HashSet<(LayerName, Generation)>,
    },
    /// The remains of an uncleanly deleted Timeline or an aborted timeline creation (e.g. an
    /// initdb archive only, or some layer without an index)
    Relic,
    Incorrect {
        errors: Vec<String>,
        s3_layers: HashSet<(LayerName, Generation)>,
    },
}

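/// Splits a remote object name into a [`LayerName`] and its [`Generation`]. Object names are
/// expected to be either `<layer name>-<8-character generation suffix>` or, for objects written
/// before generations were introduced, a bare `<layer name>` (parsed as [`Generation::none`]).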
pub(crate) fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generation), String> {
    match name.rsplit_once('-') {
        // FIXME: this is gross, just use a regex?
        Some((layer_filename, gen_)) if gen_.len() == 8 => {
            let layer = layer_filename.parse::<LayerName>()?;
            let gen_ =
                Generation::parse_suffix(gen_).ok_or("Malformed generation suffix".to_string())?;
            Ok((layer, gen_))
        }
        _ => Ok((name.parse::<LayerName>()?, Generation::none())),
    }
}

/// Note (<https://github.com/neondatabase/neon/issues/8872>):
/// Since we do not guarantee the order of the listing, we could list layer keys right before
/// pageserver `RemoteTimelineClient` deletes the layer files and then the index.
/// In that rare case, listing would return a transient error because the index key is missing.
///
/// To avoid generating false positives, we stream the listing a second time.
pub(crate) async fn list_timeline_blobs(
    remote_client: &GenericRemoteStorage,
    id: TenantShardTimelineId,
    root_target: &RootTarget,
) -> anyhow::Result<RemoteTimelineBlobData> {
    let res = list_timeline_blobs_impl(remote_client, id, root_target).await?;
    match res {
        ListTimelineBlobsResult::Ready(data) => Ok(data),
        ListTimelineBlobsResult::MissingIndexPart(_) => {
            tracing::warn!("listing raced with removal of an index, retrying");
            // Retry if listing raced with removal of an index
            let data = list_timeline_blobs_impl(remote_client, id, root_target)
                .await?
                .into_data();
            Ok(data)
        }
    }
}

enum ListTimelineBlobsResult {
    /// Blob data is ready to be interpreted.
    Ready(RemoteTimelineBlobData),
    /// The listing contained an index, but we could not fetch it.
    MissingIndexPart(RemoteTimelineBlobData),
}

impl ListTimelineBlobsResult {
    /// Get the inner blob data regardless of the status.
    pub fn into_data(self) -> RemoteTimelineBlobData {
        match self {
            ListTimelineBlobsResult::Ready(data) => data,
            ListTimelineBlobsResult::MissingIndexPart(data) => data,
        }
    }
}

/// Returns [`ListTimelineBlobsResult::MissingIndexPart`] if blob data has layer files
/// but is missing [`IndexPart`], otherwise returns [`ListTimelineBlobsResult::Ready`].
async fn list_timeline_blobs_impl(
    remote_client: &GenericRemoteStorage,
    id: TenantShardTimelineId,
    root_target: &RootTarget,
) -> anyhow::Result<ListTimelineBlobsResult> {
    let mut s3_layers = HashSet::new();

    let mut errors = Vec::new();
    let mut unknown_keys = Vec::new();

    let mut timeline_dir_target = root_target.timeline_root(&id);
    timeline_dir_target.delimiter = String::new();

    let mut index_part_keys: Vec<ListingObject> = Vec::new();
    let mut initdb_archive: bool = false;

    let prefix_str = &timeline_dir_target
        .prefix_in_bucket
        .strip_prefix("/")
        .unwrap_or(&timeline_dir_target.prefix_in_bucket);

    let mut stream = std::pin::pin!(stream_listing(remote_client, &timeline_dir_target));
    while let Some(obj) = stream.next().await {
        let (key, Some(obj)) = obj? else {
            panic!("ListingObject not specified");
        };

        let blob_name = key.get_path().as_str().strip_prefix(prefix_str);
        match blob_name {
            Some(name) if name.starts_with("index_part.json") => {
                tracing::debug!("Index key {key}");
                index_part_keys.push(obj)
            }
            Some("initdb.tar.zst") => {
                tracing::debug!("initdb archive {key}");
                initdb_archive = true;
            }
            Some("initdb-preserved.tar.zst") => {
                tracing::info!("initdb archive preserved {key}");
            }
            Some(maybe_layer_name) => match parse_layer_object_name(maybe_layer_name) {
                Ok((new_layer, gen_)) => {
                    tracing::debug!("Parsed layer key: {new_layer} {gen_:?}");
                    s3_layers.insert((new_layer, gen_));
                }
                Err(e) => {
                    tracing::info!("Error parsing {maybe_layer_name} as layer name: {e}");
                    unknown_keys.push(obj);
                }
            },
            None => {
                tracing::info!("S3 listed an unknown key: {key}");
                unknown_keys.push(obj);
            }
        }
    }

    if index_part_keys.is_empty() && s3_layers.is_empty() {
        tracing::info!("Timeline is empty: expected post-deletion state.");
        if initdb_archive {
            tracing::info!("Timeline is post deletion but initdb archive is still present.");
        }

        return Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
            blob_data: BlobDataParseResult::Relic,
            unused_index_keys: index_part_keys,
            unknown_keys,
        }));
    }

    // Choose the index_part with the highest generation
    let (index_part_object, index_part_generation) = match index_part_keys
        .iter()
        .filter_map(|key| {
            // Strip the index key down to its last path component, because RemotePath doesn't
            // like absolute paths, and depending on prefix_in_bucket it's possible
            // for the keys we read back to start with a slash.
            let basename = key.key.get_path().as_str().rsplit_once('/').unwrap().1;
            parse_remote_index_path(RemotePath::from_string(basename).unwrap()).map(|g| (key, g))
        })
        .max_by_key(|i| i.1)
        .map(|(k, g)| (k.clone(), g))
    {
        Some((key, gen_)) => (Some::<ListingObject>(key.to_owned()), gen_),
        None => {
            // Legacy/missing case: one or zero index parts, which did not have a generation
            (index_part_keys.pop(), Generation::none())
        }
    };

    match index_part_object.as_ref() {
        Some(selected) => index_part_keys.retain(|k| k != selected),
        None => {
            // This case does not indicate corruption, but it should be very unusual. It can
            // happen if:
            // - timeline creation is in progress (first layer is written before index is written)
            // - timeline deletion happened while a stale pageserver was still attached; it might
            //   upload a layer after the deletion is done.
            tracing::info!(
                "S3 list response got no index_part.json file but still has layer files"
            );
            return Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
                blob_data: BlobDataParseResult::Relic,
                unused_index_keys: index_part_keys,
                unknown_keys,
            }));
        }
    }

    if let Some(index_part_object_key) = index_part_object.as_ref() {
        let (index_part_bytes, index_part_last_modified_time) =
            match download_object_with_retries(remote_client, &index_part_object_key.key).await {
                Ok(data) => data,
                Err(e) => {
                    // It is possible that the branch gets deleted between the time we list the
                    // objects and the time we download the index part file.
                    errors.push(format!("failed to download index_part.json: {e}"));
                    return Ok(ListTimelineBlobsResult::MissingIndexPart(
                        RemoteTimelineBlobData {
                            blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
                            unused_index_keys: index_part_keys,
                            unknown_keys,
                        },
                    ));
                }
            };
        let index_part_snapshot_time = index_part_object_key.last_modified;
        match serde_json::from_slice(&index_part_bytes) {
            Ok(index_part) => {
                return Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
                    blob_data: BlobDataParseResult::Parsed {
                        index_part: Box::new(index_part),
                        index_part_generation,
                        s3_layers,
                        index_part_last_modified_time,
                        index_part_snapshot_time,
                    },
                    unused_index_keys: index_part_keys,
                    unknown_keys,
                }));
            }
            Err(index_parse_error) => errors.push(format!(
                "index_part.json body parsing error: {index_parse_error}"
            )),
        }
    }

    if errors.is_empty() {
        errors.push(
            "Unexpected: there were no errors, yet we did not return a successfully parsed blob"
                .to_string(),
        );
    }

    Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
        blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
        unused_index_keys: index_part_keys,
        unknown_keys,
    }))
}

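/// A tenant manifest found in remote storage, along with the generation parsed from its key.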
pub(crate) struct RemoteTenantManifestInfo {
    pub(crate) generation: Generation,
    pub(crate) manifest: TenantManifest,
    pub(crate) listing_object: ListingObject,
}

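/// Outcome of [`list_tenant_manifests`]: either the errors hit while listing or parsing, or
/// the latest-generation manifest (if any) plus the remaining stale manifest objects.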
pub(crate) enum ListTenantManifestResult {
    WithErrors {
        errors: Vec<(String, String)>,
        #[allow(dead_code)]
        unknown_keys: Vec<ListingObject>,
    },
    NoErrors {
        latest_generation: Option<RemoteTenantManifestInfo>,
        manifests: Vec<(Generation, ListingObject)>,
    },
}

/// Lists the tenant manifests in remote storage and parses the latest one, returning a [`ListTenantManifestResult`] object.
pub(crate) async fn list_tenant_manifests(
    remote_client: &GenericRemoteStorage,
    tenant_id: TenantShardId,
    root_target: &RootTarget,
) -> anyhow::Result<ListTenantManifestResult> {
    let mut errors = Vec::new();
    let mut unknown_keys = Vec::new();

    let mut tenant_root_target = root_target.tenant_root(&tenant_id);
    let original_prefix = tenant_root_target.prefix_in_bucket.clone();
    const TENANT_MANIFEST_STEM: &str = "tenant-manifest";
    tenant_root_target.prefix_in_bucket += TENANT_MANIFEST_STEM;
    tenant_root_target.delimiter = String::new();

    let mut manifests: Vec<(Generation, ListingObject)> = Vec::new();

    let prefix_str = &original_prefix
        .strip_prefix("/")
        .unwrap_or(&original_prefix);

    let mut stream = std::pin::pin!(stream_listing(remote_client, &tenant_root_target));
    'outer: while let Some(obj) = stream.next().await {
        let (key, Some(obj)) = obj? else {
            panic!("ListingObject not specified");
        };

        'err: {
            // TODO a let chain would be nicer here.
            let Some(name) = key.object_name() else {
                break 'err;
            };
            if !name.starts_with(TENANT_MANIFEST_STEM) {
                break 'err;
            }
            let Some(generation) = parse_remote_tenant_manifest_path(key.clone()) else {
                break 'err;
            };
            tracing::debug!("tenant manifest {key}");
            manifests.push((generation, obj));
            continue 'outer;
        }
        tracing::info!("Listed an unknown key: {key}");
        unknown_keys.push(obj);
    }

    if !unknown_keys.is_empty() {
        errors.push(((*prefix_str).to_owned(), "unknown keys listed".to_string()));

        return Ok(ListTenantManifestResult::WithErrors {
            errors,
            unknown_keys,
        });
    }

    if manifests.is_empty() {
        tracing::debug!("No manifest for tenant.");

        return Ok(ListTenantManifestResult::NoErrors {
            latest_generation: None,
            manifests,
        });
    }

    // Find the manifest with the highest generation
    let (latest_generation, latest_listing_object) = manifests
        .iter()
        .max_by_key(|i| i.0)
        .map(|(g, obj)| (*g, obj.clone()))
        .unwrap();

    manifests.retain(|(gen_, _obj)| gen_ != &latest_generation);

    let manifest_bytes =
        match download_object_with_retries(remote_client, &latest_listing_object.key).await {
            Ok((bytes, _)) => bytes,
            Err(e) => {
                // It is possible that the tenant gets deleted between the time we list the
                // objects and the time we download the manifest file.
                errors.push((
                    latest_listing_object.key.get_path().as_str().to_owned(),
                    format!("failed to download tenant-manifest.json: {e}"),
                ));
                return Ok(ListTenantManifestResult::WithErrors {
                    errors,
                    unknown_keys,
                });
            }
        };

    match TenantManifest::from_json_bytes(&manifest_bytes) {
        Ok(manifest) => {
            return Ok(ListTenantManifestResult::NoErrors {
                latest_generation: Some(RemoteTenantManifestInfo {
                    generation: latest_generation,
                    manifest,
                    listing_object: latest_listing_object,
                }),
                manifests,
            });
        }
        Err(parse_error) => errors.push((
            latest_listing_object.key.get_path().as_str().to_owned(),
            format!("tenant-manifest.json body parsing error: {parse_error}"),
        )),
    }

    if errors.is_empty() {
        errors.push((
            (*prefix_str).to_owned(),
            "Unexpected: there were no errors, yet we did not return a successfully parsed blob"
                .to_string(),
        ));
    }

    Ok(ListTenantManifestResult::WithErrors {
        errors,
        unknown_keys,
    })
}